Dataflow パイプラインの開発とテスト

このページでは、Dataflow パイプラインの開発とテストのベスト プラクティスについて説明します。

概要

パイプラインのコードの実装方法は、本番環境でのパイプラインのパフォーマンスに大きく影響します。適正かつ効率的に機能するパイプライン コードを作成できるように、このドキュメントでは次のことについて説明します。

  • 開発とデプロイのさまざまな段階でコードの実行をサポートするパイプライン ランナー。
  • デプロイ環境。開発、テスト、本番前環境、本番環境でパイプラインを実行できます。
  • オープンソースのパイプライン コードとテンプレート。そのまま使用することも、コードを迅速に開発するために新しいパイプラインのベースとして使用することもできます。
  • パイプラインのオブザーバビリティとパフォーマンスを改善するためのパイプライン開発のコーディングのベスト プラクティス。これらのプラクティスの多くは、Apache Beam SDK を使用したプログラミングに適用されるものであり(この例では Java を使用しています)、Dataflow 固有のものではありません。しかし、多くの場合、本番環境を準備するためにコーディングのプラクティスを補う機能を Dataflow は提供しています。
  • パイプライン コードをテストするためのベスト プラクティス。まず、このドキュメントでは、単体テスト、統合テスト、エンドツーエンド テストなどのテストの範囲とその関係などの概要について説明します。次に、テストデータを作成して統合する方法と各テストで使用するパイプライン ランナーについて詳しく説明します。

パイプライン ランナー

開発とテストでは、さまざまな Apache Beam ランナーを使用してパイプライン コードを実行します。Apache Beam SDK には、ローカルの開発用とテスト用に Direct Runner が用意されています。単体テストと統合テストでは、リリース自動化ツールを使って Direct Runner を使用することもできます。たとえば、継続的インテグレーション(CI)パイプラインで Direct Runner を使用できます。

Dataflow にデプロイされるパイプラインでは、Dataflow Runner を使用します。これにより、本番環境に似た環境でパイプラインが実行されます。また、アドホック開発のテストやエンドツーエンドのパイプライン テストにも Dataflow Runner を使用できます。

このページでは、Apache Beam Java SDK を使用してビルドされたパイプラインの実行に焦点をあわせていますが、Dataflow は Python と Go を使用して開発された Apache Beam パイプラインもサポートしています。Apache Beam Java、Python、Go SDK が Dataflow で一般提供されています。SQL デベロッパーは Apache Beam SQL を使用し、使い慣れた SQL 言語でパイプラインを作成することもできます。

デプロイ環境を設定する

開発のステージごとにユーザー、データ、コード、その他のリソースを分離するには、デプロイ環境を作成します。パイプライン開発のさまざまなステージで分離された環境を用意するときに、可能であれば、別々の Google Cloud プロジェクトを使用してください。

以降のセクションでは、一般的なデプロイ環境のセットについて説明します。

ローカル環境

ローカル環境は、デベロッパー ワークステーションです。開発と迅速なテストを行うには、Direct Runner を使用してパイプライン コードをローカルで実行します。

Direct Runner を使用してローカルで実行されるパイプラインは、Pub/Sub トピックや BigQuery テーブルなどのリモート Google Cloud リソースとやり取りできます。個々のデベロッパーに個別の Google Cloud プロジェクトを用意すると、Google Cloud サービスでアドホック テストを行うためのサンドボックスを使用できます。

Pub/SubBigtable などの一部の Google Cloud サービスでは、ローカル開発用のエミュレータが用意されています。Direct Runner でこれらのエミュレータを使用すると、ローカルでエンドツーエンドの開発とテストを行うことができます。

サンドボックス環境

サンドボックス環境は、デベロッパーがコード開発時に Google Cloud サービスにアクセスできるようにする Google Cloud プロジェクトです。パイプラインのデベロッパーは、Google Cloud プロジェクトを他のデベロッパーと共有したり、各自の独立したプロジェクトを使用したりできます。独立したプロジェクトを使用することで、共有リソースの使用量と割り当ての管理に関する計画の複雑さを軽減できます。

デベロッパーは、サンドボックス環境を使用して、Dataflow Runner を使用したアドホック パイプラインを実行します。サンドボックス環境は、コード開発フェーズで本番環境のランナーに対してコードのデバッグとテストを行う場合に役立ちます。たとえば、アドホック パイプラインを実行すると、デベロッパーは次のことが可能になります。

  • コード変更によるスケーリングの動作への影響を確認する。
  • Direct Runner と Dataflow Runner で考えられる動作の違いを把握する。
  • Dataflow でグラフの最適化がどのように行われるのかを理解する。

アドホック テストでは、デベロッパーはサンドボックス環境で Dataflow を実行するために、ローカル環境からコードをデプロイできます。

本番前環境

本番前環境は、エンドツーエンド テストのように本番環境に相当する条件が求められる開発フェーズに使用する環境です。本番前環境用に別のプロジェクトを使用します。可能であれば、本番環境と同様に構成します。同様に、本番環境と同様の規模でエンドツーエンドのテストを実施する場合は、Dataflow やその他のサービスの Google Cloud プロジェクトの割り当てを本番環境とできるだけ近いものにします。

必要であれば、本番前環境をさらに複数の環境に分割することもできます。たとえば、品質管理環境は、さまざまなワークロード条件でデータの正確性、鮮度、パフォーマンスなどのサービスレベル目標(SLO)をテストする品質アナリストの作業をサポートします。

