パイプラインの実行パラメータを指定する

Apache Beam プログラムがパイプラインを作成したら、パイプラインを実行する必要があります。パイプラインの実行は、Apache Beam プログラムの実行とは別のものです。Apache Beam プログラムでパイプラインを作成し、この作成したコードによって、パイプライン ランナーが実行する一連のステップが生成されます。パイプライン ランナーとして使用できるのは、Google Cloud 上の Dataflow マネージド サービス、サードパーティ ランナー サービス、ローカル環境でステップを直接実行するローカル パイプライン ランナーです。

パイプライン ランナーとその他の実行オプションは、Apache Beam SDK クラス PipelineOptions を使用して指定できます。PipelineOptions を使用して、パイプラインの実行方法と実行場所、使用されるリソースを構成します。

ほとんどの場合、パイプラインは Dataflow ランナー サービスを使用してマネージド Google Cloud リソース上で実行します。Dataflow サービスでパイプラインを実行すると、Google Cloud プロジェクトで Compute Engine リソースと Cloud Storage リソースを使用する Dataflow ジョブが作成されます。

パイプラインをローカルで実行することもできます。パイプラインをローカルで実行すると、パイプライン変換は、Dataflow プログラムが実行されるのと同じマシン上で実行されます。ローカルの実行は、テストおよびデバッグの目的で、パイプラインが小さなメモリ内データセットを使用できる場合に特に有用です。

PipelineOptions を設定する

Dataflow プログラムで Pipeline オブジェクトを作成するときに、PipelineOptions を渡します。Dataflow サービスは、パイプラインを実行するときに、PipelineOptions のコピーを各ワーカー インスタンスに送信します。

Java: SDK 2.x

注: ProcessContext.getPipelineOptions メソッドを使用して ParDoDoFn インスタンス内部の PipelineOptions にアクセスできます。

Python

この機能は Apache Beam SDK for Python ではまだサポートされていません。

Java: SDK 1.x

コマンドライン引数から PipelineOptions を設定する

パイプラインは、PipelineOptions オブジェクトを作成してフィールドを直接設定することで構成できますが、Apache Beam SDK には、コマンドライン引数を使用して PipelineOptions のフィールドを設定するために使用できるコマンドライン パーサーが含まれています。

Java: SDK 2.x

コマンドラインからオプションを読み取るには、次のコード例のように、PipelineOptionsFactory.fromArgs メソッドを使用して PipelineOptions オブジェクトを作成します。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

注: .withValidation メソッドを追加すると、Dataflow は必要なコマンドライン引数があるかチェックし、引数値を検証します。

PipelineOptionsFactory.fromArgs を使用すると、次の形式に従うコマンドライン引数が処理されます。

--<option>=<value>

この方法で PipelineOptions を構築して、org.apache.beam.sdk.options.PipelineOptions のいずれかのサブインターフェースで任意のオプションをコマンドライン引数として指定できます。

Python

コマンドラインからオプションを読み取るには、次のコード例のように PipelineOptions オブジェクトを作成します。

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

PipelineOptions への引数 (flags=argv) は、次の形式に従うコマンドライン引数を処理します。

--<option>=<value>

この方法で PipelineOptions を構築して、PipelineOptions からサブクラス化することで任意のオプションを指定できます。

Java: SDK 1.x

カスタム オプションを作成する

標準の PipelineOptions に加えて、独自のカスタム オプションを追加できます。Dataflow のコマンドライン パーサーは、同じ形式で指定されたコマンドライン引数を使用して、カスタム オプションを設定することもできます。

Java: SDK 2.x

独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

独自のオプションを追加するには、次の例のように add_argument() メソッドを使用します(これは Python の標準 argparse モジュールとまったく同じ動作をします)。

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

ユーザーが --help をコマンドライン引数として渡したときに表示される説明と、デフォルト値を指定することもできます。

Java: SDK 2.x

次のように、アノテーションを使用して説明とデフォルト値を設定します。

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory でインターフェースを登録してから、PipelineOptions オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory でインターフェースを登録する場合、--help でカスタム オプション インターフェースを検索し、--help コマンドの出力に追加できます。PipelineOptionsFactory は、カスタム オプションが他のすべての登録済みオプションと互換であることも検証します。

次のコード例は、PipelineOptionsFactory でカスタム オプション インターフェースを登録する方法を示しています。

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

