Dataflow パイプライン オプションを設定する

このページでは、Dataflow ジョブのパイプライン オプションを設定する方法について説明します。これらのパイプライン オプションは、パイプラインの実行方法と実行場所、使用されるリソースを構成します。

パイプラインの実行は、Apache Beam プログラムの実行とは別のものです。記述した Apache Beam プログラムは、遅延実行用のパイプラインを構築します。つまり、このプログラムは、サポートされている Apache Beam ランナーが実行できる一連のステップを生成します。互換性のあるランナーには、Google Cloud 上の Dataflow ランナーと、ローカル環境でパイプラインを直接実行するダイレクト ランナーが含まれます。

ランタイムにパラメータを Dataflow ジョブに渡すことができます。ランタイムにパイプライン オプションを設定する方法については、パイプライン オプションの構成をご覧ください。

Apache Beam SDK でパイプライン オプションを使用する

次の SDK を使用して、Dataflow ジョブのパイプライン オプションを設定できます。

  • Apache Beam SDK for Python
  • Apache Beam SDK for Java
  • Apache Beam SDK for Go

SDK を使用するには、Apache Beam SDK クラス PipelineOptions を使用してパイプライン ランナーとその他の実行パラメータを設定します。

パイプライン オプションを指定するには、2 通りの方法があります。

  • パイプライン オプションのリストを指定して、パイプライン オプションをプログラムで設定する。
  • パイプライン コードの実行時に、コマンドラインで直接パイプライン オプションを設定する。

プログラムでパイプライン オプションを設定する

パイプライン オプションをプログラムで設定するには、PipelineOptions オブジェクトを作成して変更します。

Java

メソッド PipelineOptionsFactory.fromArgs を使用して PipelineOptions オブジェクトを作成します。

例については、このページの Dataflow サンプルで起動するをご覧ください。

Python

PipelineOptions オブジェクトを作成します。

例については、このページの Dataflow サンプルで起動するをご覧ください。

Go

PipelineOptions を使用してパイプライン オプションをプログラムで設定することは、Apache Beam SDK for Go ではサポートされていません。Go のコマンドライン引数を使用してください。

例については、このページの Dataflow サンプルで起動するをご覧ください。

コマンドラインでパイプライン オプションを設定する

コマンドライン引数を使用して、パイプライン オプションを設定できます。

Java

次の構文例は、Java クイックスタートWordCount パイプラインのものです。

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

次のように置き換えます。

  • PROJECT_ID: 実際の Google Cloud プロジェクト ID
  • BUCKET_NAME: Cloud Storage バケットの名前
  • REGION: Dataflow リージョンus-central1

Python

次の構文例は、Python クイックスタートWordCount パイプラインのものです。

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

