Dataflow パイプラインを計画する

このページでは、コードの開発を始める前のデータ パイプラインの計画で考慮すべき重要なポイントについて説明します。データ パイプラインはシステム間でデータを移動するため、多くの場合、ビジネス情報システムの重要なコンポーネントとなります。データ パイプラインのパフォーマンスと信頼性は、このような広範なシステムに影響を及ぼします。また、ビジネス要件をどの程度満たせるかにも影響します。

データ パイプラインの計画を立ててから開発を始めることで、パフォーマンスと信頼性を向上させることができます。このページでは、Dataflow パイプラインに関する以下のような計画上の考慮事項について説明します。

  • 想定するパイプライン パフォーマンス(測定可能な基準値を含む)
  • パイプラインとデータソース、シンク、その他の接続されたシステムとの統合
  • パイプライン、ソース、シンクの地域化
  • データ暗号化、プライベート ネットワークなどのセキュリティ

SLO の定義と測定

パフォーマンスの重要な測定値は、パイプラインがビジネス要件をどの程度満たしているかを示します。サービスレベル目標(SLO)により、パフォーマンスと許容しきい値を比較できるため、定義が明確になります。たとえば、システムに次のような SLO を定義します。

  • データの鮮度: 3 分以内に発生したウェブサイトのユーザー アクティビティから、商品のレコメンデーションの 90% を生成する。

  • データの正確性: 暦月 1 か月の間にエラーが含まれる顧客の請求書は、0.5% 未満である。

  • データの分離 / ロード バランシング: 優先度の高い支払いは営業日中に 10 分以内に処理し、通常の支払いは翌営業日までに処理する。

SLO の遵守はサービスレベル指標(SLI)で測定できます。サービス指標は、システムが所定の SLO をどの程度満たしているかを定量化できる指標です。たとえば、サンプルデータの鮮度を示す SLO は、最近処理されたユーザー アクティビティの経過時間を SLI として使用することで測定できます。パイプラインがユーザー アクティビティ イベントからレコメンデーションを生成し、SLI がイベント時刻とイベント処理時刻の間に 4 分間の遅延があると報告した場合、レコメンデーションはユーザーのウェブサイトでの 4 分間前までのアクティビティを考慮しません。ストリーミング データを処理するパイプラインのシステム レイテンシが 4 分を超えた場合、SLO が満たされていないことがわかります。

パイプラインを超えるシステム コンポーネントは SLO に影響するため、パイプライン自体のパフォーマンスだけでなく、システムのエンドツーエンドの健全性を示す指標など、システム全体のパフォーマンスを表すさまざまな SLI をキャプチャすることが重要です。たとえば、Dataflow パイプラインでは許容可能な遅延を考慮して結果を計算できますが、パフォーマンスの問題は、より大規模な SLO に影響を与えるダウンストリーム システムで発生する可能性があります。

検討すべき重要な SLO の詳細については、サイト信頼性エンジニアリングの書籍をご覧ください。

データの鮮度

データの鮮度とは、経過時間に関するデータのユーザビリティのことです。特に一般的なパイプライン データ鮮度の SLO 形式としてサイト信頼性エンジニアリングの書籍に記載されているものには、以下のデータの鮮度に関する SLO があります。

  • データの X% が Y(秒、日、分)に処理される。この SLO は、所定の期間内に処理されたデータの割合を表します。制限付きデータソースを処理するバッチ パイプラインに一般的に使用されます。このタイプの SLO 指標は、重要な処理ステップでの、経過したパイプライン ランタイムに対する相対的な入力データと出力データサイズです。入力データセットを読み取るステップと入力の各アイテムを処理する別のステップを選択できます。SLO の例として、「ヤクの毛刈りゲームでは、プレーヤーのスコアに影響するユーザー アクティビティの 99% が、ゲーム終了から 30 分以内を占める」というものがあります。

  • 最も古いデータの経過時間が X(秒、日、分)未満である。この SLO は、パイプラインによって生成されるデータの経過時間です。無制限ソースからデータを処理するストリーミング パイプラインに使用されます。このタイプの SLO では、パイプラインがデータを処理するまでの時間を示す指標を使用します。考えられる指標は 2 つあります。一つは、最も古い未処理のアイテムの経過時間(キューに未処理のアイテムが格納されている期間)、もう一つは直近に処理されたアイテムの経過時間です。SLO の例として、「商品のレコメンデーションは 5 分以内のユーザー アクティビティから生成される」などがあります。

  • パイプライン ジョブが X(秒、日、分)に正常に完了した。この SLO は、正常終了の期限を設定しています。制限付きデータソースからのデータを処理するバッチ パイプラインで使用されます。この SLO では、ジョブの成功を示す他のシグナル(処理した要素のうち、エラーを引き起こした要素の割合など)に加えて、パイプライン経過時間の合計とジョブ完了ステータスが必要です。SLO の例として、「現営業日の注文は、翌営業日の午前 9 時までに処理する」というものがあります。