これで、パイプラインは --myCustomOption=value をコマンドライン引数として受け入れることができます。

Python

説明とデフォルト値は次のように設定します。

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

Java: SDK 1.x

Cloud Dataflow サービスで実行するように PipelineOptions を構成する

Dataflow マネージド サービスを使用してパイプラインを実行するには、PipelineOptions で次のフィールドを設定する必要があります。

Java: SDK 2.x

  • project - Google Cloud プロジェクトの ID。
  • runner - プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これは DataflowRunner である必要があります。
  • gcpTempLocation - 一時ファイルをステージングするための Dataflow 用の Cloud Storage パス。パイプラインを実行する前に、まずこのバケットを作成する必要があります。gcpTempLocation を指定しない場合は、パイプライン オプション tempLocation を指定することができ、gcpTempLocationtempLocation の値に設定されます。どちらも指定されていない場合は、デフォルトの gcpTempLocation が作成されます。
  • stagingLocation - バイナリ ファイルをステージングするための Dataflow 用の Cloud Storage バケット。このオプションを設定しないと、tempLocation に指定したものがステージング場所にも使用されます。
  • これも tempLocation も指定されていない場合は、デフォルトの gcpTempLocation が作成されます。tempLocation が指定され、gcpTempLocation が指定されていない場合、tempLocation は Cloud Storage パスである必要があり、gcpTempLocation には同じ値が使用されます。tempLocation が指定されず、gcpTempLocation が指定されている場合、tempLocation に値は入力されません。

注: Apache Beam SDK for Java 2.15.0 以降を使用している場合は、region も指定する必要があります。

Python

  • project - Google Cloud プロジェクトの ID。
  • runner - プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これは DataflowRunner である必要があります。
  • staging_location - ジョブを実行するワーカーが必要とするコード パッケージをステージングするための Dataflow 用の Cloud Storage パス。
  • temp_location - パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。

注: Apache Beam SDK for Python 2.15.0 以降を使用している場合は、region も指定する必要があります。

Java: SDK 1.x

これらのオプションは、プログラムで設定するか、コマンドラインを使用して指定できます。次のサンプルコードでは、Dataflow マネージド サービスを使用してパイプラインを実行するため、ランナーとその他の必要なオプションをプログラムで設定してパイプラインを作成しています。

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

Java: SDK 1.x

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

次のサンプルコードでは、コマンドラインを使用して Dataflow サービスの実行に必要なオプションを設定しています。

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

Java: SDK 1.x

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

Java: SDK 2.x

必要なオプションをコマンドラインで渡す場合は、--project--runner--gcpTempLocation--stagingLocation のオプションを使用します。

Python

必要なオプションをコマンドラインで渡す場合は、--project--runner--staging_location のオプションを使用します。

Java: SDK 1.x

非同期実行

Java: SDK 2.x

DataflowRunner を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。

Python

DataflowRunner を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。

Java: SDK 1.x

同期実行

Java: SDK 2.x

パイプライン ランナーとして DataflowRunner を指定して、pipeline.run().waitUntilFinish() を明示的に呼び出します。

DataflowRunner を使用して、pipeline.run() から返された PipelineResult オブジェクトで waitUntilFinish() を呼び出すと、パイプラインはクラウドで実行されますが、ローカルコードはクラウドジョブが終了し、最後の DataflowPipelineJob オブジェクトが返されるまで待機します。 ジョブの実行中、Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。

Java SDK 1.x を過去に使用していて、コマンドラインで --runner BlockingDataflowPipelineRunner を使用して、パイプラインが終了するまでブロックするようメイン プログラムにインタラクティブに指示していた場合、Java 2.x ではメイン プログラムで waitUntilFinish() を明示的に呼び出す必要があります。

Python

パイプラインの完了までブロックするには、ランナーの run() メソッドから返される PipelineResult オブジェクトの wait_until_finish() メソッドを使用します。

Java: SDK 1.x

注: コマンドラインで Ctrl+C を入力しても、ジョブはキャンセルされません。Dataflow サービスは Google Cloud でジョブを引き続き実行します。ジョブをキャンセルするには、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用します。

ストリーミング実行

Java: SDK 2.x

無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。

無限データソースとシンクをパイプラインで使用する場合、GroupByKey などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。

Python