エンドツーエンド テストでは、テスト範囲内でデータソースとシンクを統合します。これらを本番前環境で使用可能にする方法を検討してください。テストデータは本番前環境で保存できます。たとえば、テストデータは入力データと一緒に Cloud Storage バケットに保存されます。その他のケースでは、本番環境の別のサブスクリプションを介して Pub/Sub トピックなどの本番前環境以外からテストデータを受け取る場合があります。ストリーミング パイプラインの場合は、Dataflow Streaming Data Generator などを使用し、生成されたデータでエンドツーエンド テストを実行して、本番環境に似たデータ特性とボリュームをエミュレートすることもできます。

ストリーミング パイプラインの場合は、本番環境に変更を加える前に、本番前環境でパイプラインの更新をテストします。特に、ダウンタイムを回避するために並列パイプラインを実行するなど、複数のステップを調整する必要がある場合は、ストリーミング パイプラインの更新手順をテストして検証することが重要です。

本番環境

本番環境は専用の Google Cloud プロジェクトです。継続的デリバリーは、すべてのエンドツーエンド テストに合格すると、本番環境にデプロイ アーティファクトをコピーします。

開発に関するベスト プラクティス

このセクションでは、コーディングと開発に関するベスト プラクティスについて説明します。これらの多くは、デベロッパーの生産性の向上、パイプラインのテスト容易性の向上、パフォーマンスの向上、モニタリングによる詳細な分析の有効化など、パイプラインの開発と運用の側面を補完し、強化します。

開発を始める前に、開発、テスト、デリバリーのライフサイクルをサポートするデプロイ環境を設定します。

Google 提供のテンプレートを使用する

パイプライン開発を加速させるため、Google から既存の Dataflow テンプレートが提供されているかどうか確認してください。一部のテンプレートでは、パイプラインのステップとしてカスタム ロジックを追加できます。たとえば、Pub/Sub Subscription to BigQuery テンプレートには、Cloud Storage に保存されている JavaScript ユーザー定義関数(UDF)を実行するためのパラメータが含まれています。Google 提供のテンプレートは Apache License 2.0 のオープンソースであるため、新しいパイプラインのベースとして使用できます。テンプレートは参照用のコードサンプルとしても役立ちます。

再利用可能な変換のライブラリを作成する

Apache Beam プログラミング モデルではバッチとストリーミングのデータ処理が統合され、変換の再利用が可能になっています。共通の変換の共有ライブラリを作成すると、再利用性やテスト容易性が向上し、異なるチームでコードを所有しやすくなります。

次の 2 つの Java サンプルコードについて考えてみましょう。どちらも支払いイベントも読み取ります。1 つ目は、制限のない Pub/Sub をソースとするものです。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

// Initial read transform
PCollection<PaymentEvent> payments =
    p.apply("Read from topic",
        PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
        .apply("Parse strings into payment events",
            ParDo.of(new ParsePaymentEventFn()));

2 つ目は、制限付きのリレーショナル データベースをソースとするものです。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options);

PCollection<PaymentEvent> payments =
    p.apply(
        "Read from database table",
        JdbcIO.<PaymentEvent>read()
            .withDataSourceConfiguration(...)
            .withQuery(...)
            .withRowMapper(new RowMapper<PaymentEvent>() {
              ...
            }));

両方のパイプラインが同じ処理を実行する場合、共有ライブラリを介して同じ変換を残りの処理ステップで使用できます。コードの再利用に関するベスト プラクティスの実装方法は、プログラミング言語やビルドツールによって異なります。たとえば、Maven を使用する場合、変換コードを独自のモジュールに分割できます。これにより、次のサンプルコードに示すように、別のパイプラインで使用可能なマルチモジュール内のサブモジュールとしてモジュールを追加できます。

// Reuse transforms across both pipelines
payments
    .apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
    .apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
  ...

詳しくは、Apache Beam ドキュメントで Apache Beam 変換のユーザーコードの作成のベスト プラクティスPTransforms の推奨スタイルガイドをご覧ください。

エラー処理にデッドレター キューを使用する

パイプラインで、要素を処理できない状況が発生することがあります。この原因はさまざまですが、一般的な原因はデータにあります。たとえば、不適切な形式の JSON が要素に含まれているため、解析に失敗することがあります。

このような状況におけるアプローチの 1 つは、DoFn.ProcessElement メソッドで例外をキャッチする方法です。例外ブロックでエラーをログに記録し、要素を廃棄します。ただし、これによりデータが失われるため、手動処理やトラブルシューティングでデータが検査できなくなります。

これよりも良い方法として、デッドレター キュー(またはデッドレター ファイル)というパターンを使用する方法があります。DoFn.ProcessElement メソッドで例外をキャッチして、通常どおりエラーをロギングします。失敗した要素を削除するのではなく、分岐出力により、失敗した要素を別の PCollection オブジェクトに書き込みます。その後、これらの要素はデータシンクに書き込まれるので、後で検査したり、別の変換を使用して処理できます。

次の Java コードの例は、デッドレター キュー パターンの実装方法を示しています。

TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};

PCollection<Input> input = /* ... */;

PCollectionTuple outputTuple =
    input.apply(ParDo.of(new DoFn<Input, Output>() {
      @Override
      void processElement(ProcessContext c) {
        try {
          c.output(process(c.element()));
        } catch (Exception e) {
          LOG.severe("Failed to process input {} -- adding to dead-letter file",
              c.element(), e);
          c.sideOutput(deadLetterTag, c.element());
        }
      }).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));

// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
    .apply(BigQueryIO.write(...));

// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...

Cloud Monitoring を使用して、パイプラインのデッドレター キューにさまざまなモニタリング ポリシーとアラート ポリシーを適用できます。たとえば、デッドレター変換によって処理される要素の数とサイズを可視化し、特定のしきい値条件が満たされたときにトリガーするアラートを構成できます。

スキーマ ミューテーションを処理する

失敗した要素を別の PCollection オブジェクトに書き込むデッドレター パターンを使用すると、予期しない(しかし有効な)スキーマを持つデータを処理できます。ただし、変換されたスキーマを有効な要素として反映する要素を自動的に処理したい場合もあります。たとえば、要素のスキーマに新しいフィールドの追加などのミューテーションが反映される場合、変更に合わせてデータシンクのスキーマを適応できます。

自動スキーマ ミューテーションは、デッドレター パターンによって使用される分岐出力アプローチに依存します。しかし、この場合、追加のスキーマが発生するたびに宛先スキーマを変更する変換がトリガーされます。このアプローチの例については、Google Cloud ブログの Square Enix とストリーミング パイプラインで JSON スキーマの変更を処理する方法をご覧ください。

結合に適切な副入力または CoGroupByKey を選択する

データセットの結合は、データ パイプラインの一般的なユースケースです。副入力を使用すると、データ拡充やキーによるルックアップなど、一般的なデータ処理に関する問題を柔軟に解決できます。PCollection オブジェクトと異なり、副入力は変更可能で、ランタイムで決定できます。たとえば、副入力の値はパイプラインの別のブランチによって計算することも、リモート サービスを呼び出して決定することもできます。

Dataflow では、データを永続ストレージ(共有ディスクと同様)に保持することで副入力をサポートしています。これにより、すべてのワーカーが完全な副入力を利用できるようになります。副入力のサイズは非常に大きく、ワーカーのメモリに収まらない場合があります。ワーカーで永続ストレージから常に読み取りを行う必要がある場合、サイズの大きい副入力からの読み取りでパフォーマンスの問題が発生する可能性があります。

CoGroupByKey 変換は、共通鍵を持つ複数の PCollection オブジェクトとグループの要素を統合(フラット化)するコア Apache Beam 変換です。副入力データ全体が各ワーカーで使用可能になる副入力とは異なり、CoGroupByKey はシャッフル(グループ化)オペレーションを実行してワーカー間でデータを分散します。そのため、結合する PCollection オブジェクトが非常に大きくワーカーメモリに収まらない場合は、CoGroupByKey が最適です。

以下のガイドラインに従って、副入力を使用するか CoGroupByKey を使用するかを判断してください。

  • 結合する PCollection オブジェクトの一方がもう一方よりも非常に小さく、小さい PCollection オブジェクトがワーカーメモリに収まる場合は、副入力を使用します。副入力全体をメモリ キャッシュに格納することで、要素をすばやく効率的に取得できます。
  • ワーカーメモリを大幅に超える PCollection オブジェクトの大部分を取得する必要がある場合は、CoGroupByKey を使用します。
  • パイプライン内で結合を複数回実行する必要がある PCollection オブジェクトがある場合は、副入力を使用します。複数の CoGroupByKey 変換を使用する代わりに、複数の ParDo 変換で再利用可能な副入力を作成できます。

詳細については、Dataflow のメモリ不足エラーのトラブルシューティングをご覧ください。

要素あたりのオペレーション費用を最小限に抑える

DoFn インスタンスは、ゼロ個以上の要素で構成される極小の作業単位であるバンドルという要素のバッチを処理します。その後、個々の要素はすべての要素に対して実行される DoFn.ProcessElement メソッドで処理されます。DoFn.ProcessElement メソッドは要素ごとに呼び出されます。このため、このメソッドによって呼び出される長時間オペレーションや計算コストの高いオペレーションの場合、メソッドが処理する要素ごとに、これらのオペレーションが実行されます。

要素のバッチに対して計算コストのオペレーションを 1 回だけ実行する必要がある場合は、これらのオペレーションを DoFn.ProcessElement ではなく、DoFn.Setup メソッドと DoFn.StartBundle メソッドに含めます。以下に例を示します。

  • DoFn インスタンスの動作の一部を制御する構成ファイルの解析。DoFn.Setup メソッドを使用して DoFn インスタンスが初期化されたときに、このアクションを 1 回だけ呼び出します。

  • バンドル内のすべての要素で再利用される有効期間の短いクライアントをインスタンス化します。たとえば、バンドル内のすべての要素が 1 つのネットワーク接続で送信されるような場合です。DoFn.StartBundle メソッドを使用して、このアクションをバンドルごとに 1 回呼び出します。

バッチサイズと外部サービスの同時呼び出しを制限する

外部サービスを呼び出す場合、GroupIntoBatches 変換を使用して、指定されたサイズの要素のバッチを作成することで、呼び出しごとのオーバーヘッドを削減できます。バッチ処理では、要素が個別にではなく、1 つのペイロードとして外部サービスに送信されます。

