パイプラインの実行パラメータを指定する

Apache Beam プログラムがパイプラインを作成したら、パイプラインを実行する必要があります。パイプラインの実行は、Apache Beam プログラムの実行とは別のものです。Apache Beam プログラムでパイプラインを作成し、この作成したコードによって、パイプライン ランナーが実行する一連のステップが生成されます。パイプライン ランナーとして使用できるのは、Google Cloud Platform(GCP)上の Cloud Dataflow マネージド サービス、サードパーティ ランナー サービス、ローカル環境でステップを直接実行するローカル パイプライン ランナーです。

パイプライン ランナーとその他の実行オプションは、Apache Beam SDK クラス PipelineOptions を使用して指定できます。PipelineOptions を使用して、パイプラインの実行方法と実行場所、使用されるリソースを構成します。

ほとんどの場合、パイプラインは Cloud Dataflow ランナー サービスを使用してマネージド GCP リソース上で実行します。Cloud Dataflow サービスでパイプラインを実行すると、GCP プロジェクトで Compute Engine リソースと Cloud Storage リソースを使用する Cloud Dataflow ジョブが作成されます。

パイプラインをローカルで実行することもできます。パイプラインをローカルで実行すると、パイプライン変換は、Cloud Dataflow プログラムが実行されるのと同じマシン上で実行されます。ローカルの実行は、テストおよびデバッグの目的で、パイプラインが小さなメモリ内データセットを使用できる場合に特に有用です。

PipelineOptions を設定する

Cloud Dataflow プログラム内で Pipeline オブジェクトを作成する際に、PipelineOptions を渡します。Cloud Dataflow サービスは、パイプラインを実行するときに、PipelineOptions のコピーを各ワーカー インスタンスに送信します。

Java: SDK 2.x

注: メソッド ProcessContext.getPipelineOptions を使用して ParDoDoFn インスタンス内部の PipelineOptions にアクセスできます。

Python

この機能は Apache Beam SDK for Python ではまだサポートされていません。

Java: SDK 1.x

注: メソッド ProcessContext.getPipelineOptions を使用して ParDoDoFn インスタンス内部の PipelineOptions にアクセスできます。

コマンドライン引数から PipelineOptions を設定する

パイプラインは、PipelineOptions オブジェクトを作成してフィールドを直接設定することで構成できますが、Apache Beam SDK には、コマンドライン引数を使用して PipelineOptions のフィールドを設定するために使用できるコマンドライン パーサーが含まれています。

Java: SDK 2.x

コマンドラインからオプションを読み取るには、次のコード例のように、メソッド PipelineOptionsFactory.fromArgs を使用して PipelineOptions オブジェクトを作成します。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

注: メソッド .withValidation を追加すると、Cloud Dataflow は必要なコマンドライン引数があるかチェックし、引数値を検証します。

PipelineOptionsFactory.fromArgs を使用すると、次の形式に従うコマンドライン引数が処理されます。

--<option>=<value>

この方法で PipelineOptions を構築して、org.apache.beam.sdk.options.PipelineOptions のいずれかのサブインターフェースで任意のオプションをコマンドライン引数として指定できます。

Python

コマンドラインからオプションを読み取るには、次のコード例のように PipelineOptions オブジェクトを作成します。

options = PipelineOptions(flags=argv)

PipelineOptions への引数 (flags=argv) は、次の形式に従うコマンドライン引数を処理します。

--<option>=<value>

この方法で PipelineOptions を構築して、PipelineOptions からサブクラス化することで任意のオプションを指定できます。

Java: SDK 1.x

コマンドラインからオプションを読み取るには、次のコード例のように、メソッド PipelineOptionsFactory.fromArgs を使用して PipelineOptions オブジェクトを作成します。

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

注: メソッド .withValidation を追加すると、Cloud Dataflow は必要なコマンドライン引数があるかチェックし、引数値を検証します。

PipelineOptionsFactory.fromArgs を使用すると、次の形式に従うコマンドライン引数が処理されます。

--<option>=<value>

