パイプラインのステータスをモニタリングする

このページでは、Cloud Data Fusion パイプライン イベント(パイプラインのステータスなど)を Pub/Sub トピックにパブリッシュする方法について説明します。また、Pub/Sub メッセージを処理し、失敗したパイプラインの特定と再試行などのアクションを実行する Cloud Run 関数を作成する方法についても説明します。

始める前に

  • Pub/Sub が Cloud Data Fusion パイプライン イベントを公開できるトピックを作成します。

必要なロール

Pub/Sub トピックにパイプライン イベントを公開するために必要な権限を Cloud Data Fusion サービス アカウントに付与するには、管理者に Cloud Data Fusion サービス アカウントに {101 }Pub/Sub パブリッシャー roles/pubsub.publisher )で、Pub/Sub トピックを作成するプロジェクトに対する IAM ロールを割り当てます。 ロールの付与については、プロジェクト、フォルダ、組織へのアクセスを管理するをご覧ください。

管理者は、カスタムロールや他の事前定義ロールから、Cloud Data Fusion サービス アカウントに必要な権限を付与することもできます。

Cloud Data Fusion インスタンスでのイベントの公開を管理する

バージョン 6.7.0 以降の REST API を使用して、新規および既存の Cloud Data Fusion インスタンスでイベントのパブリッシュを管理できます。

新しいインスタンスでイベントを公開する

新しいインスタンスを作成し、EventPublishConfig フィールドを含めます。新しいインスタンスの必須フィールドの詳細については、インスタンス リソースのリファレンスをご覧ください。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • LOCATION: プロジェクトのロケーション。
  • INSTANCE_ID: Cloud Data Fusion インスタンスの ID。
  • VERSION_NUMBER: インスタンスを作成する Cloud Data Fusion のバージョン(例: 6.10.1
  • TOPIC_ID: Pub/Sub トピックの ID。

既存の Cloud Data Fusion インスタンスでイベント パブリッシュを有効にする

既存の Cloud Data Fusion インスタンスの EventPublishConfig フィールドを更新します。

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

次のように置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • LOCATION: プロジェクトのロケーション
  • INSTANCE_ID: Cloud Data Fusion インスタンスの ID。
  • TOPIC_ID: Pub/Sub トピックの ID。

インスタンスからイベントの公開を削除する

インスタンスからイベント公開を削除するには、イベント公開の enabled 値を false に更新します。

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Pub/Sub メッセージを読み取る関数を作成する

Cloud Run の関数は Pub/Sub メッセージを読み取り、失敗したパイプラインの再試行など、メッセージを処理できます。Cloud Run の関数を作成するには、次のようにします。

  1. Google Cloud コンソールで、[Cloud Run 関数] ページに移動します。

    Cloud Run 関数に移動します

  2. [関数を作成] をクリックします。

  3. 関数名とリージョンを入力します。

  4. [トリガー] フィールドで [Cloud Pub/Sub] を選択します。

  5. Pub/Sub トピック ID を入力します。

  6. [次へ] をクリックします。

  7. Pub/Sub メッセージを読み取り、他のアクションを実行する関数を追加します。たとえば、次のユースケースの関数を追加できます。

    • パイプラインの障害に関するアラートを送信します。
    • レコード数や実行情報など、KPI のアラートを送信します。
    • 再実行されていない失敗したパイプラインを再起動します。

    Cloud Run 関数の例については、ユースケースのセクションをご覧ください。

  8. [デプロイ] をクリックします。 詳細については、Cloud Run 関数 をデプロイするをご覧ください。

ユースケース: パイプラインのステータスを記録し、失敗したパイプラインを再試行する

次の Cloud Run 関数の例では、パイプラインの実行ステータスに関する Pub/Sub メッセージを読み取り、Cloud Data Fusion で失敗したパイプラインを再試行します。

これらの関数の例では、次の Google Cloud コンポーネントを参照します。

  • Google Cloud プロジェクト: Cloud Run の関数と Pub/Sub トピックが作成されるプロジェクト
  • Pub/Sub トピック: Cloud Data Fusion インスタンスにリンクされている Pub/Sub トピック
  • Cloud Data Fusion インスタンス: パイプラインを設計して実行する Cloud Data Fusion インスタンス
  • BigQuery テーブル: パイプラインのステータスと実行と再実行の詳細をキャプチャする BigQuery テーブル
  • Cloud Run 関数: 失敗したパイプラインを再試行するコードをデプロイする Cloud Run 関数
  1. 次の Cloud Run 関数の例は、Cloud Data Fusion のステータス イベントに関する Pub/Sub メッセージを読み取ります。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 次のサンプル関数は、BigQuery テーブルを作成して保存し、パイプライン実行の詳細をクエリします。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 次のサンプル関数は、失敗したパイプラインと、過去 1 時間以内に再実行されたかどうかを確認します。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 失敗したパイプラインが最近実行されていない場合、次のサンプル関数は失敗したパイプラインを再実行します。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

次のステップ