Apache Beam プログラムがパイプラインを作成したら、パイプラインを実行する必要があります。パイプラインの実行は、Apache Beam プログラムの実行とは別のものです。Apache Beam プログラムでパイプラインを作成し、この作成したコードによって、パイプライン ランナーが実行する一連のステップが生成されます。パイプライン ランナーとして使用できるのは、Google Cloud 上の Dataflow マネージド サービス、サードパーティ ランナー サービス、ローカル環境でステップを直接実行するローカル パイプライン ランナーです。
パイプライン ランナーとその他の実行オプションは、Apache Beam SDK クラス PipelineOptions
を使用して指定できます。PipelineOptions
を使用して、パイプラインの実行方法と実行場所、使用されるリソースを構成します。
ほとんどの場合、パイプラインは Dataflow ランナー サービスを使用してマネージド Google Cloud リソース上で実行します。Dataflow サービスでパイプラインを実行すると、Google Cloud プロジェクトで Compute Engine リソースと Cloud Storage リソースを使用する Dataflow ジョブが作成されます。
パイプラインをローカルで実行することもできます。パイプラインをローカルで実行すると、パイプライン変換は、Dataflow プログラムが実行されるのと同じマシン上で実行されます。ローカルの実行は、テストおよびデバッグの目的で、パイプラインが小さなメモリ内データセットを使用できる場合に特に有用です。
PipelineOptions を設定する
Dataflow プログラムで Pipeline
オブジェクトを作成するときに、PipelineOptions
を渡します。Dataflow サービスは、パイプラインを実行するときに、PipelineOptions
のコピーを各ワーカー インスタンスに送信します。
Java: SDK 2.x
注: ProcessContext.getPipelineOptions
メソッドを使用して ParDo
の DoFn
インスタンス内部の PipelineOptions
にアクセスできます。
Python
この機能は Apache Beam SDK for Python ではまだサポートされていません。
Java: SDK 1.x
コマンドライン引数から PipelineOptions を設定する
パイプラインは、PipelineOptions
オブジェクトを作成してフィールドを直接設定することで構成できますが、Apache Beam SDK には、コマンドライン引数を使用して PipelineOptions
のフィールドを設定するために使用できるコマンドライン パーサーが含まれています。
Java: SDK 2.x
コマンドラインからオプションを読み取るには、次のコード例のように、PipelineOptionsFactory.fromArgs
メソッドを使用して PipelineOptions
オブジェクトを作成します。
PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
注: .withValidation
メソッドを追加すると、Dataflow は必要なコマンドライン引数があるかチェックし、引数値を検証します。
PipelineOptionsFactory.fromArgs
を使用すると、次の形式に従うコマンドライン引数が処理されます。
--<option>=<value>
この方法で PipelineOptions
を構築して、org.apache.beam.sdk.options.PipelineOptions のいずれかのサブインターフェースで任意のオプションをコマンドライン引数として指定できます。
Python
コマンドラインからオプションを読み取るには、次のコード例のように PipelineOptions
オブジェクトを作成します。
from apache_beam.options.pipeline_options import PipelineOptions options = PipelineOptions(flags=argv)
PipelineOptions
への引数 (flags=argv)
は、次の形式に従うコマンドライン引数を処理します。
--<option>=<value>
この方法で PipelineOptions
を構築して、PipelineOptions
からサブクラス化することで任意のオプションを指定できます。
Java: SDK 1.x
カスタム オプションを作成する
標準の PipelineOptions
に加えて、独自のカスタム オプションを追加できます。Dataflow のコマンドライン パーサーは、同じ形式で指定されたコマンドライン引数を使用して、カスタム オプションを設定することもできます。
Java: SDK 2.x
独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。
public interface MyOptions extends PipelineOptions { String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
Python
独自のオプションを追加するには、次の例のように add_argument()
メソッドを使用します(これは Python の標準 argparse モジュールとまったく同じ動作をします)。
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument('--input') parser.add_argument('--output')
Java: SDK 1.x
ユーザーが --help
をコマンドライン引数として渡したときに表示される説明と、デフォルト値を指定することもできます。
Java: SDK 2.x
次のように、アノテーションを使用して説明とデフォルト値を設定します。
public interface MyOptions extends PipelineOptions { @Description("My custom command line argument.") @Default.String("DEFAULT") String getMyCustomOption(); void setMyCustomOption(String myCustomOption); }
PipelineOptionsFactory
でインターフェースを登録してから、PipelineOptions
オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory
でインターフェースを登録する場合、--help
でカスタム オプション インターフェースを検索し、--help
コマンドの出力に追加できます。PipelineOptionsFactory
は、カスタム オプションが他のすべての登録済みオプションと互換であることも検証します。
次のコード例は、PipelineOptionsFactory
でカスタム オプション インターフェースを登録する方法を示しています。
PipelineOptionsFactory.register(MyOptions.class); MyOptions options = PipelineOptionsFactory.fromArgs(args) .withValidation() .as(MyOptions.class);
これで、パイプラインは --myCustomOption=value
をコマンドライン引数として受け入れることができます。
Python
説明とデフォルト値は次のように設定します。
from apache_beam.options.pipeline_options import PipelineOptions class MyOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', help='Input for the pipeline', default='gs://my-bucket/input') parser.add_argument( '--output', help='Output for the pipeline', default='gs://my-bucket/output')
Java: SDK 1.x
Cloud Dataflow サービスで実行するように PipelineOptions を構成する
Dataflow マネージド サービスを使用してパイプラインを実行するには、PipelineOptions
で次のフィールドを設定する必要があります。
Java: SDK 2.x
project
- Google Cloud プロジェクトの ID。runner
- プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これはDataflowRunner
である必要があります。gcpTempLocation
- 一時ファイルをステージングするための Dataflow 用の Cloud Storage パス。パイプラインを実行する前に、まずこのバケットを作成する必要があります。gcpTempLocation
を指定しない場合は、パイプライン オプションtempLocation
を指定することができ、gcpTempLocation
はtempLocation
の値に設定されます。どちらも指定されていない場合は、デフォルトのgcpTempLocation
が作成されます。stagingLocation
- バイナリ ファイルをステージングするための Dataflow 用の Cloud Storage バケット。このオプションを設定しないと、tempLocation
に指定したものがステージング場所にも使用されます。
これも
tempLocation
も指定されていない場合は、デフォルトの gcpTempLocation
が作成されます。tempLocation
が指定され、gcpTempLocation
が指定されていない場合、tempLocation
は Cloud Storage パスである必要があり、gcpTempLocation
には同じ値が使用されます。tempLocation
が指定されず、gcpTempLocation
が指定されている場合、tempLocation
に値は入力されません。注: Apache Beam SDK for Java 2.15.0 以降を使用している場合は、region
も指定する必要があります。
Python
project
- Google Cloud プロジェクトの ID。runner
- プログラムを解析し、パイプラインを作成するパイプライン ランナー。クラウド実行では、これはDataflowRunner
である必要があります。staging_location
- ジョブを実行するワーカーが必要とするコード パッケージをステージングするための Dataflow 用の Cloud Storage パス。temp_location
- パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。
注: Apache Beam SDK for Python 2.15.0 以降を使用している場合は、region
も指定する必要があります。
Java: SDK 1.x
これらのオプションは、プログラムで設定するか、コマンドラインを使用して指定できます。次のサンプルコードでは、Dataflow マネージド サービスを使用してパイプラインを実行するため、ランナーとその他の必要なオプションをプログラムで設定してパイプラインを作成しています。
Java: SDK 2.x
// Create and set your PipelineOptions. DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class); // For Cloud execution, set the Cloud Platform project, staging location, // and specify DataflowRunner. options.setProject("my-project-id"); options.setStagingLocation("gs://my-bucket/binaries"); options.setRunner(DataflowRunner.class); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions # Create and set your PipelineOptions. # For Cloud execution, specify DataflowRunner and set the Cloud Platform # project, job name, temporary files location, and region. # For more information about regions, check: # https://cloud.google.com/dataflow/docs/concepts/regional-endpoints options = PipelineOptions( flags=argv, runner='DataflowRunner', project='my-project-id', job_name='unique-job-name', temp_location='gs://my-bucket/temp', region='us-central1') # Create the Pipeline with the specified options. # with beam.Pipeline(options=options) as pipeline: # pass # build your pipeline here.
Java: SDK 1.x
パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。
次のサンプルコードでは、コマンドラインを使用して Dataflow サービスの実行に必要なオプションを設定しています。
Java: SDK 2.x
// Create and set your PipelineOptions. MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
Python
# Use Python argparse module to parse custom arguments import argparse import apache_beam as beam parser = argparse.ArgumentParser() parser.add_argument('--input') parser.add_argument('--output') args, beam_args = parser.parse_known_args(argv) # Create the Pipeline with remaining arguments. with beam.Pipeline(argv=beam_args) as pipeline: lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input) lines | 'Write files' >> beam.io.WriteToText(args.output)
Java: SDK 1.x
パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。
Java: SDK 2.x
必要なオプションをコマンドラインで渡す場合は、--project
、--runner
、--gcpTempLocation
、--stagingLocation
のオプションを使用します。
Python
必要なオプションをコマンドラインで渡す場合は、--project
、--runner
、--staging_location
のオプションを使用します。
Java: SDK 1.x
非同期実行
Java: SDK 2.x
DataflowRunner
を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。
Python
DataflowRunner
を使用すると、パイプラインは Google のクラウドで非同期実行されます。パイプラインの実行中に、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用して、ジョブの処理状況のモニタリング、実行の詳細の表示、パイプライン結果の更新の受信を行うことができます。
Java: SDK 1.x
同期実行
Java: SDK 2.x
パイプライン ランナーとして DataflowRunner
を指定して、pipeline.run().waitUntilFinish()
を明示的に呼び出します。
DataflowRunner
を使用して、pipeline.run()
から返された PipelineResult
オブジェクトで waitUntilFinish()
を呼び出すと、パイプラインはクラウドで実行されますが、ローカルコードはクラウドジョブが終了し、最後の DataflowPipelineJob
オブジェクトが返されるまで待機します。
ジョブの実行中、Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。
Java SDK 1.x を過去に使用していて、コマンドラインで --runner
BlockingDataflowPipelineRunner
を使用して、パイプラインが終了するまでブロックするようメイン プログラムにインタラクティブに指示していた場合、Java 2.x ではメイン プログラムで waitUntilFinish()
を明示的に呼び出す必要があります。
Python
パイプラインの完了までブロックするには、ランナーの run()
メソッドから返される PipelineResult
オブジェクトの wait_until_finish()
メソッドを使用します。
Java: SDK 1.x
注: コマンドラインで Ctrl+C
を入力しても、ジョブはキャンセルされません。Dataflow サービスは Google Cloud でジョブを引き続き実行します。ジョブをキャンセルするには、Dataflow Monitoring Interface または Dataflow コマンドライン インターフェースを使用します。
ストリーミング実行
Java: SDK 2.x
無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。
無限データソースとシンクをパイプラインで使用する場合、GroupByKey
などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。
Python
無限データソースまたはシンク(Pub/Sub など)をパイプラインで使用する場合は、streaming
オプションを true に設定する必要があります。
ストリーミング ジョブは、デフォルトで n1-standard-2
以上の Compute Engine マシンタイプを使用します。n1-standard-2
はストリーミング ジョブを実行するのに最低限必要なマシンタイプであるため、これをオーバーライドしないでください。
無限データソースとシンクをパイプラインで使用する場合、GroupByKey
などの集約を使用するには、事前に制限なし PCollection のウィンドウ処理方式を選択しておく必要があります。
Java: SDK 1.x
他の Cloud Dataflow パイプライン オプションの設定
クラウドでパイプラインを実行するには、PipelineOptions
オブジェクトの次のフィールドをプログラムで設定します。
Java: SDK 2.x
フィールド | 型 | 説明 | デフォルト値 |
---|---|---|---|
runner |
Class (NameOfRunner) |
使用する PipelineRunner 。このフィールドでは、実行時の PipelineRunner を決定できます。 |
DirectRunner (ローカルモード) |
streaming |
boolean |
ストリーミング モードが有効か無効か。有効な場合は true です。 | 無限ソースからパイプラインで読み取る場合、デフォルト値は true です。それ以外の場合は false です。 |
project |
String |
Google Cloud プロジェクトのプロジェクト ID。これは、Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 | 設定されていない場合、デフォルトは Cloud SDK で現在構成されているプロジェクトになります。 |
jobName |
String |
実行されている Dataflow ジョブの名前は、Dataflow のジョブリストとジョブの詳細に表示されます。既存のパイプラインの更新時にも使用されます。 | Dataflow では自動的に一意の名前が生成されます。 |
gcpTempLocation |
String |
一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 |
|
stagingLocation |
String |
ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 |
設定されていない場合、tempLocation に指定したものがデフォルトになります。 |
autoscalingAlgorithm |
String
| Dataflow ジョブの自動スケーリング モード。使用できる値は、THROUGHPUT_BASED (自動スケーリングを有効にする)または NONE (無効にする)です。Dataflow マネージド サービスでの自動スケーリングの動作については、自動チューニング機能をご覧ください。 |
すべてのバッチ Dataflow ジョブと Streaming Engine を使用するストリーミング ジョブのデフォルトは THROUGHPUT_BASED です。
Streaming Engine を使用しないストリーミング ジョブのデフォルトは NONE です。 |
numWorkers |
int |
パイプラインの実行時に使用する Google Compute Engine インスタンスの初期の数。このオプションは、ジョブの開始時に Dataflow サービスが起動するワーカー数を決定します。 | 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。 |
maxNumWorkers |
int |
実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(numWorkers で指定)よりも大きくできることに注意してください。 |
指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。 |
numberOfWorkerHarnessThreads |
int |
ワーカー ハンドラごとのスレッド数。 | 指定しないと、Dataflow サービスがワーカーあたりの適切なスレッド数を決定します。 |
region |
String |
Dataflow ジョブをデプロイするリージョン エンドポイントを指定します。 | 設定しない場合、us-central1 がデフォルトになります。 |
workerRegion |
String |
ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される 注: このオプションは |
設定されていない場合は、デフォルトで region の値になります。 |
workerZone |
String |
ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される 注: このオプションは |
region または workerRegion を指定した場合、workerZone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。 |
zone |
String |
(非推奨)Apache Beam SDK 2.17.0 以前では、ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。 | region を指定すると、zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。 |
dataflowKmsKey |
String |
保存データの暗号化に使用する顧客管理の暗号鍵(CMEK)を指定します。暗号鍵は Cloud KMS で制御できます。この機能を使用するには、gcpTempLocation を定義する必要があります。 |
指定しない場合、CMEK の代わりにデフォルトの Google Cloud 暗号化が使用されます。 |
flexRSGoal |
String |
自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。numWorkers 、autoscalingAlgorithm 、zone 、region 、workerMachineType パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 |
指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。 |
filesToStage |
List<String> |
各ワーカーで使用できるように、ローカル ファイル、ファイルのディレクトリ、またはアーカイブ(JAR や zip ファイルなど)の空でないリスト。このオプションを設定した場合、指定するファイルのみがアップロードされます(Java クラスパスは無視されます)。すべてのリソースを適切なクラスパス順序で指定する必要があります。リソースはコードに限定されず、構成ファイルやその他のリソースを含めてすべてのワーカーで使用可能にすることができます。コードでは、Java の標準リソース検索メソッドを使用してリストされているリソースにアクセスできます。注意: Dataflow はアップロードの前にファイルを Zip で圧縮し、それによって起動時のコストが高くなるため、ディレクトリ パスの指定は最善ではありません。また、パイプラインで処理することを意図したデータをワーカーに転送するためには、このオプションを使用しないでください。使用すると、適切な Dataflow データソースと組み合わせたネイティブの Cloud Storage/BigQuery API を使用するよりも大幅に低速になります。 |
filesToStage を省略した場合、Dataflow は Java クラスパスに基づいてステージングするファイルを推定します。左の列に記載されている考慮事項と注意事項(リストするファイルのタイプとコードからそれらにアクセスする方法)はここにも適用されます。 |
network |
String |
Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワーク。ネットワークの指定方法をご覧ください。 | 設定しないと、default というネットワーク名が想定されます。 |
subnetwork |
String |
Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワーク。サブネットワークの指定方法をご覧ください。 | Dataflow サービスによりデフォルト値が決定されます。 |
usePublicIps |
boolean |
Dataflow ワーカーでパブリック IP アドレスを使用するかどうかを指定します。この値を false に設定した場合、Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。この場合、subnetwork オプションを指定すると network オプションは無視されます。指定した network または subnetwork の限定公開の Google アクセスが有効になっていることを確認してください。 |
設定しないと、デフォルト値は true になり、Dataflow ワーカーでパブリック IP アドレスが使用されます。 |
enableStreamingEngine |
boolean |
Dataflow Streaming Engine が有効か無効か。有効な場合は true です。Streaming Engine を有効にすると、ストリーミング パイプラインのステップを Dataflow サービス バックエンドで実行し、CPU、メモリ、Persistent Disk ストレージ リソースを節約できます。 | デフォルト値は false です。つまり、ストリーミング パイプラインのステップ全体がワーカー VM 上で実行されます。 |
createFromSnapshot |
String |
ストリーミング ジョブの作成時に使用するスナップショット ID を指定します。スナップショットを使用してストリーミング パイプラインの状態を保存し、その状態から新しいバージョンのジョブを開始できます。スナップショットの詳細については、スナップショットの使用をご覧ください。 | 設定されていない場合、ジョブの作成にスナップショットが使用されません。 |
hotKeyLoggingEnabled |
boolean |
パイプラインでホットキーが検出されると、ユーザーの Cloud Logging プロジェクトにキーが出力されるよう指定します。 | 設定しない場合、ホットキーの存在のみがログに記録されます。 |
diskSizeGb |
int |
各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。設定する場合、ワーカーのブートイメージとローカルログを考慮して、少なくとも 30 GB を指定します。 Dataflow Shuffle を使用するバッチジョブの場合、このオプションではワーカー VM のブートディスクのサイズを設定します。Dataflow Shuffle を使用しないバッチジョブの場合、このオプションでは、シャッフルされたデータの保存に使用するディスクのサイズを設定します。ブートディスクのサイズには影響しません。
Streaming Engine を使用するストリーミング ジョブの場合、このオプションでブートディスクのサイズが設定されます。Streaming Engine を使用しないストリーミング ジョブの場合、Dataflow サービスによって作成される追加の永続ディスクのサイズを設定します。ブートディスクには影響しません。ストリーミング ジョブが Streaming Engine を使用しない場合は、ブートディスクのサイズをテストフラグ |
Cloud Platform プロジェクトで定義されたデフォルト サイズを使用するには バッチジョブが Dataflow Shuffle を使用する場合、デフォルトは 25 GB です。使用しない場合のデフォルトは 250 GB です。 ストリーミング ジョブが Streaming Engine を使用する場合、デフォルトは 30 GB です。使用しない場合のデフォルトは 400 GB です。 警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。Dataflow Shuffle や Streaming Engine を使用しないシャッフル バインドされているジョブは、ランタイムとジョブコストが増加する可能性があります。 |
serviceAccount |
String |
my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 |
設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。 |
workerDiskType |
String |
使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 |
Dataflow サービスによりデフォルト値が決定されます。 |
workerMachineType |
String |
ワーカー VM の起動時に Dataflow によって使用される Compute Engine マシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。
ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。 |
このオプションを設定しない場合は、Dataflow サービスがジョブに基づいてマシンタイプを選択します。 |
パイプライン設定オプションの全リストについては、PipelineOptions インターフェース(およびそのサブインターフェース)の API for Java リファレンス ドキュメントをご覧ください。
Python
フィールド | 型 | 説明 | デフォルト値 |
---|---|---|---|
runner |
str |
使用する PipelineRunner 。このフィールドは、DirectRunner または DataflowRunner のいずれかです。 |
DirectRunner (ローカルモード) |
streaming |
bool |
ストリーミング モードが有効か無効か。有効な場合は true です。 | false |
project |
str |
Google Cloud プロジェクトのプロジェクト ID。これは、Dataflow マネージド サービスを使用してパイプラインを実行する場合に必要です。 | 設定されていない場合は、エラーがスローされます。 |
job_name |
String |
実行されている Dataflow ジョブの名前は、Dataflow のジョブリストとジョブの詳細に表示されます。 | Dataflow では自動的に一意の名前が生成されます。 |
temp_location |
str |
一時ファイルの Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 |
設定されていない場合は、デフォルトで staging_location の値になります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。 |
staging_location |
str |
ローカル ファイルをステージングするための Cloud Storage パス。gs:// から始まる有効な Cloud Storage URL である必要があります。 |
設定されていない場合、デフォルトは temp_location 内のステージング ディレクトリになります。Google クラウドでパイプラインを実行するには、少なくとも temp_location または staging_location のいずれかを指定する必要があります。 |
autoscaling_algorithm |
str
| Dataflow ジョブの自動スケーリング モード。使用できる値は、THROUGHPUT_BASED (自動スケーリングを有効にする)または NONE (無効にする)です。Dataflow マネージド サービスでの自動スケーリングの動作については、自動チューニング機能をご覧ください。 |
すべてのバッチ Dataflow ジョブと Streaming Engine を使用するストリーミング ジョブのデフォルトは THROUGHPUT_BASED です。
Streaming Engine を使用しないストリーミング ジョブのデフォルトは NONE です。 |
num_workers |
int |
パイプラインの実行時に使用する Compute Engine インスタンスの数。 | 指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。 |
max_num_workers |
int |
実行中にパイプラインに対して使用可能になる Compute Engine インスタンスの最大数。ジョブを自動またはその他の方法でスケールアップできるように、これはワーカーの初期数(num_workers で指定)よりも大きくできることに注意してください。 |
指定しない場合は、Dataflow サービスが適切なワーカー数を決定します。 |
number_of_worker_harness_threads |
int |
ワーカー ハンドラごとのスレッド数。 | 指定しない場合、Dataflow サービスがワーカーあたりの適切なスレッド数を決定します。このパラメータを使用するには、--experiments=use_runner_v2 フラグを使用する必要があります。 |
region |
str |
Dataflow ジョブをデプロイするリージョン エンドポイントを指定します。 | 設定しない場合、us-central1 がデフォルトになります。 |
worker_region |
String |
ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のリージョンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される 注: このオプションは |
設定されていない場合は、デフォルトで region の値になります。 |
worker_zone |
String |
ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。このオプションは、ジョブのデプロイ、管理、モニタリングに使用される 注: このオプションは |
region または worker_region を指定した場合、worker_zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。 |
zone |
str |
(非推奨)Apache Beam SDK 2.17.0 以前では、ワーカー インスタンスを起動してパイプラインを実行するための Compute Engine のゾーンを指定します。 | region を指定すると、zone はデフォルトで対応するリージョンのゾーンになります。この動作を変更するには、別のゾーンを指定します。 |
dataflow_kms_key |
str |
保存データの暗号化に使用する顧客管理の暗号鍵(CMEK)を指定します。暗号鍵は Cloud KMS で制御できます。この機能を使用するには、temp_location を定義する必要があります。 |
指定しない場合、CMEK の代わりにデフォルトの Google Cloud 暗号化が使用されます。 |
flexrs_goal |
str |
自動スケーリングされたバッチジョブに Flexible Resource Scheduling(FlexRS)を指定します。num_workers 、autoscaling_algorithm 、zone 、region 、machine_type パラメータに影響します。詳細については、FlexRS パイプライン オプションのセクションをご覧ください。 |
指定しない場合、デフォルトの SPEED_OPTIMIZED が使用されます。これは、このフラグを省略するのと同じです。FlexRS を有効にするには、Dataflow サービスで利用可能な割引リソースを選択できるように、値 COST_OPTIMIZED を指定する必要があります。 |
network |
str |
Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine ネットワーク。ネットワークの指定方法をご覧ください。 | 設定しないと、default というネットワーク名が想定されます。 |
subnetwork |
str |
Compute Engine インスタンスを起動してパイプラインを実行するための Compute Engine サブネットワーク。サブネットワークの指定方法をご覧ください。 | Dataflow サービスによりデフォルト値が決定されます。 |
use_public_ips |
bool |
Dataflow ワーカーでパブリック IP アドレスを使用する必要があることを指定します。この値を false に設定した場合、Dataflow ワーカーではすべての通信にプライベート IP アドレスが使用されます。この場合、subnetwork オプションを指定すると network オプションは無視されます。指定した network または subnetwork の限定公開の Google アクセスが有効になっていることを確認してください。このオプションを使用するには、Beam SDK for Python が必要です。非推奨の Dataflow SDK for Python ではサポートされていません。 |
設定されていない場合、Dataflow ワーカーでパブリック IP アドレスが使用されます。 |
enable_streaming_engine |
bool |
Dataflow Streaming Engine が有効か無効か。有効な場合は true です。Streaming Engine を有効にすると、ストリーミング パイプラインのステップを Dataflow サービス バックエンドで実行し、CPU、メモリ、Persistent Disk ストレージ リソースを節約できます。 | デフォルト値は false です。つまり、ストリーミング パイプラインのステップ全体がワーカー VM 上で実行されます。 |
disk_size_gb |
int |
各リモート Compute Engine ワーカー インスタンスで使用するディスクサイズ(ギガバイト単位)。設定する場合は、ワーカー ブートイメージとローカルログを考慮して、ディスクサイズとして少なくとも 30 GB を指定します。 Dataflow Shuffle を使用するバッチジョブの場合、このオプションではワーカー VM のブートディスクのサイズを設定します。Dataflow Shuffle を使用しないバッチジョブの場合、このオプションでは、シャッフルされたデータの保存に使用するディスクのサイズを設定します。ブートディスクのサイズには影響しません。
Streaming Engine を使用するストリーミング ジョブの場合、このオプションでブートディスクのサイズが設定されます。Streaming Engine を使用しないストリーミング ジョブの場合、Dataflow サービスによって作成される追加の永続ディスクのサイズを設定します。ブートディスクには影響しません。ストリーミング ジョブが Streaming Engine を使用しない場合は、ブートディスクのサイズをテストフラグ |
Cloud Platform プロジェクトで定義されたデフォルト サイズを使用するには バッチジョブが Dataflow Shuffle を使用する場合、デフォルトは 25 GB です。使用しない場合のデフォルトは 250 GB です。 ストリーミング ジョブが Streaming Engine を使用する場合、デフォルトは 30 GB です。使用しない場合のデフォルトは 400 GB です。 警告: ディスクサイズを小さくすると、使用可能なシャッフル I/O が減少します。Dataflow Shuffle や Streaming Engine を使用しないシャッフル バインドされているジョブは、ランタイムとジョブコストが増加する可能性があります。 |
service_account_email |
str |
my-service-account-name@<project-id>.iam.gserviceaccount.com の形式で、ユーザー管理コントローラ サービス アカウントを指定します。詳細については、Cloud Dataflow のセキュリティと権限のページのコントローラ サービス アカウントのセクションをご覧ください。 |
設定されていない場合、ワーカーはプロジェクトの Compute Engine サービス アカウントをコントローラ サービス アカウントとして使用します。 |
worker_disk_type |
str |
使用する永続ディスクのタイプ。ディスクタイプ リソースの完全 URL によって指定します。たとえば、SSD 永続ディスクを指定する場合は compute.googleapis.com/projects//zones//diskTypes/pd-ssd を使用します。詳細については、diskTypes に関する Compute Engine API リファレンス ページをご覧ください。 |
Dataflow サービスによりデフォルト値が決定されます。 |
machine_type |
str |
ワーカー VM の起動時に Dataflow によって使用される Compute Engine マシンタイプ。使用可能な任意の Compute Engine マシンタイプ ファミリーまたはカスタム マシンタイプを使用できます。
ワーカーの vCPU の数およびメモリ量(GB)に基づいて料金が請求されます。請求はマシンタイプ ファミリーとは独立しています。 |
このオプションを設定しない場合は、Dataflow サービスがジョブに基づいてマシンタイプを選択します。 |
Java: SDK 1.x
ローカル実行用に PipelineOption を構成する
パイプラインをマネージド クラウド リソースで実行する代わりに、パイプラインをローカルで実行することを選択できます。ローカル実行には、テスト、デバッグ、または小さなデータセットに対するパイプラインの実行に一定のメリットがあります。たとえば、ローカル実行の場合、リモート Dataflow サービスと関連の Google Cloud プロジェクトとの依存関係が削除されます。
ローカル実行を使用する場合、ローカルメモリに十分に収まる小さなデータセットでパイプラインを実行することを強くおすすめします。Create
変換を使用して小さなメモリ内データセットを作成するか、Read
変換を使用して小さなローカルまたはリモート ファイルを操作できます。ローカル実行では、少数の外部依存関係でテストとデバッグをすばやく簡単に実行する方法が提供されますが、ローカル環境で使用可能なメモリにより制限されます。
次のコード例は、ローカル環境で実行するパイプラインの作成方法を示しています。
Java: SDK 2.x
// Create and set our Pipeline Options. PipelineOptions options = PipelineOptionsFactory.create(); // Create the Pipeline with the specified options. Pipeline p = Pipeline.create(options);
注: ローカルモードでは、DirectRunner
がすでにデフォルトになっているため、ランナーを設定する必要はありません。ただし、DirectRunner
を明示的に依存関係として含めるか、クラスパスに追加する必要があります。
Python
# Create and set your Pipeline Options. options = PipelineOptions(flags=argv) my_options = options.view_as(MyOptions) with Pipeline(options=options) as pipeline: pass # build your pipeline here.
注: ローカルモードでは、DirectRunner
がすでにデフォルトになっているため、ランナーを設定する必要はありません。
Java: SDK 1.x
パイプラインを作成したら、それを実行します。
他のローカル パイプライン オプションを設定する
パイプラインをローカルで実行する場合、PipelineOptions
のプロパティのデフォルト値は一般に十分です。
Java: SDK 2.x
Java PipelineOptions
のデフォルト値は Java API リファレンスに記載されています。詳しくは、PipelineOptions クラスリストをご覧ください。
パイプラインで BigQuery や Cloud Storage などの Google Cloud プロダクトを IO に使用している場合は、特定の Google Cloud プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、GcpOptions.setProject
を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GcpOptions クラスをご覧ください。
Python
Python PipelineOptions
のデフォルト値は Python API リファレンスに記載されています。詳しくは、PipelineOptions モジュール リストをご覧ください。
パイプラインで BigQuery や Cloud Storage などの Google Cloud プロダクトを IO に使用している場合は、特定の Google Cloud プロジェクトと認証情報のオプションを設定することが必要な場合があります。このような場合は、options.view_as(GoogleCloudOptions).project
を使用して Google Cloud プロジェクト ID を設定する必要があります。認証情報を明示的に設定することが必要な場合もあります。詳細については、GoogleCloudOptions クラスをご覧ください。
Java: SDK 1.x