パイプラインのステータスをモニタリングする

このページでは、Cloud Data Fusion パイプライン イベント(パイプラインのステータスなど)を Pub/Sub トピックにパブリッシュする方法について説明します。また、Pub/Sub メッセージを処理し、失敗したパイプラインの特定と再試行などのアクションを実行する Cloud Run 関数を作成する方法についても説明します。

始める前に

  • Pub/Sub が Cloud Data Fusion パイプライン イベントをパブリッシュできるトピックを作成します。

必要なロール

Cloud Data Fusion サービス アカウントに Pub/Sub トピックにパイプライン イベントをパブリッシュするために必要な権限を確実に付与するには、Pub/Sub トピックを作成するプロジェクトに対する Pub/Sub パブリッシャー roles/pubsub.publisher)IAM ロールを Cloud Data Fusion サービス アカウントに付与するよう管理者に依頼してください。

ロールの付与については、プロジェクト、フォルダ、組織へのアクセス権の管理をご覧ください。

管理者は、カスタムロールや他の事前定義ロールから、Cloud Data Fusion サービス アカウントに必要な権限を付与することもできます。

Cloud Data Fusion インスタンスでイベントのパブリッシュを管理する

バージョン 6.7.0 以降の REST API を使用して、新規および既存の Cloud Data Fusion インスタンスでイベントのパブリッシュを管理できます。

新しいインスタンスでイベントを公開する

新しいインスタンスを作成し、EventPublishConfig フィールドを含めます。新しいインスタンスに必要なフィールドの詳細については、インスタンス リソースのリファレンスをご覧ください。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

以下を置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • LOCATION: プロジェクトの場所
  • INSTANCE_ID: Cloud Data Fusion インスタンスの ID。
  • VERSION_NUMBER: インスタンスを作成する Cloud Data Fusion のバージョン(例: 6.10.1
  • TOPIC_ID: Pub/Sub トピックの ID。

既存の Cloud Data Fusion インスタンスでイベント パブリッシュを有効にする

既存の Cloud Data Fusion インスタンスの EventPublishConfig フィールドを更新します。

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

以下を置き換えます。

  • PROJECT_ID: Google Cloud プロジェクト ID
  • LOCATION: プロジェクトの場所
  • INSTANCE_ID: Cloud Data Fusion インスタンスの ID。
  • TOPIC_ID: Pub/Sub トピックの ID。

インスタンスからイベント公開を削除する

インスタンスからイベント公開を削除するには、イベント公開の enabled 値を false に更新します。

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

Pub/Sub メッセージを読み取る関数を作成する

Cloud Run 関数は、Pub/Sub メッセージを読み取り、失敗したパイプラインの再試行など、メッセージに基づいてアクションを実行できます。Cloud Run functions を作成するには、次のようにします。

  1. Google Cloud コンソールで、[Cloud Run 関数] ページに移動します。

    Cloud Run 関数に移動します

  2. [関数を作成] をクリックします。

  3. 関数の名前とリージョンを入力します。

  4. [トリガー] フィールドで [Cloud Pub/Sub] を選択します。

  5. Pub/Sub トピック ID を入力します。

  6. [次へ] をクリックします。

  7. Pub/Sub メッセージを読み取って他のアクションを実行する関数を追加します。たとえば、次のようなユースケースに関数を追加できます。

    • パイプラインの障害のアラートを送信します。
    • レコード数や実行情報などの KPI のアラートを送信します。
    • 再実行されていない失敗したパイプラインを再起動します。

    Cloud Run functions の例については、ユースケースのセクションをご覧ください。

  8. [デプロイ] をクリックします。 詳細については、Cloud Run 関数 をデプロイするをご覧ください。

ユースケース: パイプラインのステータスを記録し、失敗したパイプラインを再試行する

次の Cloud Run functions の例では、パイプラインの実行ステータスに関する Pub/Sub メッセージを読み取り、Cloud Data Fusion で失敗したパイプラインを再試行します。

これらの関数の例は、次の Google Cloud コンポーネントを参照しています。

  • Google Cloud プロジェクト: Cloud Run functions と Pub/Sub トピックが作成されるプロジェクト
  • Pub/Sub トピック: Cloud Data Fusion インスタンスにリンクされている Pub/Sub トピック
  • Cloud Data Fusion インスタンス: パイプラインを設計して実行する Cloud Data Fusion インスタンス
  • BigQuery テーブル: パイプラインのステータスと実行と再実行の詳細をキャプチャする BigQuery テーブル
  • Cloud Run function: 失敗したパイプラインを再試行するコードをデプロイする Cloud Run function
  1. 次の Cloud Run function の例は、Cloud Data Fusion ステータス イベントに関する Pub/Sub メッセージを読み取ります。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 次の関数の例では、BigQuery テーブルを作成して保存し、パイプラインの実行の詳細をクエリします。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 次の関数の例では、失敗したパイプラインと、過去 1 時間以内にそれらが再実行されたかどうかを確認します。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 失敗したパイプラインが最近実行されていない場合、次のサンプル関数は失敗したパイプラインを再実行します。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

次のステップ