Cloud Monitoring を使用してデータの更新頻度を測定する方法については、Dataflow ジョブの指標をご覧ください。

データの正確性

データの正確性とは、エラーのないデータのことです。データの正確性を判定するには、いくつか方法があります。

パイプラインを実行するには、通常、データの正確性の目標を定義するために、一定期間の正確性を測定する必要があります。次に例を示します。

  • ジョブ 1 つあたり、データエラーが含まれる入力項目が X% 未満である。この SLO を使用すると、バッチ パイプラインのデータの正確性を測定できます。SLO の例として、「電気メーターの測定値を処理する日次バッチジョブで、データ入力エラーが含まれるのは測定値の 3% 未満」などがあります。
  • X 分間の移動ウィンドウで、データエラーが含まれる入力項目が Y% 未満である。この SLO を使用すると、ストリーミング パイプラインのデータ正確性を測定できます。SLO の例として、「過去 1 時間における電気メーターの測定値で、負の値が含まれるのは 2% 未満」などがあります。

SLO を測定するには、適切な期間にわたって指標を使用して、タイプ別にエラーの数を蓄積しますたとえば、不正な形式のスキーマが原因でデータが正しくない、データが有効な範囲外のデータである、などのエラーがこれに該当します。

Cloud Monitoring を使用してデータの正確性を測定する方法については、Dataflow ジョブの指標をご覧ください。

データの分離とロード バランシング

データの分離では、データを属性別にセグメント化することで、ロード バランシングを容易にします。たとえば、オンライン支払い処理プラットフォームでは、データをセグメント化して、個々の支払いを優先度の高いものと標準のものに分けます。パイプラインでは、ロード バランシングを使用して、優先度の高い支払いを標準の支払いよりも先に処理できます。

支払い処理用に次の SLO を定義したとします。

  • 優先度の高い支払いは 10 分以内に処理されます。
  • 標準の支払いは、翌営業日の午前 9 時までに処理されます。

支払いプラットフォームがこれらの SLO を遵守している場合、優先度の高い支払いが完了すると、お客様は最終処理された支払いをレポート ダッシュボードで確認できます。一方、標準の支払いはダッシュボードに翌日まで表示されない場合があります。

このシナリオには、次のオプションがあります。

  • 単一のパイプラインを実行して、優先度が高い支払いと標準の支払いの両方を処理します。
  • 複数のパイプライン間で優先度に基づいてデータを分離してロード バランシングします。

以降のセクションで、各オプションについて詳しく説明します。

混合 SLO に対する配信に単一のパイプラインを使用する

次の図では、優先度が高い支払いと標準の支払いの両方に、1 つのパイプラインを使用しています。パイプラインは、Pub/Sub トピックや Apache Kafka トピックなどのストリーミング データソースから新しい支払いの通知を受信します。その後、迅速に支払い処理を行い、ストリーミング挿入を使用してイベントを BigQuery に書き込みます。

すべての処理に 1 つのパイプライン、全体的な SLO 10 分未満。

1 つのパイプラインを使用するメリットは、管理する必要があるのが 1 つのデータソースとパイプラインのみのため、運用要件が簡素化されることです。Dataflow は、自動チューニング機能を使用して、ジョブを可能な限り迅速かつ効率的に実行できるようにします。

