このページでは、Cloud Composer とトリガーによるパイプラインのオーケストレーションについて説明します。Cloud Data Fusion では、Cloud Composer を使用してパイプラインをオーケストレートすることをおすすめします。オーケストレーションをより簡単に管理する必要がある場合は、トリガーを使用します。
Composer
Cloud Composer を使用してパイプラインをオーケストレートする
Cloud Composer を使用して Cloud Data Fusion でパイプラインの実行をオーケストレートすると、次のメリットがあります。
- 一元化されたワークフロー管理: 複数の Cloud Data Fusion パイプラインの実行を一元的に管理します。
- 依存関係の管理: 適切な実行順序を確保するために、パイプライン間の依存関係を定義します。
- モニタリングとアラート: Cloud Composer には、モニタリング機能と障害のアラートが用意されています。
- 他のサービスとの統合: Cloud Composer を使用すると、Cloud Data Fusion と他の Google Cloud サービスにまたがるワークフローをオーケストレートできます。
Cloud Composer を使用して Cloud Data Fusion パイプラインをオーケストレートするには、次の手順を行います。
Cloud Composer 環境を設定します。
- Cloud Composer 環境を作成します。環境がない場合は、Google Cloud プロジェクトに環境をプロビジョニングします。この環境がオーケストレーション ワークスペースです。
- 権限を付与する。Cloud Composer サービス アカウントに、Cloud Data Fusion にアクセスするために必要な権限(パイプラインの開始、停止、一覧表示の権限など)があることを確認します。
オーケストレーション用の有向非巡回グラフ(DAG)を定義します。
- DAG を作成する: Cloud Composer で、Cloud Data Fusion パイプラインのオーケストレーション ワークフローを定義する DAG を作成します。
- Cloud Data Fusion オペレーター: DAG 内で Cloud Composer の Cloud Data Fusion オペレーターを使用します。これらのオペレーターを使用すると、Cloud Data Fusion をプログラムで操作できます。
Cloud Data Fusion オペレーター
Cloud Data Fusion パイプラインのオーケストレーションには、次のオペレーターがあります。
CloudDataFusionStartPipelineOperator
ID で Cloud Data Fusion パイプラインの実行をトリガーします。次のパラメータがあります。
- パイプライン ID
- ロケーション(Google Cloud リージョン)
- パイプラインの名前空間
- ランタイム引数(省略可)
- 完了を待つ(省略可)
- タイムアウト(省略可)
CloudDataFusionStopPipelineOperator
実行中の Cloud Data Fusion パイプラインを停止できます。
CloudDataFusionDeletePipelineOperator
Cloud Data Fusion パイプラインを削除します。
DAG ワークフローを構築する
DAG ワークフローを構築する際は、次の点を考慮してください。
- 依存関係の定義: DAG 構造を使用して、タスク間の依存関係を定義します。たとえば、ある Namespace のパイプラインが正常に完了するのを待ってから、別の Namespace の別のパイプラインをトリガーするタスクがあります。
- スケジュール設定: 特定の間隔(毎日、1 時間ごとなど)で DAG を実行するようにスケジュール設定するか、手動でトリガーするように設定します。
詳細については、Cloud Composer の概要をご覧ください。
トリガー
トリガーを使用してパイプラインをオーケストレートする
Cloud Data Fusion トリガーを使用すると、1 つ以上のアップストリーム パイプラインの完了(成功、失敗、指定した条件)時にダウンストリーム パイプラインを自動的に実行できます。
トリガーは次のようなタスクに役立ちます。
- データをクレンジングし、それから複数のダウンストリーム パイプラインで利用できるようにする。
- ランタイム引数やプラグイン構成などの情報をパイプライン間で共有する。このタスクはペイロード構成と呼ばれます。
- 実行ごとに更新する必要のある静的パイプラインではなく、時間、日、週、月のデータを使用して実行される一連の動的パイプラインを備えています。
たとえば、会社の発送に関するすべての情報が含まれているデータセットがあるとします。このデータに基づいて、いくつかのビジネスに関する質問に回答したいとします。これを行うには、配送に関する未加工データをクリーンアップするパイプラインを 1 つ作成します。このパイプラインは Shipments Data Cleaning と呼ばれます。次に、2 つ目のパイプライン「Delayed Shipments USA」を作成します。このパイプラインは、クレンジングされたデータを読み取り、指定されたしきい値を超えて遅延した米国内の配送を見つけます。米国での遅延配送パイプラインは、アップストリームの配送データ クレンジングパイプラインが正常に完了するとすぐにトリガーできます。
また、ダウンストリーム パイプラインはアップストリーム パイプラインの出力を消費するため、このトリガーを使用してダウンストリーム パイプラインを実行するときに、読み取る入力ディレクトリ(アップストリーム パイプラインが出力を生成したディレクトリ)も受信するように指定する必要があります。このプロセスはペイロード構成の渡しと呼ばれ、ランタイム引数で定義します。これにより、時間、日、週、月のデータを使用して実行される一連の動的パイプラインを作成できます(実行ごとに更新する必要がある静的パイプラインではありません)。
トリガーを使用してパイプラインをオーケストレートする手順は次のとおりです。
アップストリーム パイプラインとダウンストリーム パイプラインを作成します。
- Cloud Data Fusion Studio で、オーケストレーション チェーンを形成するパイプラインを設計してデプロイします。
- ワークフローの次のパイプライン(ダウンストリーム)をアクティブにするパイプラインの完了について検討します。
省略可: アップストリーム パイプラインのランタイム引数を渡します。
- パイプライン間でペイロード構成をランタイム引数として渡す必要がある場合は、ランタイム引数を構成します。これらの引数は、実行時にダウンストリーム パイプラインに渡すことができます。
ダウンストリーム パイプラインで受信トリガーを作成します。
- Cloud Data Fusion Studio で、[リスト] ページに移動します。[デプロイ済み] タブで、ダウンストリーム パイプラインの名前をクリックします。そのパイプラインの [デプロイ] ビューが表示されます。
- ページ中央の左側にある [受信トリガー] をクリックします。使用可能なパイプラインのリストが表示されます。
- アップストリーム パイプラインをクリックします。アップストリーム パイプラインの完了状態(Succeeds、Fails、Stops)を 1 つ以上の条件として選択し、ダウンストリーム パイプラインが実行されるようにします。
- アップストリーム パイプラインがダウンストリーム パイプラインと情報(ペイロード構成と呼ばれる)を共有する場合は、[トリガーの構成] をクリックし、ペイロード構成をランタイム引数として渡すの手順に沿って操作します。有効になっていない場合は、[トリガーを有効にする] をクリックします。
トリガーをテストします。
- アップストリーム パイプラインの実行を開始します。
- トリガーが正しく構成されている場合、構成された条件に基づいて、アップストリーム パイプラインの完了時にダウンストリーム パイプラインが自動的に実行されます。
ペイロード構成をランタイム引数として渡す
ペイロード構成を使用すると、アップストリーム パイプラインとダウンストリーム パイプラインとの間で情報を共有できます。この情報には、出力ディレクトリ、データ形式、パイプラインの実行日などがあります。この情報は、読み取る適切なデータセットの決定などの意思決定に、ダウンストリーム パイプラインによって使用されます。
アップストリーム パイプラインからダウンストリーム パイプラインに情報を渡すには、ダウンストリーム パイプラインのランタイム引数を、アップストリーム パイプラインのランタイム引数またはプラグインの構成の値で設定します。
ダウンストリーム パイプラインがトリガーされて実行されるたびに、ダウンストリーム パイプラインをトリガーしたアップストリーム パイプラインの特定の実行のランタイム引数を使用して、そのペイロード構成が設定されます。
ペイロード構成をランタイム引数として渡す手順は次のとおりです。