この方法で PipelineOptions を構築して、com.google.cloud.dataflow.sdk.options.PipelineOptions のいずれかのサブインターフェースで任意のオプションをコマンドライン引数として指定できます。

カスタム オプションを作成する

標準の PipelineOptions に加えて、独自のカスタム オプションを追加できます。Cloud Dataflow のコマンドライン パーサーは、同じ形式で指定されたコマンドライン引数を使用して、カスタム オプションを設定することもできます。

Java: SDK 2.x

独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

独自のオプションを追加するには、次の例のように add_argument() メソッドを使用します(これは Python の標準 argparse モジュールとまったく同じ動作をします)。

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Java: SDK 1.x

独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

ユーザーが --help をコマンドライン引数として渡したときに表示される説明と、デフォルト値を指定することもできます。

Java: SDK 2.x

次のように、アノテーションを使用して説明とデフォルト値を設定します。

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory でインターフェースを登録してから、PipelineOptions オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory でインターフェースを登録する場合、--help でカスタム オプション インターフェースを検索し、--help コマンドの出力に追加できます。 PipelineOptionsFactory は、カスタム オプションが他のすべての登録済みオプションと互換であることも検証します。

次のコード例は、PipelineOptionsFactory でカスタム オプション インターフェースを登録する方法を示しています。

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

これで、パイプラインは --myCustomOption=value をコマンドライン引数として受け入れることができます。

Python

説明とデフォルト値は次のように設定します。

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

Java: SDK 1.x

次のように、アノテーションを使用して説明とデフォルト値を設定します。

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory でインターフェースを登録してから、PipelineOptions オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory でインターフェースを登録する場合、--help でカスタム オプション インターフェースを検索し、--help コマンドの出力に追加できます。 PipelineOptionsFactory は、カスタム オプションが他のすべての登録済みオプションと互換であることも検証します。

次のコード例は、PipelineOptionsFactory でカスタム オプション インターフェースを登録する方法を示しています。

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

これで、パイプラインは --myCustomOption=value をコマンドライン引数として受け入れることができます。

Cloud Dataflow サービスで実行するように PipelineOptions を構成する

Cloud Dataflow マネージド サービスを使用してパイプラインを実行するには、PipelineOptions で次のフィールドを設定する必要があります。

Java: SDK 2.x

  • project - GCP プロジェクトの ID。
  • runner - プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これは DataflowRunner である必要があります。
  • gcpTempLocation - 一時ファイルをステージングするための Cloud Dataflow 用の Cloud Storage パス。パイプラインを実行する前に、まずこのバケットを作成する必要があります。gcpTempLocation を指定しない場合は、パイプライン オプション tempLocation を指定することができ、gcpTempLocationtempLocation の値に設定されます。どちらも指定されていない場合は、デフォルトの gcpTempLocation が作成されます。
  • stagingLocation - バイナリ ファイルをステージングするための Cloud Dataflow 用の Cloud Storage バケット。このオプションを設定しないと、tempLocation に指定したものがステージング場所にも使用されます。
  • これも tempLocation も指定されていない場合は、デフォルトの gcpTempLocation が作成されます。tempLocation が指定され、gcpTempLocation が指定されていない場合、tempLocation は Cloud Storage パスである必要があり、gcpTempLocation には同じ値が使用されます。tempLocation が指定されておらず、gcpTempLocation が指定されている場合、tempLocation は設定されません。

Python

  • job_name - 実行される Cloud Dataflow ジョブの名前。
  • project - GCP プロジェクトの ID。
  • runner - プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これは DataflowRunner である必要があります。
  • staging_location - ジョブを実行するワーカーが必要とするコード パッケージをステージングするための Cloud Dataflow 用の Cloud Storage パス。
  • temp_location - パイプラインの実行中に作成される一時ジョブファイルをステージングするための Cloud Dataflow 用の Cloud Storage パス。