1 つのパイプラインを使用するデメリットは、共有パイプラインで標準の支払いよりも優先度の高い支払いを優先できず、パイプライン リソースが両方の支払いタイプで共有されることです。前述のビジネス シナリオでは、パイプラインは 2 つの SLO をより厳密に維持する必要があります。つまり、処理される支払いの実際の優先度に関係なく、パイプラインでは優先度の高い支払いに SLO を使用する必要があります。もう一つのデメリットは、作業のバックログが発生したときに、ストリーミング パイプラインが作業の緊急度に応じてバックログ処理の優先度を設定できないことです。

特定の SLO に合わせて複数のパイプラインを使用する

2 つのパイプラインを使用してリソースを分離し、特定の SLO に配信することもできます。次の図はこの方法を示しています。

2 つのパイプラインを使用。1 つは優先度の高い支払い(SLO 10 分未満)、もう 1 つは優先度の低い支払い(SLO 24 時間未満)。

優先度の高い支払いは、優先度が高い処理のためにストリーミング パイプラインに分離されます。標準の支払いは、毎日実行され、処理されたデータの書き込みに BigQuery の読み込みジョブを使用するバッチ パイプラインで処理されます。

異なるパイプラインでデータを分離することには利点があります。より厳密な SLO に対して優先度の高い支払いを配信するには、優先度の高い支払い専用のパイプラインに割り当てるリソースを増やして、処理時間を短縮できます。リソース構成には、Dataflow ワーカーの追加、サイズの大きいマシンの使用、自動スケーリングの有効化などがあります。優先度の高いアイテムを別の処理キューに分離すると、標準の優先度の支払いが急増した場合に処理の遅延を緩和できます

複数のパイプラインを使用して、バッチソースとストリーミング ソースからのデータを分離してロードバランスする場合、Apache Beam プログラミング モデルでは高い優先度(ストリーミング)のパイプラインと標準の優先度(バッチ)のパイプラインで同じコードを共有できます。唯一の例外は、最初の読み取り変換です。これはバッチ パイプラインの制限付きソースから読み取ります。詳細については、再利用可能な変換のライブラリを作成するをご覧ください。

データソースとデータシンクの計画

データを処理するには、データ パイプラインを他のシステムと統合する必要があります。これらのシステムを、ソースとシンクといいます。データ パイプラインはソースからデータを読み取り、シンクにデータを書き込みます。ソースやシンク以外にも、データ パイプラインは外部システムとやり取りして、データ拡充、フィルタリング、または処理ステップ内で外部ビジネス ロジックを呼び出すことができます。

スケーラビリティ確保のため、Dataflow は複数のワーカーでパイプラインのステージを並行して実行します。パイプライン コードと Dataflow サービスの外部にある要因もパイプラインのスケーラビリティに影響します。たとえば、次のような要因があります。

  • 外部システムのスケーラビリティ: パイプラインとやり取りする外部システムは、パフォーマンス上の制約になり、スケーラビリティを制限する可能性があります。たとえば、Apache Kafka の構成に使用されたパーティションの数がトピックに必要な読み取りスループットに対して不十分である場合、パイプラインのパフォーマンスに影響を与える可能性があります。パイプラインとそのコンポーネントがパフォーマンス目標を達成できるようにするには、使用している外部システムのベスト プラクティスのドキュメントをご覧ください。また、組み込みのスケーラビリティを提供する Google Cloud サービスを使用して、インフラストラクチャのキャパシティ計画をシンプルにすることもできます。詳細については、このページの Google Cloud マネージド ソースとシンクの使用をご覧ください。

  • データ形式の選択: 特定のデータ形式が他のデータ形式よりも高速に読み取られる場合があります。たとえば、Avro などの並列読み込みをサポートするデータ形式を使用すると、通常、フィールドに改行が埋め込まれた CSV ファイルを使用するよりも高速で、圧縮ファイルを使用する場合よりも高速になります。

  • データのロケーションとネットワーク トポロジ: データ パイプラインに対するデータソースとシンクの相対的な地理的近さとネットワーク特性は、パフォーマンスに影響する可能性があります。詳細については、このページのリージョンに関する考慮事項をご覧ください。

外部サービス呼び出し