バッチ処理と組み合わせて使用することで、受信データのパーティションに分割する適切なキーを選択して、外部サービスへの並列(同時)呼び出しの最大数を制限できます。パーティションの数によって並列化の最大数が決まります。たとえば、すべての要素に同じキーを設定すると、外部サービスを呼び出すためのダウンストリーム変換は同時に実行されません。

要素に対してキーを生成する場合は、次のいずれかの方法を検討してください。

  • データキーとして使用するデータセットの属性(ユーザー ID など)を選択します。
  • データキーを生成して、固定数のパーティションで要素をランダムに分割します。パーティションの数は、使用可能なキー値の数によって決まります。並列処理のために十分なパーティションを作成する必要があります。また、各パーティションには、GroupIntoBatches が機能するために十分な数の要素も必要です。

次の Java コードの例は、10 個のパーティションに要素をランダムに分割する方法を示しています。

// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;

int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
    sensitiveData
        .apply("Assign data into partitions",
            ParDo.of(new DoFn<String, KV<Long, String>>() {
              Random random = new Random();

              @ProcessElement
              public void assignRandomPartition(ProcessContext context) {
                context.output(
                  KV.of(randomPartitionNumber(), context.element()));
              }
              private static int randomPartitionNumber() {
                return random.nextInt(numPartitions);
              }
            }))
        .apply("Create batches of sensitive data",
            GroupIntoBatches.<Long, String>ofSize(100L));

// Use batched sensitive data to fully utilize Redaction API
// (which has a rate limit but allows large payloads)
batchedData
    .apply("Call Redaction API in batches", callRedactionApiOnBatch());

不適切に融合された手順によるパフォーマンスの問題を特定する

Dataflow は、変換およびそのパイプラインの作成に使用されたデータに基づいて、パイプラインを表すステップのグラフを作成します。これはパイプライン実行グラフと呼ばれます。

パイプラインをデプロイするときに、Dataflow でパイプライン実行グラフを変更することでパフォーマンスを改善できます。たとえば、Dataflow で融合最適化というプロセスを使用して一部のオペレーションを融合することで、パイプライン内のすべての中間 PCollection オブジェクトの書き込みに伴うパフォーマンスとコストへの影響を回避できます。

場合によっては、Dataflow がパイプライン内のオペレーションを融合する最適な方法の選択を誤り、Dataflow サービスが使用可能ワーカーを十分に利用できなくなる可能性があります。このような場合は、一部のオペレーションの融合を防ぐ必要があります。

次の Apache Beam コードの例について考えてみましょう。GenerateSequence 変換では、小規模な制限付き PCollection オブジェクトが作成され、2 つのダウンストリーム ParDo 変換によって処理されます。

import com.google.common.math.LongMath;
...

public class FusedStepsPipeline {

  final class FindLowerPrimesFn extends DoFn<Long, String> {
    @ProcessElement
    public void processElement(ProcessContext c) {
      Long n = c.element();
      if (n > 1) {
        for (long i = 2; i < n; i++) {
          if (LongMath.isPrime(i)) {
            c.output(Long.toString(i));
          }
        }
      }
    }
  }

  public static void main(String[] args) {
    Pipeline p = Pipeline.create(options);

    PCollection<Long> sequence = p.apply("Generate Sequence",
        GenerateSequence
            .from(0)
            .to(1000000));

    // Pipeline branch 1
    sequence.apply("Find Primes Less-than-N",
        ParDo.of(new FindLowerPrimesFn()));

    // Pipeline branch 2
    sequence.apply("Increment Number",
        MapElements.via(new SimpleFunction<Long, Long>() {
          public Long apply(Long n) {
            return ++n;
          }
        }));

    p.run().waitUntilFinish();
  }
}

Find Primes Less-than-N 変換は計算コストが高く、値の数が多くなると実行に時間がかかる可能性があります。対照的に、Increment Number 変換は迅速に完了します。

次の図は、Dataflow モニタリング インターフェースのパイプラインのグラフ表現を示しています。

Dataflow インターフェースでのパイプライン フローの表示

Dataflow モニタリング インターフェースを使用してジョブをモニタリングすると、両方の変換で同じ処理レート(1 秒あたり 13 要素)が表示されます。Increment Number 変換で要素が迅速に処理されることを期待したものの、そうではなく、Find Primes Less-than-N と同じ処理レートに拘束されているように見えます。

この理由は、Dataflow がステップを 1 つのステージに融合していて、個別に実行できないためです。次の gcloud コマンドを使用できます。

gcloud dataflow jobs describe --full job-id --format json

結果の出力で、融合されたステップの説明は ComponentTransform 配列内の ExecutionStageSummary オブジェクトにあります。

...

    "executionPipelineStage": [
      {
        "componentSource": [
          ...
        ],
        "componentTransform": [
          {
            "name": "s1",
            "originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
            "userName": "Generate Sequence/Read(BoundedCountingSource)"
          },
          {
            "name": "s2",
            "originalTransform": "Find Primes Less-than-N",
            "userName": "Find Primes Less-than-N"
          },
          {
            "name": "s3",
            "originalTransform": "Increment Number/Map",
            "userName": "Increment Number/Map"
          }
        ],
        "id": "S01",
        "kind": "PAR_DO_KIND",
        "name": "F0"
      }

...

このシナリオでは、Find Primes Less-than-N 変換は遅いステップであるため、このステップの前に融合を解除するのが適切といえます。ステップの融合を解除する方法としては、次の Java コードの例のように、ステップの前に GroupByKey 変換を挿入してグループ化を解除する方法があります。

sequence
    .apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
      public KV<Long, Void> apply(Long n) {
        return KV.of(n, null);
      }
    }))
    .apply("Group By Key", GroupByKey.<Long, Void>create())
    .apply("Emit Keys", Keys.<Long>create())
    .apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));

