このページでは、Cloud Data Fusion パイプライン イベント(パイプラインのステータスなど)を Pub/Sub トピックにパブリッシュする方法について説明します。また、Pub/Sub メッセージを処理し、失敗したパイプラインの特定と再試行などのアクションを実行する Cloud Run 関数を作成する方法についても説明します。
始める前に
- Pub/Sub が Cloud Data Fusion パイプライン イベントをパブリッシュできるトピックを作成します。
必要なロール
Cloud Data Fusion サービス アカウントに Pub/Sub トピックにパイプライン イベントをパブリッシュするために必要な権限を確実に付与するには、Pub/Sub トピックを作成するプロジェクトに対する Pub/Sub パブリッシャー (roles/pubsub.publisher
)IAM ロールを Cloud Data Fusion サービス アカウントに付与するよう管理者に依頼してください。
管理者は、カスタムロールや他の事前定義ロールから、Cloud Data Fusion サービス アカウントに必要な権限を付与することもできます。
Cloud Data Fusion インスタンスでイベントのパブリッシュを管理する
バージョン 6.7.0 以降の REST API を使用して、新規および既存の Cloud Data Fusion インスタンスでイベントのパブリッシュを管理できます。
新しいインスタンスでイベントを公開する
新しいインスタンスを作成し、EventPublishConfig
フィールドを含めます。新しいインスタンスに必要なフィールドの詳細については、インスタンス リソースのリファレンスをご覧ください。
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
以下を置き換えます。
PROJECT_ID
: Google Cloud プロジェクト IDLOCATION
: プロジェクトの場所INSTANCE_ID
: Cloud Data Fusion インスタンスの ID。VERSION_NUMBER
: インスタンスを作成する Cloud Data Fusion のバージョン(例:6.10.1
)TOPIC_ID
: Pub/Sub トピックの ID。
既存の Cloud Data Fusion インスタンスでイベント パブリッシュを有効にする
既存の Cloud Data Fusion インスタンスの EventPublishConfig
フィールドを更新します。
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
以下を置き換えます。
PROJECT_ID
: Google Cloud プロジェクト IDLOCATION
: プロジェクトの場所INSTANCE_ID
: Cloud Data Fusion インスタンスの ID。TOPIC_ID
: Pub/Sub トピックの ID。
インスタンスからイベント公開を削除する
インスタンスからイベント公開を削除するには、イベント公開の enabled
値を false
に更新します。
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
Pub/Sub メッセージを読み取る関数を作成する
Cloud Run 関数は、Pub/Sub メッセージを読み取り、失敗したパイプラインの再試行など、メッセージに基づいてアクションを実行できます。Cloud Run functions を作成するには、次のようにします。
Google Cloud コンソールで、[Cloud Run 関数] ページに移動します。
[関数を作成] をクリックします。
関数の名前とリージョンを入力します。
[トリガー] フィールドで [Cloud Pub/Sub] を選択します。
Pub/Sub トピック ID を入力します。
[次へ] をクリックします。
Pub/Sub メッセージを読み取って他のアクションを実行する関数を追加します。たとえば、次のようなユースケースに関数を追加できます。
- パイプラインの障害のアラートを送信します。
- レコード数や実行情報などの KPI のアラートを送信します。
- 再実行されていない失敗したパイプラインを再起動します。
Cloud Run functions の例については、ユースケースのセクションをご覧ください。
[デプロイ] をクリックします。 詳細については、Cloud Run 関数 をデプロイするをご覧ください。
ユースケース: パイプラインのステータスを記録し、失敗したパイプラインを再試行する
次の Cloud Run functions の例では、パイプラインの実行ステータスに関する Pub/Sub メッセージを読み取り、Cloud Data Fusion で失敗したパイプラインを再試行します。
これらの関数の例は、次の Google Cloud コンポーネントを参照しています。
- Google Cloud プロジェクト: Cloud Run functions と Pub/Sub トピックが作成されるプロジェクト
- Pub/Sub トピック: Cloud Data Fusion インスタンスにリンクされている Pub/Sub トピック
- Cloud Data Fusion インスタンス: パイプラインを設計して実行する Cloud Data Fusion インスタンス
- BigQuery テーブル: パイプラインのステータスと実行と再実行の詳細をキャプチャする BigQuery テーブル
- Cloud Run function: 失敗したパイプラインを再試行するコードをデプロイする Cloud Run function
次の Cloud Run function の例は、Cloud Data Fusion ステータス イベントに関する Pub/Sub メッセージを読み取ります。
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
次の関数の例では、BigQuery テーブルを作成して保存し、パイプラインの実行の詳細をクエリします。
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
次の関数の例では、失敗したパイプラインと、過去 1 時間以内にそれらが再実行されたかどうかを確認します。
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
失敗したパイプラインが最近実行されていない場合、次のサンプル関数は失敗したパイプラインを再実行します。
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
次のステップ
- Cloud Run の関数を作成する方法を学習します。