パイプラインから外部サービスを呼び出すと、呼び出しごとのオーバーヘッドが発生し、パイプラインのパフォーマンスと効率が低下する可能性があります。データ パイプラインが外部サービスを呼び出す場合、オーバーヘッドを軽減するために、可能であれば複数のデータ要素を単一のリクエストにまとめます。BigQueryIO やストリーミング挿入オペレーションなど、多くのネイティブ Apache Beam I/O 変換は、このタスクを自動的に実行します。容量上限のほかに、一部の外部サービスでは、一定期間の合計呼び出し数を制限する割り当て(1 日の割り当てなど)や、呼び出しレートの上限(たとえば、リクエスト数)があります。

Dataflow は複数のワーカー間で作業を並列に処理するため、トラフィック量が非常に多い外部サービスを圧迫する可能性や、使用可能な割り当てを使い切る可能性があります。自動スケーリングが使用されている場合、Dataflow は遅いステップ(外部の呼び出しなど)を実行するためのワーカーを追加することにより補正を試行する場合があります。ワーカーの追加により、外部システムの負荷がさらに増加することがあります。予想される負荷が外部システムでサポートされているか確認するか、パイプラインからのトラフィックを持続可能なレベルに制限する必要があります。詳細については、バッチサイズと外部サービスの同時呼び出しを制限するをご覧ください。

Google Cloud マネージド ソースとシンクを使用する

Dataflow パイプラインで Google Cloud マネージド サービスを使用すると、スケーラビリティ、一貫性のあるパフォーマンス、ほとんどの要件に対応できる割り当てと制限が提供され、キャパシティ管理の複雑さがなくなります。しかし、パイプライン オペレーションのさまざまな割り当てと制限に注意する必要があります。Dataflow 自体には割り当てと上限があります。割り当てと上限の一部は、Google Cloud サポートにお問い合わせいただくと増加できます。

Dataflow は Compute Engine VM インスタンスを使用してジョブを実行するため、十分な Compute Engine 割り当てがあることを確認する必要があります。Compute Engine の割り当てが不十分な場合、パイプラインの自動スケーリングが妨げられることや、ジョブが開始されないことがあります。

このセクションの残りの部分では、さまざまな Google Cloud の割り当てと上限がパイプラインの設計、開発、モニタリングにどのように影響するかについて説明します。Pub/Sub と BigQuery は、パイプラインのソースとシンクの例として使用されます。

例 1: Pub/Sub

Pub/Sub を Dataflow で使用する場合、Pub/Sub は、ストリーミング データ パイプラインとの間でメッセージを配信するためのスケーラブルで耐久性の高いイベント取り込みサービスを提供します。Google Cloud コンソールを使用して Pub/Sub 割り当ての使用量を表示し、割り当て上限を引き上げることができます。プロジェクトごとの割り当てと上限を超える単一のパイプラインがある場合は、割り当ての増加をリクエストすることをおすすめします。

Pub/Sub の割り当てと上限は、プロジェクト レベルの使用量に基づいて設計されています。具体的には、別々のプロジェクトのパブリッシャーとサブスクライバーには、独立したデータ スループットの割り当てが与えられます。複数のパイプラインが 1 つのトピックをパブリッシュまたはサブスクライブする場合、各パイプラインを個別のプロジェクトにデプロイすることで、そのトピックで許容される最大スループットを得ることができます。この構成では、各パイプラインで異なるプロジェクトベースのサービス アカウントを使用してメッセージの利用とパブリッシュを行います。

次の図では、パイプライン 1パイプライン 2 は、プロジェクト A で使用できる、同じサブスクライバーとパブリッシャーのスループットの割り当てを共有しています。これに対し、パイプライン 3 では、プロジェクト B のサブスクライバーとパブリッシャーのスループットの割り当て全体を使用できます。

3 つのパイプライン。パイプライン 1 とパイプライン 2 はパイプライン プロジェクト A にあり、Pub/Sub トピックに固有のサブスクリプションがある。パイプライン 3 は、独自のサブスクリプションを持つパイプライン プロジェクト B にある。

複数のパイプラインがトピックへの個別のサブスクリプションを使用して 1 つの Pub/Sub トピックからの読み取りを行うことができます。これにより、Dataflow パイプラインは他のサブスクライバー(他のパイプラインなど)とは無関係にメッセージを pull して確認できるようになります。この機能により、追加の Pub/Sub サブスクリプションを作成して、パイプラインのクローンを簡単に作成できます。追加のサブスクリプションを作成すると、高可用性(通常はストリーミング ユースケースの場合)のためのレプリカ パイプラインの作成、同じデータに対する追加のテスト パイプラインの実行、パイプライン更新の有効化に役立ちます。