無限データソースまたはシンク(Pub/Sub など)をパイプラインで使用する場合は、streaming オプションを true に設定する必要があります。

ストリーミング ジョブは、デフォルトで n1-standard-2 以上の Compute Engine マシンタイプを使用します。n1-standard-2 はストリーミング ジョブを実行するのに最低限必要なマシンタイプであるため、これをオーバーライドしないでください。

無限データソースとシンクをパイプラインで使用する場合、GroupByKey などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。

Java: SDK 1.x

他の Cloud Dataflow パイプライン オプションの設定

クラウドでパイプラインを実行するには、PipelineOptions オブジェクトの次のフィールドをプログラムで設定します。

Java: SDK 2.x

フィールド 説明 デフォルト値
runner Class (NameOfRunner) 使用する PipelineRunner。このフィールドでは、実行時の PipelineRunner を決定できます。 DirectRunner(ローカルモード)
streaming boolean ストリーミング モードが有効か無効か。有効な場合は true です。 無限ソースからパイプラインで読み取る場合、デフォルト値は true です。それ以外の場合は false です。
project String Google Cloud プロジェクトのプロジェクト ID。これは、Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 設定されていない場合、デフォルトは Cloud SDK で現在構成されているプロジェクトになります。
jobName String 実行されている Dataflow ジョブの名前は、Dataflow のジョブリストとジョブの詳細に表示されます。既存のパイプラインの更新時にも使用されます。 Dataflow では自動的に一意の名前が生成されます。
gcpTempLocation String 一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。
stagingLocation String ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合、tempLocation に指定したものがデフォルトになります。
autoscalingAlgorithm String Dataflow ジョブの自動スケーリング モード。使用できる値は、THROUGHPUT_BASED(自動スケーリングを有効にする)または NONE(無効にする)です。Dataflow マネージド サービスでの自動スケーリングの動作については、自動チューニング機能をご覧ください。 すべてのバッチ Dataflow ジョブと Streaming Engine を使用するストリーミング ジョブのデフォルトは THROUGHPUT_BASED です。 Streaming Engine を使用しないストリーミング ジョブのデフォルトは NONE です。
numWorkers int パイプラインの実行時に使用する Google Compute Engine インスタンスの初期の数。このオプションは、ジョブの開始時に Dataflow サービスが起動するワーカー数を決定します。 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。
maxNumWorkers int 実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(numWorkers で指定)よりも大きくできることに注意してください。 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。
numberOfWorkerHarnessThreads int ワーカー ハンドラごとのスレッド数。 指定しないと、Dataflow サービスがワーカーあたりの適切なスレッド数を決定します。
region String Dataflow ジョブをデプロイするリージョン エンドポイントを指定します。 設定しない場合、us-central1 がデフォルトになります。
workerRegion String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される region とは別のロケーションでワーカーを実行するために使用されます。workerRegion のゾーンには自動的に割り当てられます

注: このオプションは workerZonezone と組み合わせることはできません。

設定されていない場合は、デフォルトで region の値になります。
workerZone String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される region とは別のロケーションでワーカーを実行するために使用されます。

注: このオプションは workerRegionzone と組み合わせることはできません。