これらの融合を解除するステップを結合し、再利用可能な複合変換にすることもできます。

ステップの融合の解除後にパイプラインを実行すると、すぐに Increment Number が完了します。より長時間実行される Find Primes Less-than-N 変換は別のステージで実行されます。

この例では、グループ化解除操作を適用して、ステップの融合を解除します。ほかの状況では別のアプローチも可能です。この場合、GenerateSequence 変換の連続出力による重複出力の処理は問題になりません。グループ化(GroupByKey)変換とグループ解除(Keys)変換では、重複したキーを持つ KV オブジェクトの重複が除去され、単一キーになります。グループ化オペレーションとグループ化解除オペレーションの後に重複を保持するには、ランダムキーと元の入力を値として KV ペアを作成し、ランダムキーを使用してグループ化を行い、出力として各キーの値を発行します。

Apache Beam 指標を使用してパイプライン分析情報を収集する

Apache Beam 指標は、実行中のパイプラインのプロパティをレポートする、さまざまな指標を生成するためのユーティリティ クラスです。Cloud Monitoring を使用する場合、Apache Beam の指標は Cloud Monitoring のカスタム指標として利用できます

次の Java スニペットは、DoFn サブクラスで使用される Counter 指標の例です。

final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};

final class ParseEventFn extends DoFn<String, MyObject> {

  private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
  private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
  private Gson gsonParser;

  @Setup
  public setup() {
    gsonParser = new Gson();
  }

  @ProcessElement
  public void processElement(ProcessContext c) {
    try {
      MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
      if (myObj.getPayload() != null) {
        //  Output the element if non-empty payload
        c.output(successTag, myObj);
      }
      else {
        // Increment empty payload counter
        emptyCounter.inc();
      }
    }
    catch (JsonParseException e) {
      // Increment malformed JSON counter
      malformedCounter.inc();
      // Output the element to dead-letter queue
      c.output(errorTag, c.element());
    }
  }
}

このサンプルコードでは 2 つのカウンタを使用しています。一方のカウンタでは JSON 解析エラー(malformedCounter)が追跡され、もう一方のカウンタでは JSON メッセージが有効のときに空のペイロード(emptyCounter)が含まれているかどうかが追跡されます。Cloud Monitoring では、カスタム指標の名前は custom.googleapis.com/dataflow/malformedJsoncustom.googleapis.com/dataflow/emptyPayload になります。カスタム指標を使用すると、Cloud Monitoring で可視化とアラート ポリシーを作成できます。

パイプラインをテストする

ソフトウェア開発、単体テスト、統合テスト、エンドツーエンド テストは、ソフトウェア テストの一般的な種類です。これらのテストの種類は、データ パイプラインにも適用できます。

Apache Beam SDK には、これらのテストを有効にする機能があります。テストのタイプごとに異なるデプロイ環境が理想的です。次の図は、単体テスト、統合テスト、エンドツーエンド テストがパイプラインとデータのさまざまな部分でどのように適用されるかを示しています。

テストの種類と、変換、パイプライン、データソース、データシンクとの関係

この図は、さまざまなテストの範囲と、それらの変換(DoFnPTransform サブクラス)、パイプライン、データソース、データシンクとの関係を示しています。

次のセクションで、Dataflow を使用して一般的なソフトウェア テストをデータ パイプラインに適用する方法を説明します。このセクションを読み終えたら、この図に戻り、さまざまな種類のテストがどのように関係しているかを確認してください。

データ サンプリング

Dataflow パイプラインの各ステップでデータをモニタリングするには、テスト中のデータ サンプリングを有効にします。これにより、変換の出力を表示して、正しい出力であることを確認できます。

単体テスト

単体テストでは、検証済みのデータ入力と出力のセットを比較して、DoFn サブクラスと複合変換PTransform サブクラス)が正しく機能しているかどうか評価します。通常、デベロッパーはこれらのテストをローカル環境で実行できます。ビルド環境で継続的インテグレーション(CI)の単体テストの自動化を利用して、テストを自動的に実行することもできます。

Direct Runner は、変換のビジネス ロジックのテストに焦点を当てた、参照用のテストデータのサブセットを使用して単体テストを実行します。テストデータは、テストを実行するマシンのローカルメモリに収まるサイズでなければなりません。

Apache Beam SDK には、個々の変換(DoFn サブクラス)、複合変換(PTransform サブクラス)、パイプライン全体に対して単体テストを事項するための TestPipeline という JUnit ルールが用意されています。JUnit テストクラスの次のコード スニペットに示されているように、Direct Runner や Dataflow Runner などの Apache Beam パイプライン ランナー上で TestPipeline を使用して、PAssert を使用して PCollection オブジェクトのコンテンツにアサーションを適用できます。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void myPipelineTest() throws Exception {
  final PCollection<String> pcol = p.apply(...)
  PAssert.that(pcol).containsInAnyOrder(...);
  p.run();
}

個別の変換の単体テスト