例 2: BigQuery

Apache Beam SDK では、Java、Python、Go などの複数の言語で BigQuery データの読み取りと書き込みがサポートされています。Java を使用する場合は、BigQueryIO クラスにこの機能があります。BigQueryIO では、2 つの方法(EXPORT(テーブルのエクスポート)と DIRECT_READ)でデータを読み取ることができます。異なる読み取りメソッドでは、BigQuery の割り当ても異なります。

デフォルトの読み取り方法はテーブルのエクスポートです。次の図のように動作します。

パイプラインは BigQuery にエクスポート リクエストを送信し、BigQuery は Cloud Storage の一時的な場所にデータを書き込む。パイプラインは、この一時的な場所からデータを読み取る。

図は次のフローを示しています。

  1. BigQueryIO は、BigQuery エクスポート リクエストを呼び出してテーブルデータをエクスポートします。エクスポートされたテーブルデータは、一時的な Cloud Storage の場所に書き込まれます。
  2. BigQueryIO は、一時的な Cloud Storage の場所からテーブルデータを読み取ります。

BigQuery エクスポート リクエストはエクスポート割り当てによって制限されます。また、エクスポート リクエストは、パイプラインがデータの処理を開始する前に完了する必要もあります。これにより、ジョブの実行時間が長くなります。

それに対して、直接読み取りアプローチでは、BigQuery Storage API を使用して BigQuery から直接テーブルデータを読み取ります。BigQuery Storage API は、gRPC を使用してテーブル行データの高スループットの読み取りパフォーマンスを実現します。BigQuery Storage API を使用すると、エクスポート ステップが不要になります。これにより、エクスポートの割り当て制限が回避され、ジョブの実行時間が短縮されます。

次の図は、BigQuery Storage API を使用した場合のフローを示しています。BigQuery エクスポート リクエストを使用する前述のフローとは対照的に、このフローは単純です。テーブルからパイプラインにデータを取得する 1 つの直接読み取りステップがあります。

BigQuery テーブルから直接読み取りを行うパイプライン。

BigQuery テーブルにデータを書き込む場合も、独自の割り当ての影響が発生します。BigQuery 読み込みジョブを使用するバッチ パイプラインは、テーブルレベルとプロジェクト レベルで適用されるさまざまな BigQuery 読み込みジョブ割り当てを消費します。同様に、BigQuery ストリーミング挿入を使用するストリーミング パイプラインは、BigQuery ストリーミング挿入割り当てを消費します。

データの読み取りと書き込みに最適なメソッドを決定するには、ユースケースを検討します。たとえば、BigQuery 読み込みジョブを使用して、1 日に数千回データをテーブルに追記することは避けてください。ストリーミング パイプラインを使用して、ほぼリアルタイムでデータを BigQuery に書き込みます。ストリーミング パイプラインでは、この目的にストリーミング挿入または Storage Write API を使用する必要があります。

リージョンに関する考慮事項

Dataflow は、複数の Google Cloud リージョンのマネージド サービスとして提供されます。ジョブの実行に使用するリージョンを選択する場合は、次のことを考慮してください。

  • データソースとシンクのロケーション
  • データ処理環境に関する設定や制限
  • 特定のリージョンでのみ提供される Dataflow 機能
  • 特定のジョブの実行管理に使用されるリージョン
  • ジョブのワーカーに使用されるゾーン

ジョブごとに、ジョブとワーカーに使用するリージョン設定が異なる場合があります。リージョンとゾーンを指定するタイミングなどの詳細については、Dataflow リージョンのドキュメントをご覧ください。

Dataflow ジョブを実行するリージョンを指定することで、高可用性と障害復旧のためのリージョンに関する考慮事項について計画を立てることができます。詳しくは、高可用性と地理的冗長性をご覧ください。

リージョン

Dataflow リージョンでは、ジョブに関連するメタデータ(変換名など、Apache Beam グラフ自体に関する情報など)を保存して処理します。また、自動スケーリングなどのワーカーの動作も制御します。リージョンを指定すると、セキュリティとコンプライアンス、データの局所性、ジョブのリージョン配置の要件を満たすことができます。リージョン間のネットワーク呼び出しのパフォーマンスへの影響を避けるため、可能であれば、ジョブとワーカーには同じリージョンを使用することをおすすめします。