region または workerRegion を指定した場合、workerZone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
zone String (非推奨)Apache Beam SDK 2.17.0 以前では、ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。 region を指定すると、zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
dataflowKmsKey String 保存データの暗号化に使用する顧客管理の暗号鍵(CMEK)を指定します。暗号鍵は Cloud KMS で制御できます。この機能を使用するには、gcpTempLocation を定義する必要があります。 指定しない場合、CMEK の代わりにデフォルトの Google Cloud 暗号化が使用されます。
flexRSGoal String 自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。numWorkersautoscalingAlgorithmzoneregionworkerMachineType パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。
filesToStage List<String> 各ワーカーで使用できるように、ローカル ファイル、ファイルのディレクトリ、またはアーカイブ(JAR や zip ファイルなど)の空でないリスト。このオプションを設定した場合、指定するファイルのみがアップロードされます(Java クラスパスは無視されます)。すべてのリソースを適切なクラスパス順序で指定する必要があります。リソースはコードに限定されず、構成ファイルやその他のリソースを含めてすべてのワーカーで使用可能にすることができます。コードでは、Java の標準リソース検索メソッドを使用してリストされているリソースにアクセスできます。注意: Dataflow はアップロードの前にファイルを Zip で圧縮し、それによって起動時のコストが高くなるため、ディレクトリ パスの指定は最善ではありません。また、パイプラインで処理することを意図したデータをワーカーに転送するためには、このオプションを使用しないでください。使用すると、適切な Dataflow データソースと組み合わせたネイティブの Cloud Storage/BigQuery API を使用するよりも大幅に低速になります。 filesToStage を省略した場合、Dataflow は Java クラスパスに基づいてステージングするファイルを推定します。左の列に記載されている考慮事項と注意事項(リストするファイルのタイプとコードからそれらにアクセスする方法)はここにも適用されます。
network String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワークネットワークの指定方法をご覧ください。 設定しないと、default というネットワーク名が想定されます。
subnetwork String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワークサブネットワークの指定方法をご覧ください。 Dataflow サービスによりデフォルト値が決定されます。
usePublicIps boolean Dataflow ワーカーでパブリック IP アドレスを使用するかどうかを指定します。この値を false に設定した場合、Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。この場合、subnetwork オプションを指定すると network オプションは無視されます。指定した network または subnetwork限定公開の Google アクセスが有効になっていることを確認してください。 設定しないと、デフォルト値は true になり、Dataflow ワーカーでパブリック IP アドレスが使用されます。
enableStreamingEngine boolean Dataflow Streaming Engine が有効か無効か。有効な場合は true です。Streaming Engine を有効にすると、ストリーミング パイプラインのステップを Dataflow サービス バックエンドで実行し、CPU、メモリ、Persistent Disk ストレージ リソースを節約できます。 デフォルト値は false です。つまり、ストリーミング パイプラインのステップ全体がワーカー VM 上で実行されます。
createFromSnapshot String ストリーミング ジョブの作成時に使用するスナップショット ID を指定します。スナップショットを使用してストリーミング パイプラインの状態を保存し、その状態から新しいバージョンのジョブを開始できます。スナップショットの詳細については、スナップショットの使用をご覧ください。 設定されていない場合、ジョブの作成にスナップショットが使用されません。
hotKeyLoggingEnabled boolean パイプラインでホットキーが検出されると、ユーザーの Cloud Logging プロジェクトにキーが出力されるよう指定します。 設定しない場合、ホットキーの存在のみがログに記録されます。
diskSizeGb int

各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。設定する場合、ワーカーのブートイメージとローカルログを考慮して、少なくとも 30 GB を指定します。

Dataflow Shuffle を使用するバッチジョブの場合、このオプションではワーカー VM のブートディスクのサイズを設定します。Dataflow Shuffle を使用しないバッチジョブの場合、このオプションでは、シャッフルされたデータの保存に使用するディスクのサイズを設定します。ブートディスクのサイズには影響しません。

Streaming Engine を使用するストリーミング ジョブの場合、このオプションでブートディスクのサイズが設定されます。Streaming Engine を使用しないストリーミング ジョブの場合、Dataflow サービスによって作成される追加の永続ディスクのサイズを設定します。ブートディスクには影響しません。ストリーミング ジョブが Streaming Engine を使用しない場合は、ブートディスクのサイズをテストフラグ streaming_boot_disk_size_gb で設定できます。たとえば、80 GB のブートディスクを作成するには --experiments=streaming_boot_disk_size_gb=80 を指定します。

Cloud Platform プロジェクトで定義されたデフォルト サイズを使用するには 0 に設定します。

バッチジョブが Dataflow Shuffle を使用する場合、デフォルトは 25 GB です。使用しない場合のデフォルトは 250 GB です。

ストリーミング ジョブが Streaming Engine を使用する場合、デフォルトは 30 GB です。使用しない場合のデフォルトは 400 GB です。

警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。Dataflow Shuffle や Streaming Engine を使用しないシャッフル バインドされているジョブは、ランタイムとジョブコストが増加する可能性があります。

serviceAccount String my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。
workerDiskType String 使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 Dataflow サービスによりデフォルト値が決定されます。
workerMachineType String

ワーカー VM の起動時に Dataflow によって使用される Compute Engine マシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。

n1 マシンタイプを使用すると最適な結果が得られます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Dataflow のサービスレベル契約ではサポートされません。

ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。

このオプションを設定しない場合は、Dataflow サービスがジョブに基づいてマシンタイプを選択します。