Java: SDK 1.x

  • project - GCP プロジェクトの ID。
  • runner - プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これは DataflowPipelineRunner または BlockingDataflowPipelineRunner である必要があります。
  • stagingLocation - バイナリ ファイルと一時ファイルをステージングするための Cloud Dataflow 用の Cloud Storage バケット。パイプラインを実行する前に、まずこのバケットを作成する必要があります。

これらのオプションは、プログラムで設定するか、コマンドラインを使用して指定できます。次のコード例は、Cloud Dataflow マネージド サービスを使用してパイプラインを実行するために、ランナーおよび必要なその他のオプションをプログラムで設定することでパイプラインを作成する方法を示しています。

Java: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

次のコード例は、コマンドラインを使用して、Cloud Dataflow サービスの実行に必要なオプションを設定する方法を示しています。

Java: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

Java: SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

Java: SDK 2.x

必要なオプションをコマンドラインで渡す場合は、--project--runner--tempLocation のオプションを使用し、必要に応じて --stagingLocation オプションを使用します。

Python

必要なオプションをコマンドラインで渡す場合は、--project--runner--stagingLocation の各オプションを使用します。

Java: SDK 1.x

必要なオプションをコマンドラインで渡す場合は、--project--runner--stagingLocation のオプションを使用します。

非同期実行

Java: SDK 2.x

DataflowRunner を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Cloud Dataflow モニタリング インターフェースまたは Cloud Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。

Python

DataflowRunner を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Cloud Dataflow モニタリング インターフェースまたは Cloud Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。

Java: SDK 1.x

DataflowPipelineRunner を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Cloud Dataflow モニタリング インターフェースまたは Cloud Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。

ブロッキング実行

Java: SDK 2.x

パイプライン ランナーとして DataflowRunner を指定し、pipeline.run().waitUntilFinish() を明示的に呼び出します。

DataflowRunner を使用して、pipeline.run() から返された PipelineResult オブジェクトで waitUntilFinish() を呼び出すと、パイプラインは DataflowRunner と同じようにクラウド上で実行されますが、起動されたジョブが終了するまで待機して最終的な DataflowPipelineJob オブジェクトを返します。ジョブの実行中、Cloud Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。

Java SDK 1.x を過去に使用していて、コマンドラインで --runner BlockingDataflowPipelineRunner を使用して、パイプラインが終了するまでブロックするようメイン プログラムにインタラクティブに指示していた場合、Java 2.x ではメイン プログラムで waitUntilFinish() を明示的に呼び出す必要があります。

Python

パイプラインの完了までブロックするには、ランナーの run() メソッドから返される PipelineResult オブジェクトの wait_until_finish() メソッドを使用します。

Java: SDK 1.x

パイプライン ランナーとして BlockingDataflowPipelineRunner を指定します。

BlockingDataflowPipelineRunner を使用する場合、パイプラインは DataflowPipelineRunner と同じようにクラウド上で実行されますが、起動されたジョブが終了するまで待機します。ジョブの実行中、Cloud Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。BlockingDataflowPipelineRunner を使用すると最終的な DataflowPipelineJob オブジェクトが返されます。

注: コマンドラインで Ctrl+C を入力しても、ジョブはキャンセルされません。Cloud Dataflow サービスは GCP でジョブを引き続き実行します。ジョブをキャンセルするには、Cloud Dataflow モニタリング インターフェースまたは Cloud Dataflow コマンドライン インターフェースを使用します。

ストリーミング実行

Java: SDK 2.x