Dataflow ワーカー

Dataflow ジョブは、Compute Engine VM インスタンス(Dataflow ワーカーと呼ばれます)を使用してパイプラインを実行します。Dataflow ジョブは、Dataflow ロケーションがないリージョンを含む、任意の Compute Engine ゾーンをワーカーに使用できます。ジョブのワーカー リージョンを指定することで、ワーカーのリージョン配置を制御できます。ワーカーのリージョンまたはゾーンを指定するには、次の操作を行います。

  • gcloud CLI を使用して Dataflow テンプレートからジョブを作成する場合は、--worker-region フラグを使用してワーカー リージョンをオーバーライドするか、--worker-zone フラグを使用してワーカーゾーンをオーバーライドします。
  • Apache Beam Java SDK を使用してジョブを作成する場合は、パイプライン オプションを使用してワーカーのリージョンとゾーンを設定します。ワーカー リージョンをオーバーライドするには workerRegion を、ワーカーゾーンをオーバーライドするには workerZone を使用します。

ネットワークのレイテンシとスループットを向上させるには、データソースやシンクに地理的に近いリージョンにワーカーを作成することをおすすめします。ジョブの作成時にワーカーのリージョンまたはゾーンを指定しない場合、Dataflow が自動的にデフォルトのゾーンに設定します。これは、ジョブと同じリージョンにあるゾーンです。

Dataflow Shuffle サービスまたは Streaming Engine を使用しない場合、ジョブによって処理されたデータ(PCollection オブジェクトに保存されているデータ)は、ユーザーコードによりワーカー外のデータが伝送されていなければ、ジョブのワーカーに存在します。Dataflow Shuffle サービスまたは Streaming Engine が有効になっている場合、PCollection オブジェクトで表される分散データセットをワーカーとサービス間で転送できます。

データ暗号化に関する考慮事項

Dataflow はフルマネージド サービスとして、データ パイプライン全体で処理中のデータと保存中のデータの両方に Google 管理の暗号鍵を使用して自動的にデータを暗号化します。Google が管理する暗号鍵を使用する代わりに、独自の暗号鍵を管理することもできます。その場合、Dataflow は Cloud Key Management Service(KMS)を使用した顧客管理の暗号鍵(CMEK)をサポートしています。また、Cloud HSM も使用できます。これは、クラウドでホストされるハードウェア セキュリティ モジュール(HSM)サービスで、FIPS 140-2 レベル 3 認定 HSM のクラスタで暗号鍵をホストし、暗号処理を実行できます。

CMEK を使用すると、Dataflow は Cloud KMS 鍵を使用してデータを保護します。ただし、ウィンドウ処理、グループ化、結合などのデータ鍵ベースのオペレーションは除きます。データ鍵に個人情報(PII)などの機密データが含まれている場合、Dataflow パイプラインに入る前に鍵をハッシュ化または変換する必要があります。

プライベート ネットワークに関する考慮事項

ネットワーキングとセキュリティの要件によって、Dataflow ジョブなどの VM ベースのワークロードでプライベート IP アドレスのみを使用することが義務付けられている場合があります。Dataflow では、すべてのネットワーク通信にワーカーがプライベート IP アドレスを使用するように指定できます。パブリック IP が無効になっている場合は、サブネットワークで限定公開の Google アクセスを有効にして、Dataflow ワーカーが Google API とサービスにアクセスできるようにする必要があります。

Dataflow ジョブで、Google Cloud の外部にあるネットワーク リソースにアクセスするためにパブリック IP が必要な場合は、Dataflow ワーカーでパブリック IP を無効にすることをおすすめします。パブリック IP を無効にすると、Dataflow ワーカーはサブネットワーク外のリソースやピア VPC ネットワークにアクセスできなくなります。同様に、サブネットワークまたはピア VPC ネットワークの外部から VM ワーカーへのネットワーク アクセスも許可されません。

--usePublicIps パイプライン オプションを使用してワーカーにプライベート IP のみを割り当てる方法については、パイプライン オプションをご覧ください。

次のステップ