パイプライン設定オプションの全リストについては、PipelineOptions インターフェース(およびそのサブインターフェース)の API for Java リファレンス ドキュメントをご覧ください。

Python

フィールド 説明 デフォルト値
runner str 使用する PipelineRunner。このフィールドは、DirectRunner または DataflowRunner のいずれかです。 DirectRunner(ローカルモード)
streaming bool ストリーミング モードが有効か無効か。有効な場合は true です。 false
project str Google Cloud プロジェクトのプロジェクト ID。これは、Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 設定されていない場合は、エラーがスローされます。
job_name String 実行されている Dataflow ジョブの名前は、Dataflow のジョブリストとジョブの詳細に表示されます。 Dataflow では自動的に一意の名前が生成されます。
temp_location str 一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合は、デフォルトで staging_location の値になります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。
staging_location str ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合、デフォルトは temp_location 内のステージング ディレクトリになります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。
autoscaling_algorithm str Dataflow ジョブの自動スケーリング モード。使用できる値は、THROUGHPUT_BASED(自動スケーリングを有効にする)または NONE(無効にする)です。Dataflow マネージド サービスでの自動スケーリングの動作については、自動チューニング機能をご覧ください。 すべてのバッチ Dataflow ジョブと Streaming Engine を使用するストリーミング ジョブのデフォルトは THROUGHPUT_BASED です。 Streaming Engine を使用しないストリーミング ジョブのデフォルトは NONE です。
num_workers int パイプラインの実行時に使用する Compute Engine インスタンスの数。 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。
max_num_workers int 実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(num_workers で指定)よりも大きくできることに注意してください。 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。
number_of_worker_harness_threads int ワーカー ハンドラごとのスレッド数。 指定しない場合、Dataflow サービスがワーカーあたりの適切なスレッド数を決定します。このパラメータを使用するには、--experiments=use_runner_v2 フラグを使用する必要があります。
region str Dataflow ジョブをデプロイするリージョン エンドポイントを指定します。 設定しない場合、us-central1 がデフォルトになります。
worker_region String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される region とは別のロケーションでワーカーを実行するために使用されます。worker_region のゾーンには自動的に割り当てられます

注: このオプションは worker_zonezone と組み合わせることはできません。

設定されていない場合は、デフォルトで region の値になります。
worker_zone String

ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される region とは別のロケーションでワーカーを実行するために使用されます。

注: このオプションは worker_regionzone と組み合わせることはできません。

region または worker_region を指定した場合、worker_zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
zone str (非推奨)Apache Beam SDK 2.17.0 以前では、ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。 region を指定すると、zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
dataflow_kms_key str 保存データの暗号化に使用する顧客管理の暗号鍵(CMEK)を指定します。暗号鍵は Cloud KMS で制御できます。この機能を使用するには、temp_location を定義する必要があります。 指定しない場合、CMEK の代わりにデフォルトの Google Cloud 暗号化が使用されます。
flexrs_goal str 自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。num_workersautoscaling_algorithmzoneregionmachine_type パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。
network str Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワークネットワークの指定方法をご覧ください。 設定しないと、default というネットワーク名が想定されます。
subnetwork str Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワークサブネットワークの指定方法をご覧ください。 Dataflow サービスによりデフォルト値が決定されます。
use_public_ips bool Dataflow ワーカーでパブリック IP アドレスを使用する必要があることを指定します。この値を false に設定した場合、Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。この場合、subnetwork オプションを指定すると network オプションは無視されます。指定した network または subnetwork限定公開の Google アクセスが有効になっていることを確認してください。このオプションを使用するには、Beam SDK for Python が必要です。非推奨の Dataflow SDK for Python ではサポートされていません。 設定されていない場合、Dataflow ワーカーでパブリック IP アドレスが使用されます。
enable_streaming_engine bool Dataflow Streaming Engine が有効か無効か。有効な場合は true です。Streaming Engine を有効にすると、ストリーミング パイプラインのステップを Dataflow サービス バックエンドで実行し、CPU、メモリ、Persistent Disk ストレージ リソースを節約できます。 デフォルト値は false です。つまり、ストリーミング パイプラインのステップ全体がワーカー VM 上で実行されます。
disk_size_gb int