コードを再利用可能な変換(たとえば、トップレベルまたは静的なネストされたクラスとして)に分解することで、パイプラインのさまざまな部分を対象とするテストを作成できます。再利用可能な変換にはテスト以外のメリットもあります。パイプラインのビジネス ロジックをコンポーネント部分にカプセル化することで、コードの保守性と再利用性を高めることができます。対照的に、パイプラインで匿名の個々の内部クラスを使用して変換を実装している場合は、パイプラインの個々の部分のテストが難しくなることがあります。

次の Java スニペットでは匿名の内部クラスとして変換が実装されているため、テストを簡単に行うことができません。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new DoFn() {
          // Untestable anonymous transform 1
        }))
        .apply("Generate anagrams", ParDo.of(new DoFn() {
          // Untestable anonymous transform 2
        }))
        .apply("Count words", Count.perElement());

次の例では、匿名の内部クラスが名前付きの具体的な DoFn サブクラスにリファクタリングされています。前の例と比べてみてください。エンドツーエンドのパイプラインを構成する具体的な DoFn サブクラスごとに、個別の単体テストを作成できます。

PipelineOptions options = PipelineOptionsFactory.create();

Pipeline p = Pipeline.create(options)

PCollection<Integer> output =
    p.apply("Read from text", TextIO.Read.from(...))
        .apply("Split words", ParDo.of(new SplitIntoWordsFn()))
        .apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()))
        .apply("Count words", Count.perElement());

DoFn サブクラスのテストは、単一変換を含むバッチ パイプラインの単体テストに似ています。Create 変換を使用してテストデータの PCollection オブジェクトを作成し、DoFn オブジェクトに渡します。PAssert を使用して、PCollection オブジェクトのコンテンツが正しいことを確認します。次の Java コードの例では、正しい出力形式を確認するために PAssert クラスを使用しています。

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testGenerateAnagramsFn() {
    // Create the test input
    PCollection<String> words = p.apply(Create.of("friend"));

    // Test a single DoFn using the test input
    PCollection<String> anagrams =
        words.apply("Generate anagrams", ParDo.of(new GenerateAnagramsFn()));

    // Assert correct output from
    PAssert.that(anagrams).containsInAnyOrder(
        "finder", "friend", "redfin", "refind");

    p.run();
}

統合テスト

統合テストでは、パイプライン全体が正しく機能していることを確認します。次のような統合テストについて考えてみましょう。

  • データ パイプラインを構成する個々の変換が統合された機能を評価する変換統合テスト。変換統合テストは、外部のデータソースとデータシンクとの統合を除けば、パイプライン全体の単体テストと考えることができます。Apache Beam SDK には、データ パイプラインにテストデータを提供し、処理の結果を検証するメソッドが用意されています。変換統合テストを実行するために Direct Runner が使用されます。
  • データ パイプラインのライブ データソースとデータシンクとの統合を評価するシステム統合テスト。パイプラインが外部システムと通信するには、外部サービスにアクセスするための適切な認証情報を使用してテストを構成する必要があります。ストリーミング パイプラインは無期限に実行されるため、実行中のパイプラインをいつ、どのように停止するかを決めておく必要があります。Direct Runner を使用してシステム統合テストを実行することで、Dataflow ジョブを送信して完了を待つ必要がなくなり、パイプラインと他のシステムの統合をすばやく検証できます。

デベロッパーの生産性を低下させずに欠陥を迅速に検出してフィードバックを提供するように、変換とシステム統合テストを設計する必要があります。Dataflow ジョブとして実行されるテストなど、実行時間の長いテストの場合は、実行頻度の低いエンドツーエンド テストが適している可能性があります。

データ パイプラインは 1 つ以上の関連する変換と考えることができます。パイプラインのカプセル化複合変換を作成し、TestPipeline を使用してパイプライン全体の統合テストを実行できます。パイプラインのテストモード(バッチモードまたはストリーミング モード)によって、Create 変換または TestStream 変換のどちらかを使用してテストデータを提供します。

統合テストにテストデータを使用する

本番環境では、パイプラインがさまざまなデータソースやデータシンクと統合されている可能性があります。しかし、単体テストと変換統合テストでは、テスト入力を提供して出力を直接検証することで、パイプライン コードのビジネス ロジックの検証に焦点を合わせる必要があります。このアプローチにより、テストの簡素化だけでなく、パイプライン固有の問題とデータソースやデータシンクに起因する可能性のある問題を切り分けることができます。

バッチ パイプラインをテストする

バッチ パイプラインでは、Create 変換を使用して、標準のインメモリ コレクション(Java List オブジェクトなど)から入力テストデータの PCollection オブジェクトを作成します。テストデータが小さく、コードに含めることができる場合は、Create 変換を使用します。その後、出力の PCollection オブジェクトに PAssert を使用して、パイプライン コードが正しいかどうかを確認できます。この方法は、Direct Runner と Dataflow Runner でサポートされています。

次の Java コード スニペットは、出力の PCollection オブジェクトに対するパイプライン(WeatherStatsPipeline)を構成する個々の変換の一部、または全部を含む複合変換のアサーションを示しています。このアプローチは、パイプラインの個々の変換の単体テストに似ています。