次のように置き換えます。

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン(例: europe-west1

    --region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

  • STORAGE_BUCKET: Cloud Storage バケット名

  • PROJECT_ID: Google Cloud プロジェクト ID

Go

次の構文例は、Go クイックスタートWordCount パイプラインのものです。

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

次のように置き換えます。

  • BUCKET_NAME: Cloud Storage バケット名

  • PROJECT_ID: Google Cloud プロジェクト ID

  • DATAFLOW_REGION: Dataflow ジョブをデプロイするリージョン。例: europe-west1--region フラグは、メタデータ サーバー、ローカル クライアント、または環境変数に設定されているデフォルト リージョンをオーバーライドします。

試験運用版のパイプライン オプションを設定する

Java、Python、Go SDK では、experiments パイプライン オプションを使用して、試験運用版または一般提供前の Dataflow 機能を有効にします。

プログラムで設定する

プログラムで experiments オプションを設定するには、次の構文を使用します。

Java

PipelineOptions オブジェクトに、次の構文を使用して experiments オプションを追加します。この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。

options.setExperiments("streaming_boot_disk_size_gb=80")

PipelineOptions オブジェクトの作成例については、このページの Dataflow サンプルで起動するをご覧ください。

Python

PipelineOptions オブジェクトに、次の構文を使用して experiments オプションを追加します。この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

PipelineOptions オブジェクトの作成例については、このページの Dataflow サンプルで起動するをご覧ください。

Go

PipelineOptions を使用してパイプライン オプションをプログラムで設定することは、Apache Beam SDK for Go ではサポートされていません。Go のコマンドライン引数を使用してください。

コマンドラインで設定する

コマンドラインで experiments オプションを設定するには、次の構文を使用します。

Java

この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。

--experiments=streaming_boot_disk_size_gb=80

Python

この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。

--experiments=streaming_boot_disk_size_gb=80

Go

この例では、テストフラグを使用してブートディスクのサイズを 80 GB に設定しています。

--experiments=streaming_boot_disk_size_gb=80

テンプレートで設定する

Dataflow テンプレートの実行時に試験運用版機能を有効にするには、--additional-experiments フラグを使用します。

クラシック テンプレート

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flex テンプレート

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

パイプライン オプション オブジェクトにアクセスする

Apache Beam プログラムで Pipeline オブジェクトを作成するときに、PipelineOptions を渡します。Dataflow サービスは、パイプラインを実行するときに、PipelineOptions のコピーを各ワーカーに送信します。

Java

ProcessContext.getPipelineOptions メソッドを使用して、ParDo 変換の DoFn インスタンス内の PipelineOptions にアクセスします。

Python

この機能は Apache Beam SDK for Python ではサポートされていません。

Go

beam.PipelineOptions を使用してパイプライン オプションにアクセスします。

Dataflow で起動する

Dataflow ランナー サービスを使用して、マネージド Google Cloud リソースでジョブを実行します。Dataflow でパイプラインを実行すると、Google Cloud プロジェクトの Compute Engine リソースと Cloud Storage リソースを使用する Dataflow ジョブが作成されます。Dataflow の権限の詳細については、Dataflow のセキュリティと権限をご覧ください。

Dataflow ジョブでは、パイプライン実行中に一時ファイルを格納するために Cloud Storage を使用します。不要なストレージ費用が発生しないようにするには、Dataflow ジョブで一時的なストレージに使用するバケットで削除(復元可能)機能をオフにします。詳細については、バケットから削除(復元可能)ポリシーを削除するをご覧ください。

必須オプションを設定する

Dataflow を使用してパイプラインを実行するには、次のパイプライン オプションを設定します。

Java

  • project: Google Cloud プロジェクトの ID。
  • runner: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合は DataflowRunner にする必要があります。
  • gcpTempLocation: 多くの一時ファイルをステージングするための Dataflow 用の Cloud Storage パス。バケットを指定する場合は、事前にバケットを作成しておく必要があります。gcpTempLocation を設定しない場合は、パイプライン オプション tempLocation を設定できます。その場合、gcpTempLocationtempLocation の値に設定されます。どちらも指定されていない場合は、デフォルトの gcpTempLocation が作成されます。
  • stagingLocation: バイナリ ファイルをステージングするための Dataflow 用の Cloud Storage パス。Apache Beam SDK 2.28 以降を使用している場合は、このオプションを設定しないでください。Apache Beam SDK 2.28 以前を使用している場合は、このオプションを設定しないと、tempLocation に指定したものがステージングのロケーションに使用されます。

    このオプションと tempLocation のどちらも指定されていない場合は、デフォルトの gcpTempLocation が作成されます。tempLocation が指定され、gcpTempLocation が指定されていない場合、tempLocation は Cloud Storage パスである必要があり、gcpTempLocation はデフォルトで同じ値になります。tempLocation が指定されず、gcpTempLocation が指定されている場合は、tempLocation に値は入力されません。

Python

  • project: Google Cloud プロジェクト ID。
  • region: Dataflow ジョブのリージョン。
  • runner: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合は DataflowRunner にする必要があります。
  • temp_location: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。

Go

  • project: Google Cloud プロジェクト ID。
  • region: Dataflow ジョブのリージョン。
  • runner: パイプラインを実行するパイプライン ランナー。Google Cloud で実行する場合は dataflow にする必要があります。
  • staging_location: パイプラインの実行中に作成される一時ジョブファイルをステージングするための Dataflow 用の Cloud Storage パス。

プログラムでパイプライン オプションを設定する

次のサンプルコードは、Dataflow を使用してパイプラインを実行するために、ランナーとその他の必須オプションをプログラムで設定してパイプラインを作成する方法を示しています。

Java

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Apache Beam SDK for Go では、Go コマンドライン引数が使用されます。フラグの値を設定するには、flag.Set() を使用します。

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

コマンドラインからパイプライン オプションを使用する

次の例は、コマンドラインで指定されたパイプライン オプションを使用する方法を示しています。この例では、パイプライン オプションをプログラムで設定していません。

Java

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

コマンドライン オプションを解析するには、Python argparse モジュールを使用します。

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Go flag パッケージを使用して、コマンドライン オプションを解析します。beam.Init() を呼び出す前にオプションを解析する必要があります。この例では、output がコマンドライン オプションです。

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

パイプラインを作成した後で、すべてのパイプライン読み取り、変換、書き込みを指定し、パイプラインを実行します。

実行モードを制御する

Apache Beam プログラムが Dataflow などのサービスでパイプラインを実行する場合、プログラムはパイプラインを非同期で実行するか、パイプラインが完了するまでブロックします。この動作は、次のガイダンスを使用して変更できます。

Java

Apache Beam Java プログラムが Dataflow などのサービスでパイプラインを実行すると、通常は非同期で実行されます。パイプラインを実行し、ジョブが完了するまで待機するには、DataflowRunner をパイプライン ランナーとして設定し、明示的に pipeline.run().waitUntilFinish() を呼び出します。

DataflowRunner を使用し、pipeline.run() から返された PipelineResult オブジェクトで waitUntilFinish() を呼び出すと、パイプラインは Google Cloud で実行されますが、ローカルコードはクラウドジョブが終了して最後の DataflowPipelineJob オブジェクトが返されるまで待機します。ジョブの実行中、Dataflow サービスは待機しながらジョブ ステータスの更新とコンソール メッセージを出力します。

Python

Apache Beam Python プログラムが Dataflow などのサービスでパイプラインを実行すると、通常は非同期で実行されます。パイプラインの完了までブロックするには、ランナーの run() メソッドから返される PipelineResult オブジェクトの wait_until_finish() メソッドを使用します。

Go

Apache Beam Go プログラムが Dataflow でパイプラインを実行する場合、プログラムはデフォルトで同期を行い、パイプラインが完了するまでブロックします。ブロックを行わない場合は、次の 2 つの方法があります。

  1. Go ルーチンでジョブを開始します。

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. jobopts パッケージにある --async コマンドライン フラグを使用します。

実行の詳細の表示、進行状況のモニタリング、ジョブ完了ステータスの確認を行うには、Dataflow モニタリング インターフェースまたは Dataflow コマンドライン インターフェースを使用します。

ストリーミング ソースを使用する

Java

無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。

Python

無限データソース(Pub/Sub など)をパイプラインで使用する場合は、streaming オプションを true に設定する必要があります。

Go

無限データソース(Pub/Sub など)からパイプラインで読み取る場合、パイプラインは自動的にストリーミング モードで実行されます。

ストリーミング ジョブは、デフォルトで n1-standard-2 以上の Compute Engine マシンタイプを使用します。

ローカルで起動する

パイプラインをマネージド クラウド リソースで実行する代わりに、パイプラインをローカルで実行することを選択できます。ローカル実行には、テスト、デバッグ、または小さなデータセットに対するパイプラインの実行に一定のメリットがあります。たとえば、ローカル実行の場合、リモート Dataflow サービスと関連の Google Cloud プロジェクトとの依存関係が削除されます。

ローカル実行を使用する場合、ローカルメモリに十分に収まる小さなデータセットでパイプラインを実行する必要があります。Create 変換を使用して小さなメモリ内データセットを作成するか、Read 変換を使用して小さなローカル ファイルまたはリモート ファイルを操作できます。通常、ローカル実行の場合、より少ない外部依存関係でテストとデバッグをより速く簡単に実行できますが、ローカル環境で使用可能なメモリによって制限されます。

次のサンプルコードは、ローカル環境で実行するパイプラインの作成方法を示しています。

Java

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

パイプラインを作成したら、それを実行します。

カスタム パイプライン オプションを作成する

標準の PipelineOptions に加えて、独自のカスタム オプションを追加できます。Apache Beam のコマンドラインでは、同じ形式で指定されたコマンドライン引数を使用して、カスタム オプションを解析することもできます。

Java

独自のオプションを追加するには、次の例のように、各オプションのゲッター メソッドとセッター メソッドでインターフェースを定義します。

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

独自のオプションを追加するには、次の例のように add_argument() メソッドを使用します(これは Python の標準 argparse モジュールとまったく同じ動作をします)。

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

独自のオプションを追加するには、次の例に示すように、Go フラグ パッケージを使用します。

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

ユーザーが --help をコマンドライン引数として渡したときに表示される説明と、デフォルト値を指定することもできます。

Java

次のように、アノテーションを使用して説明とデフォルト値を設定します。

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

PipelineOptionsFactory でインターフェースを登録してから、PipelineOptions オブジェクトを作成するときにインターフェースを渡すことをおすすめします。PipelineOptionsFactory でインターフェースを登録する場合、--help でカスタム オプション インターフェースを検索し、--help コマンドの出力に追加できます。PipelineOptionsFactory は、カスタム オプションが他のすべての登録済みオプションと互換性があることを検証します。

次のサンプルコードは、PipelineOptionsFactory でカスタム オプション インターフェースを登録する方法を示しています。

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

これで、パイプラインは --myCustomOption=value をコマンドライン引数として受け入れることができます。

Python

説明とデフォルト値は次のように設定します。

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

説明とデフォルト値は次のように設定します。

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)