各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。設定する場合は、ワーカー ブートイメージとローカルログを考慮して、ディスクサイズとして少なくとも 30 GB を指定します。

Dataflow Shuffle を使用するバッチジョブの場合、このオプションではワーカー VM のブートディスクのサイズを設定します。Dataflow Shuffle を使用しないバッチジョブの場合、このオプションでは、シャッフルされたデータの保存に使用するディスクのサイズを設定します。ブートディスクのサイズには影響しません。

Streaming Engine を使用するストリーミング ジョブの場合、このオプションでブートディスクのサイズが設定されます。Streaming Engine を使用しないストリーミング ジョブの場合、Dataflow サービスによって作成される追加の永続ディスクのサイズを設定します。ブートディスクには影響しません。ストリーミング ジョブが Streaming Engine を使用しない場合は、ブートディスクのサイズをテストフラグ streaming_boot_disk_size_gb で設定できます。たとえば、80 GB のブートディスクを作成するには --experiments=streaming_boot_disk_size_gb=80 を指定します。

Cloud Platform プロジェクトで定義されたデフォルト サイズを使用するには 0 に設定します。

バッチジョブが Dataflow Shuffle を使用する場合、デフォルトは 25 GB です。使用しない場合のデフォルトは 250 GB です。

ストリーミング ジョブが Streaming Engine を使用する場合、デフォルトは 30 GB です。使用しない場合のデフォルトは 400 GB です。

警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。Dataflow Shuffle や Streaming Engine を使用しないシャッフル バインドされているジョブは、ランタイムとジョブコストが増加する可能性があります。

service_account_email str my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。
worker_disk_type str 使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 Dataflow サービスによりデフォルト値が決定されます。
machine_type str

ワーカー VM の起動時に Dataflow によって使用される Compute Engine マシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。

n1 マシンタイプを使用すると最適な結果が得られます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Dataflow のサービスレベル契約ではサポートされません。

ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。

このオプションを設定しない場合は、Dataflow サービスがジョブに基づいてマシンタイプを選択します。

Java: SDK 1.x

ローカル実行用に PipelineOption を構成する

パイプラインをマネージド クラウド リソースで実行する代わりに、パイプラインをローカルで実行することを選択できます。ローカル実行には、テスト、デバッグ、または小さなデータセットに対するパイプラインの実行に一定のメリットがあります。たとえば、ローカル実行の場合、リモート Dataflow サービスと関連の Google Cloud プロジェクトとの依存関係が削除されます。

ローカル実行を使用する場合、ローカルメモリに十分に収まる小さなデータセットでパイプラインを実行することを強くおすすめします。Create 変換を使用して小さなメモリ内データセットを作成するか、Read 変換を使用して小さなローカルまたはリモート ファイルを操作できます。ローカル実行では、少数の外部依存関係でテストとデバッグをすばやく簡単に実行する方法が提供されますが、ローカル環境で使用可能なメモリにより制限されます。

次のコード例は、ローカル環境で実行するパイプラインの作成方法を示しています。

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

注: ローカルモードでは、DirectRunner がすでにデフォルトになっているため、ランナーを設定する必要はありません。ただし、DirectRunner を明示的に依存関係として含めるか、クラスパスに追加する必要があります。

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

注: ローカルモードでは、DirectRunner がすでにデフォルトになっているため、ランナーを設定する必要はありません。

Java: SDK 1.x

パイプラインを作成したら、それを実行します。

他のローカル パイプライン オプションを設定する

パイプラインをローカルで実行する場合、PipelineOptions のプロパティのデフォルト値は一般に十分です。

Java: SDK 2.x

Java PipelineOptions のデフォルト値は Java API リファレンスに記載されています。詳しくは、PipelineOptions クラスリストをご覧ください。

パイプラインで BigQuery や Cloud Storage などの Google Cloud プロダクトを IO に使用している場合は、特定の Google Cloud プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、GcpOptions.setProject を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GcpOptions クラスをご覧ください。

Python

Python PipelineOptions のデフォルト値は Python API リファレンスに記載されています。詳しくは、PipelineOptions モジュール リストをご覧ください。

パイプラインで BigQuery や Cloud Storage などの Google Cloud プロダクトを IO に使用している場合は、特定の Google Cloud プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、options.view_as(GoogleCloudOptions).project を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GoogleCloudOptions クラスをご覧ください。

Java: SDK 1.x