private class WeatherStatsPipeline extends
    PTransform<PCollection<Integer>, PCollection<WeatherSummary>> {
  @Override
  public PCollection<WeatherSummary> expand(PCollection<Integer> input) {
    // Pipeline transforms …
  }
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWeatherPipeline() {
  // Create test input consisting of temperature readings
  PCollection<Integer> tempCelsius =
      p.apply(Create.of(24, 22, 20, 22, 21, 21, 20));

  // CalculateWeatherStats calculates the min, max, and average temperature
  PCollection<WeatherSummary> result =
      tempCelsius.apply("Calculate weather statistics", new WeatherStatsPipeline());

   // Assert correct output from CalculateWeatherStats
   PAssert.thatSingleton(result).isEqualTo(new WeatherSummary.Builder()
       .withAverageTemp(21)
       .withMaxTemp(24)
       .withMinTemp(20)
       .build());

   p.run();
}

ウィンドウ処理の動作をテストするには、次のコード スニペットのように、Create 変換を使用してタイムスタンプ付きの要素を作成することもできます。

private static final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testWindowedData() {
    PCollection<String> input =
        p.apply(
            Create.timestamped(
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("a", new Instant(0L)),
                    TimestampedValue.of("b", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L)),
                    TimestampedValue.of("c", new Instant(0L).plus(WINDOW_DURATION)))
                .withCoder(StringUtf8Coder.of()));

   PCollection<KV<String, Long>> windowedCount =
       input
           .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
           .apply(Count.perElement());

    PAssert.that(windowedCount)
        .containsInAnyOrder(
            // Output from first window
            KV.of("a", 2L),
            KV.of("b", 1L),
            KV.of("c", 1L),
            // Output from second window
            KV.of("c", 1L));

   p.run();
}

ストリーミング パイプラインをテストする

ストリーミング パイプラインには、制限なしのデータの処理方法を定義する前提条件が存在します。こうした前提条件は多くの場合、実際の状況でのデータに適時性に関するものであるため、前提条件が true か false かに応じて正確性に影響を及ぼします。したがって、ストリーミング パイプラインの統合テストでは、ストリーミング データの到着に関する非決定的な特性をシミュレートするテストを含めるのが理想的です。

このようなテストを可能にするため、Apache Beam SDK には、データ パイプラインの結果で要素のタイミング(早い、時間どおり、遅い)の効果をモデル化する TestStream クラスが用意されています。これらのテストを PAssert クラスとともに使用して、想定される結果と比較します。

TestStream は、Direct Runner と Dataflow Runner でサポートされています。次のコードサンプルでは、TestStream 変換を作成します。

final Duration WINDOW_DURATION = Duration.standardMinutes(3);

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testDroppedLateData() {
   TestStream<String> input = TestStream.create(StringUtf8Coder.of())
      // Add elements arriving before the watermark
      .addElements(
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("a", new Instant(0L)),
         TimestampedValue.of("b", new Instant(0L)),
         TimestampedValue.of("c", new Instant(0L).plus(Duration.standardMinutes(3))))
         // Advance the watermark past the end of the window
      .advanceWatermarkTo(new Instant(0L).plus(WINDOW_DURATION).plus(Duration.standardMinutes(1)))
      // Add elements which will be dropped due to lateness
      .addElements(
         TimestampedValue.of("c", new Instant(0L)))
      // Advance the watermark to infinity which will close all windows
      .advanceWatermarkToInfinity();

      PCollection<KV<String, Long>> windowedCount =
          p.apply(input)
             .apply(Window.into(FixedWindows.of(WINDOW_DURATION)))
             .apply(Count.perElement());

   PAssert.that(windowedCount)
      .containsInAnyOrder(
          // Output from first window
          KV.of("a", 2L),
          KV.of("b", 1L),
          KV.of("c", 1L));

   p.run();
}

TestStream の詳細については、Apache Beam での制限なしのパイプラインのテストをご覧ください。単体テストで Apache Beam SDK を使用する方法については、Apache Beam のドキュメントをご覧ください。

統合テストで Google Cloud サービスを使用する

Direct Runner は Google Cloud サービスと統合できるため、必要に応じてローカル環境のアドホック テストやシステム統合テストで Pub/Sub、BigQuery などのサービスを使用できます。Direct Runner を使用する場合、パイプラインは、gcloud コマンドライン ツールで構成されたユーザー アカウント、または GOOGLE_APPLICATION_CREDENTIALS 環境変数に指定したサービス アカウントとして実行されます。したがって、パイプラインを実行する前に、必要なすべてのリソースに対する十分な権限をこのアカウントに付与する必要があります。詳細については、Dataflow のセキュリティと権限をご覧ください。

完全にローカルな統合テストでは、一部の Google Cloud サービスでローカル エミュレータを使用できます。ローカル エミュレータは Pub/SubBigtable で使用できます。

ストリーミング パイプラインのシステム統合テストでは、setBlockOnRun メソッド(DirectOptions インターフェースに定義)を使用して、Direct Runner でパイプラインを非同期で実行できます。それ以外の場合は、パイプラインが手動で終了されるまで、パイプラインの実行で呼び出し元の親プロセス(ビルド パイプラインのスクリプトなど)がブロックされます。パイプラインを非同期で実行する場合は、次のコード例のように、返された PipelineResult インスタンスを使用してパイプラインの実行をキャンセルできます。

public interface StreamingIntegrationTestOptions extends
   DirectOptions, StreamingOptions, MyOtherPipelineOptions {
   ...
}

@Rule
public final transient TestPipeline p = TestPipeline.create();