無限データソース(Cloud Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。

無限データソースとシンクをパイプラインで使用する場合、GroupByKey などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。

Python

は、

無限データソースまたはシンク(Cloud Pub/Sub など)をパイプラインで使用する場合は、streaming オプションを true に設定する必要があります。

ストリーミング ジョブは、デフォルトで n1-standard-2 以上の Compute Engine マシンタイプを使用します。n1-standard-2 はストリーミング ジョブを実行するのに最低限必要なマシンタイプであるため、これをオーバーライドしないでください。

無限データソースとシンクをパイプラインで使用する場合、GroupByKey などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。

Java: SDK 1.x

無限データソースまたはシンク(Cloud Pub/Sub など)をパイプラインで使用する場合は、PipelineOptionsstreaming オプションを true に設定する必要があります。次の例に示すように、streaming オプションをプログラムで設定できます。

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

無限データソースとシンクをパイプラインで使用する場合、GroupByKey などの集約を使用するには、事前に制限なし PCollectionウィンドウ処理方式を選択しておく必要があります。

その他のクラウド パイプライン オプションを設定する

クラウドでパイプラインを実行するには、PipelineOptions オブジェクトの次のフィールドをプログラムで設定します。

Java: SDK 2.x

フィールド 説明 デフォルト値
runner Class (NameOfRunner) 使用する PipelineRunner。このフィールドでは、実行時の PipelineRunner を決定できます。 DirectRunner(ローカルモード)
streaming boolean ストリーミング モードが有効か無効か。有効な場合は true です。 無限ソースからパイプラインで読み取る場合、デフォルト値は true です。それ以外の場合は、false です。
project String GCP プロジェクトのプロジェクト ID。これは、Cloud Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 設定されていない場合、デフォルトは Cloud SDK で現在構成されているプロジェクトになります。
gcpTempLocation String 一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。
stagingLocation String ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合、tempLocation に指定したものがデフォルトになります。
autoscalingAlgorithm String Cloud Dataflow ジョブに使用する自動スケーリング モード。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。Dataflow マネージド サービスでの自動スケーリングの動作について詳しくは、自動チューニング機能をご覧ください。 Cloud Dataflow SDK for Java バージョン 1.6.0 以降を使用するすべてのバッチ Cloud Dataflow ジョブのデフォルトは THROUGHPUT_BASED、Cloud Dataflow SDK for Java の以前のバージョンを使用するストリーミング ジョブまたはバッチジョブのデフォルトは NONE になります。
numWorkers int パイプラインの実行時に使用する Google Compute Engine インスタンスの初期の数。このオプションは、ジョブの開始時に Dataflow サービスが起動するワーカー数を決定します。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
maxNumWorkers int 実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(numWorkers で指定)よりも大きくできることに注意してください。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
region String リージョン エンドポイントを指定すると、Cloud Dataflow ジョブをデプロイするリージョンを定義できます。 設定されていない場合、デフォルトは us-central1 になります。
zone String ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーン。 region パラメータを指定した場合、zone パラメータはデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
flexRSGoal String 自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。numWorkersautoscalingAlgorithmzoneregionworkerMachineType パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Cloud Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。
filesToStage List<String> 各ワーカーに対して使用可能にするローカル ファイル、ファイルのディレクトリ、またはアーカイブ(.jar、.zip)のリスト。このオプションを設定した場合、指定するファイルのみがアップロードされます(Java クラスパスは無視されます)。すべてのリソースを適切なクラスパス順序で指定する必要があります。リソースはコードに限定されず、構成ファイルやその他のリソースを含めてすべてのワーカーで使用可能にすることができます。コードでは、Java の標準リソース検索メソッドを使用してリストされているリソースにアクセスできます。注意: Cloud Dataflow はアップロードの前にファイルを Zip で圧縮し、それによって起動時のコストが高くなるため、ディレクトリ パスの指定は最善ではありません。また、パイプラインで処理することを意図したデータをワーカーに転送するためには、このオプションを使用しないでください。使用すると、適切な Cloud Dataflow データソースと組み合わせたネイティブの Cloud Storage/BigQuery API を使用するよりも大幅に低速になります。 filesToStage が空白の場合、Cloud Dataflow は Java クラスパスに基づいてステージングするファイルを推定します。左の列に記載されている考慮事項と注意事項(リストするファイルのタイプとコードからそれらにアクセスする方法)はここにも適用されます。
network String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワークネットワークの指定方法をご覧ください。 設定されていない場合、GCP ではネットワーク名 default の使用が想定されます。
subnetwork String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワークサブネットワークの指定方法をご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
usePublicIps boolean Cloud Dataflow ワーカーでパブリック IP アドレスを使用するかどうかを指定します。この値を false に設定した場合、Cloud Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。この場合、subnetwork オプションを指定すると network オプションは無視されます。指定した network または subnetwork限定公開の Google アクセスが有効になっていることを確認してください。 設定されていない場合、デフォルト値は true になり、Cloud Dataflow ワーカーでパブリック IP アドレスが使用されます。
diskSizeGb int 各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。ワーカー ブートイメージとローカルログを考慮して、最小ディスクサイズは 30 GB です。パイプラインでデータをシャッフルする場合は、最小値より多く割り当てる必要があります。

Cloud Platform プロジェクトで定義されたデフォルト サイズを使用するには 0 に設定します。

警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。シャッフルバインドされているジョブは、ランタイムとジョブコストが増加する可能性があります。

serviceAccount String my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。
workerDiskType String 使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
workerMachineType String ワーカー VM の起動時に Cloud Dataflow によって使用される Compute Engine マシンタイプ。Cloud Dataflow では n1 シリーズとカスタム マシンタイプがサポートされます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Cloud Dataflow のサービスレベル契約ではサポートされません。 このオプションを設定しないと、Cloud Dataflow サービスはジョブに基づいてマシンタイプを選択します。
enableStreamingEngine boolean Cloud Dataflow Streaming Engine が有効か無効か。有効な場合は true です。Streaming Engine を有効にすると、ストリーミング パイプラインのステップを Cloud Dataflow サービス バックエンドで実行し、CPU、メモリ、Persistent Disk ストレージ リソースを節約できます。 デフォルト値は false です。つまり、ストリーミング パイプラインのステップ全体がワーカー VM 上で実行されます。

パイプライン構成オプションの全リストについては、PipelineOptions インターフェース(およびそのサブインターフェース)の API for Java リファレンス ドキュメントをご覧ください。

Python

フィールド 説明 デフォルト値
runner str 使用する PipelineRunner。このフィールドは、DirectRunner または DataflowRunner のいずれかになります。 DirectRunner(ローカルモード)
streaming bool ストリーミング モードが有効か無効か。有効な場合は true です。 false
project str GCP プロジェクトのプロジェクト ID。これは、Cloud Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 設定されていない場合は、エラーがスローされます。
temp_location str 一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合は、デフォルトで staging_location の値になります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。
staging_location str ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合、デフォルトは temp_location 内のステージング ディレクトリになります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。
autoscaling_algorithm str Cloud Dataflow ジョブに使用する自動スケーリング モード。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。Cloud Dataflow マネージド サービスでの自動スケーリングの動作について詳しくは、自動チューニング機能をご覧ください。 すべてのバッチ Cloud Dataflow ジョブのデフォルトは THROUGHPUT_BASED です。
num_workers int パイプラインの実行時に使用する Compute Engine インスタンスの数。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
max_num_workers int 実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(num_workers で指定)よりも大きくできることに注意してください。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
region str リージョン エンドポイントを指定すると、Cloud Dataflow ジョブをデプロイするリージョンを定義できます。 設定されていない場合、デフォルトは us-central1 になります。
zone str ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーン。 region パラメータを指定した場合、zone パラメータはデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。
flexrs_goal str 自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。num_workersautoscaling_algorithmzoneregionmachine_type パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Cloud Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。
network str Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワークネットワークの指定方法をご覧ください。 設定されていない場合、GCP ではネットワーク名 default の使用が想定されます。
subnetwork str Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワークサブネットワークの指定方法をご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
no_use_public_ips bool Cloud Dataflow ワーカーがパブリック IP アドレスを使用してはいけないことを指定します。そのため、Cloud Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。subnetwork オプションを指定すると、network オプションは無視されます。指定した network または subnetwork限定公開の Google アクセスが有効になっていることを確認してください。パブリック IP パラメータを使用するには Beam SDK for Python が必要です。Cloud Dataflow SDK for Python では、このパラメータはサポートされていません。 設定されていない場合、Cloud Dataflow ワーカーでパブリック IP アドレスが使用されます。
disk_size_gb int 各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。ワーカー ブートイメージとローカルログを考慮して、最小ディスクサイズは 30 GB です。パイプラインでデータをシャッフルする場合は、最小値より多く割り当てる必要があります。 GCP プロジェクトで定義されたデフォルト サイズを使用するには 0 に設定します。
service_account_email str my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。
worker_disk_type str 使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
machine_type str ワーカー VM の起動時に Cloud Dataflow によって使用される Compute Engine マシンタイプ。なお、worker_machine_type はエイリアス フラグです。 このオプションを設定しないと、Cloud Dataflow サービスはジョブに基づいてマシンタイプを選択します。

Java: SDK 1.x

フィールド 説明 デフォルト値
runner Class (NameOfRunner) 使用する PipelineRunner。このフィールドでは、実行時の PipelineRunner を決定できます。 DirectPipelineRunner(ローカルモード)
streaming boolean ストリーミング モード(現在はベータ版)が有効か無効か。有効な場合は true です。 false
project String GCP プロジェクトのプロジェクト ID。これは、Cloud Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 設定されていない場合、デフォルトは Cloud SDK で現在構成されているプロジェクトになります。
tempLocation String 一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合は、デフォルトで stagingLocation の値になります。Google クラウドでパイプラインを実行するには、少なくとも tempLocation または stagingLocation のいずれかを指定する必要があります。
stagingLocation String ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 設定されていない場合、デフォルトは tempLocation 内のステージング ディレクトリになります。Google クラウドでパイプラインを実行するには、少なくとも tempLocation または stagingLocation のいずれかを指定する必要があります。
autoscalingAlgorithm String Cloud Dataflow ジョブに使用する自動スケーリング モード。使用できる値は、自動スケーリングを有効にする THROUGHPUT_BASED または無効にする NONE です。Cloud Dataflow マネージド サービスでの自動スケーリングの動作について詳しくは、自動チューニング機能をご覧ください。 Cloud Dataflow SDK for Java バージョン 1.6.0 以降を使用するすべてのバッチ Cloud Dataflow ジョブのデフォルトは THROUGHPUT_BASED、Cloud Dataflow SDK for Java の以前のバージョンを使用するストリーミング ジョブまたはバッチジョブのデフォルトは NONE になります。
numWorkers int パイプラインの実行時に使用する Compute Engine インスタンスの初期の数。このオプションは、ジョブの開始時に Cloud Dataflow サービスが起動するワーカー数を決定します。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
maxNumWorkers int 実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(numWorkers で指定)よりも大きくできることに注意してください。 指定しない場合は、Cloud Dataflow サービスが適切なワーカー数を決定します。
zone String ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーン 設定されていない場合、デフォルトは us-central1-f になります。たとえばワーカーが EU で起動されるように別のゾーンを指定できます。
filesToStage List<String> 各ワーカーに対して使用可能にするローカル ファイル、ファイルのディレクトリ、またはアーカイブ(.jar、.zip)のリスト。このオプションを設定した場合、指定するファイルのみがアップロードされます(Java クラスパスは無視されます)。すべてのリソースを適切なクラスパス順序で指定する必要があります。リソースはコードに限定されず、構成ファイルやその他のリソースを含めてすべてのワーカーで使用可能にすることができます。コードでは、Java の標準リソース検索メソッドを使用してリストされているリソースにアクセスできます。注意: Cloud Dataflow はアップロードの前にファイルを Zip で圧縮し、それによって起動時のコストが高くなるため、ディレクトリ パスの指定は最善ではありません。また、パイプラインで処理することを意図したデータをワーカーに転送するためには、このオプションを使用しないでください。使用すると、適切な Cloud Dataflow データソースと組み合わせたネイティブの Cloud Storage/BigQuery API を使用するよりも大幅に低速になります。 filesToStage が空白の場合、Cloud Dataflow は Java クラスパスに基づいてステージングするファイルを推定します。左の列に記載されている考慮事項と注意事項(リストするファイルのタイプとコードからそれらにアクセスする方法)はここにも適用されます。
network String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワークネットワークの指定方法をご覧ください。 設定されていない場合、GCP ではネットワーク名 default の使用が想定されます。
subnetwork String Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワークサブネットワークの指定方法をご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
diskSizeGb int 各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。ワーカー ブートイメージとローカルログを考慮して、最小ディスクサイズは 30 GB です。パイプラインでデータをシャッフルする場合は、最小値より多く割り当てる必要があります。

GCP プロジェクトで定義されたデフォルト サイズを使用するには 0 に設定します。

警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。シャッフル バインドされているジョブによって、ランタイムとジョブコストが増加する可能性があります。

workerDiskType String 使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 Cloud Dataflow サービスによりデフォルト値が決定されます。
workerMachineType String ワーカー VM の起動時に Cloud Dataflow によって使用される Compute Engine マシンタイプ。Cloud Dataflow では n1 シリーズとカスタム マシンタイプがサポートされます。f1g1 シリーズのワーカーなどの共有コア マシンタイプは、Cloud Dataflow のサービスレベル契約ではサポートされません。 このオプションを設定しないと、Cloud Dataflow サービスはジョブに基づいてマシンタイプを選択します。

パイプライン構成オプションの全リストについては、PipelineOptions インターフェース(およびそのサブインターフェース)の API for Java リファレンス ドキュメントをご覧ください。

ローカル実行用に PipelineOption を構成する

パイプラインをマネージド クラウド リソースで実行する代わりに、パイプラインをローカルで実行することを選択できます。ローカル実行には、テスト、デバッグ、または小さなデータセットに対するパイプラインの実行に一定のメリットがあります。たとえば、ローカル実行ではリモート Cloud Dataflow サービスと関連 GCP プロジェクトへの依存関係が除去されます。

ローカル実行を使用する場合、ローカルメモリに十分に収まる小さなデータセットでパイプラインを実行することを強くおすすめします。Create 変換を使用して小さなメモリ内データセットを作成するか、Read 変換を使用して小さなローカルまたはリモート ファイルを操作できます。ローカル実行では、少数の外部依存関係でテストとデバッグをすばやく簡単に実行する方法が提供されますが、ローカル環境で使用可能なメモリにより制限されます。

次のコード例は、ローカル環境で実行するパイプラインの作成方法を示しています。

Java: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

注: ローカルモードでは、DirectRunner がすでにデフォルトになっているため、ランナーを設定する必要はありません。ただし、DirectRunner を明示的に依存関係として含めるか、クラスパスに追加する必要があります。

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

注: ローカルモードでは、DirectRunner がすでにデフォルトになっているため、ランナーを設定する必要はありません。

Java: SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

注: ローカルモードでは、DirectPipelineRunner がすでにデフォルトになっているため、ランナーを設定する必要はありません。

パイプラインを作成したら、それを実行します。

他のローカル パイプライン オプションを設定する

パイプラインをローカルで実行する場合、PipelineOptions のプロパティのデフォルト値は一般に十分です。

Java: SDK 2.x

Java PipelineOptions のデフォルト値は Java API リファレンスに記載されています。詳しくは、PipelineOptions クラスリストをご覧ください。

パイプラインで BigQuery や Cloud Storage などの GCP プロダクトを IO に使用している場合は、特定の GCP プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、GcpOptions.setProject を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GcpOptions クラスをご覧ください。

Python

Python PipelineOptions のデフォルト値は Python API リファレンスに記載されています。詳しくは、PipelineOptions モジュール リストをご覧ください。

パイプラインで BigQuery や Cloud Storage などの GCP プロダクトを IO に使用している場合は、特定の GCP プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、options.view_as(GoogleCloudOptions).project を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GoogleCloudOptions クラスをご覧ください。

Java: SDK 1.x

Java PipelineOptions のデフォルト値は Java API リファレンスに記載されています。詳しくは、PipelineOptions クラスリストをご覧ください。

パイプラインで BigQuery や Cloud Storage などの GCP プロダクトを IO に使用している場合は、特定の GCP プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、GcpOptions.setProject を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GcpOptions クラスをご覧ください。

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。