@Test
@Category(NeedsRunner.class)
public void testNonBlockingPipeline() {
    StreamingIntegrationTestOptions options =
        p.getOptions().as(StreamingIntegrationOptions.class);

    options.setBlockOnRun(false); // Set non-blocking pipeline execution
    options.setStreaming(true); // Set streaming mode

    p.apply(...); // Apply pipeline transformations

    PipelineResult result = p.run(); // Run the pipeline

    // Generate input, verify output, etc
    ...

    // Later on, cancel the pipeline using the previously returned
    result.cancel();
}

エンドツーエンド テスト

エンドツーエンド テストでは、本番環境とよく似ている条件において、Dataflow Runner でエンドツーエンド パイプラインを実行して、オペレーションが正しく行われたことを検証します。このテストでは、Dataflow Runner を使用したビジネス ロジックが正しく機能することや、本番環境相当の負荷でパイプラインが期待どおりに動作するかどうかを確認します。通常、エンドツーエンド テストは、本番前環境として指定された専用の Google Cloud プロジェクトで実行します。

さまざまなスケールでパイプラインをテストするには、さまざまなタイプのエンドツーエンド テストを使用します。次に例を示します。

  • テスト データセットの一部分(全体の 1% 程度)を使用して、小規模なエンドツーエンド テストを実行し、本番前環境でパイプライン機能をすばやく検証します。
  • 完全なテスト データセットを使用して、大規模なエンドツーエンド テストを実行し、本番環境相当のデータ量と条件でパイプラインの機能を検証します。

ストリーミング パイプラインで同じデータを使用できる場合は、本番環境パイプラインと並行してテスト パイプラインを実行することをおすすめします。これにより、自動スケーリングやパフォーマンスなどの結果とオペレーションの動作を比較できます。

エンドツーエンド テストは、パイプラインが本番環境の SLO をどの程度満たすかを予測するのに役立ちます。本番前環境は、本番環境と同様の条件でパイプラインをテストします。エンドツーエンド テストでは、Dataflow Runner を使用してパイプラインを実行し、本番環境のデータセットと一致する(またはよく似た)完全な参照データセットを処理します。

実際のデータを正確にシミュレートするテスト用の合成データを生成できない場合があります。この問題に対処するためのアプローチの一つは、本番環境のデータソースからの抽出をクレンジングして参照データセットを作成することです。ここでは、機密データが適切な変換によって匿名化されます。この目的のため、機密データ保護を使用することをおすすめします。機密データ保護は、さまざまなコンテンツ タイプとデータソースから機密データを検出し、秘匿化、マスキング、フォーマット保持暗号化、日付シフトなど、さまざまな匿名化手法を適用できます。

バッチ パイプラインとストリーミング パイプラインのエンドツーエンド テストの違い

大規模なテスト データセットに完全なエンドツーエンド テストを実行する前に、テストデータの一部(全体の 1% 程度)を使用してテストを行い、短時間で期待される動作を確認することをおすすめします。Direct Runner を使用する統合テストと同様に、Dataflow Runner を使用してパイプラインを実行するときに PCollection オブジェクトに PAssert を使用できます。PAssert の詳細については、このページの単体テストのセクションをご覧ください。

ユースケースによっては、エンドツーエンド テストで非常に大規模な出力を検証するのは、実用的でなかったり、費用がかかる場合があります。その場合は、代わりに出力結果セットからの代表サンプルで検証できます。たとえば、BigQuery を使用して出力行をサンプリングし、期待される結果の参照データセットと比較できます。

ストリーミング パイプラインでは、合成データでの現実的なストリーミング条件のシミュレーションが困難な場合があります。エンドツーエンド テストにストリーミング データを提供する一般的な方法は、本番環境のデータソースとテストを統合することです。Pub/Sub をデータソースとして使用する場合、既存のトピックへの追加のサブスクリプションにより、エンドツーエンド テスト用に別のデータ ストリームを有効にできます。次に、同じデータを使用する異なるパイプラインの結果を比較できます。これは、他の本番前環境および本番環境のパイプラインに対してパイプラインの候補を検証する際に役立ちます。

次の図は、この方法によって本番環境パイプラインとテスト パイプラインを異なるデプロイ環境で並行して実行する方法を示しています。

単一の Pub/Sub ストリーミング ソースを使用した本番環境パイプラインと並行したテスト パイプラインの実行

この図では、両方のパイプラインが同じ Pub/Sub トピックから読み取りますが、個別のサブスクリプションを使用します。この設定により、2 つのパイプラインが同じデータを独立して処理するため、結果を比較できます。テスト パイプラインは本番環境プロジェクトとは別のサービス アカウントを使用するため、本番環境プロジェクトの Pub/Sub サブスクライバーの割り当ては使用されません。

バッチ パイプラインとは異なり、ストリーミング パイプラインは明示的にキャンセルされるまで継続されます。エンドツーエンド テストでは、パイプラインをある時期まで(たとえばエンドツーエンド テストの実行まで)そのまま実行するか、結果を確認できるようテストの完了を表すポイントでパイプラインをキャンセルするかを決定する必要があります。

使用するテストデータの種類は、この決定に影響します。たとえば、ストリーミング パイプラインに渡される制限付きのテストデータのセットを使用する場合、すべての要素の処理が完了したらパイプラインをキャンセルできます。あるいは、現実のデータソース(本番環境で使用されている既存の Pub/Sub トピックなど)を使用する場合や、継続的にテストデータを別に生成している場合は、より長い期間テスト パイプラインの実行を継続することをおすすめします。後者の場合、本番環境での挙動や他のテスト パイプラインとの動作を比較できます。