監控管道狀態

本頁說明如何將 Cloud Data Fusion 管道事件 (例如管道狀態) 發布至 Pub/Sub 主題。本文也說明如何建立 Cloud Run 函式,處理 Pub/Sub 訊息並採取行動,例如識別及重試失敗的管道。

事前準備

  • 建立主題,讓 Pub/Sub 發布 Cloud Data Fusion 管道事件。

必要的角色

為確保 Cloud Data Fusion 服務帳戶具備將管道事件發布至 Pub/Sub 主題的必要權限, 請管理員在您建立 Pub/Sub 主題的專案中,授予 Cloud Data Fusion 服務帳戶「Pub/Sub 發布者」 (roles/pubsub.publisher) 身分與存取權管理角色。

如要進一步瞭解如何授予角色,請參閱「管理專案、資料夾和機構的存取權」。

管理員或許也能透過自訂角色或其他預先定義的角色,將必要權限授予 Cloud Data Fusion 服務帳戶。

管理 Cloud Data Fusion 執行個體中的事件發布作業

在 6.7.0 以上版本中,您可以使用 REST API 管理新舊 Cloud Data Fusion 執行個體的事件發布作業。

在新執行個體中發布事件

建立新執行個體並加入 EventPublishConfig 欄位。如要進一步瞭解新執行個體的必填欄位,請參閱執行個體資源參考資料。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

更改下列內容:

  • PROJECT_ID:專案 ID Google Cloud
  • LOCATION:專案位置
  • INSTANCE_ID:Cloud Data Fusion 執行個體的 ID
  • VERSION_NUMBER:建立執行個體的 Cloud Data Fusion 版本,例如 6.10.1
  • TOPIC_ID:Pub/Sub 主題的 ID

在現有 Cloud Data Fusion 執行個體中啟用事件發布功能

更新現有 Cloud Data Fusion 執行個體中的 EventPublishConfig 欄位:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

更改下列內容:

  • PROJECT_ID:專案 ID Google Cloud
  • LOCATION:專案的位置
  • INSTANCE_ID:Cloud Data Fusion 執行個體的 ID
  • TOPIC_ID:Pub/Sub 主題的 ID

從執行個體中移除活動發布功能

如要從執行個體中移除事件發布功能,請將事件發布 enabled 值更新為 false

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

建立函式來讀取 Pub/Sub 訊息

Cloud Run 函式可以讀取 Pub/Sub 訊息並採取行動,例如重試失敗的管道。如要建立 Cloud Run 函式,請按照下列指示操作:

  1. 前往 Google Cloud 控制台的「Cloud Run functions」頁面。

    前往 Cloud Run functions

  2. 按一下「建立函式」

  3. 輸入函式名稱和區域。

  4. 在「觸發條件類型」欄位中,選取「Cloud Pub/Sub」

  5. 輸入 Pub/Sub 主題 ID。

  6. 點選「下一步」

  7. 新增函式來讀取 Pub/Sub 訊息,並採取其他動作。舉例來說,您可以為下列用途新增函式:

    • 在管道失敗時傳送快訊。
    • 傳送 KPI 的快訊,例如記錄數或執行資訊。
    • 重新啟動尚未重新執行的失敗管道。

    如需 Cloud Run 函式範例,請參閱用途一節。

  8. 按一下「Deploy」(部署)。 詳情請參閱「部署 Cloud Run 函式」。

使用案例:查看文件管道狀態,並重試失敗的管道

下列 Cloud Run 函式範例會讀取有關管道執行狀態的 Pub/Sub 訊息,然後在 Cloud Data Fusion 中重試失敗的管道。

這些範例函式是指下列 Google Cloud 元件:

  • Google Cloud project:建立 Cloud Run 函式和 Pub/Sub 主題的專案
  • Pub/Sub 主題:連結至 Cloud Data Fusion 執行個體的 Pub/Sub 主題
  • Cloud Data Fusion 執行個體:您可在這個 Cloud Data Fusion 執行個體中設計及執行管道
  • BigQuery 資料表:擷取管道狀態,以及執行和重新執行詳細資料的 BigQuery 資料表
  • Cloud Run 函式:您部署程式碼的 Cloud Run 函式,用於重試失敗的管道
  1. 下列 Cloud Run 函式範例會讀取有關 Cloud Data Fusion 狀態事件的 Pub/Sub 訊息。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 下列函式範例會建立及儲存 BigQuery 資料表,並查詢管道執行詳細資料。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 下列範例函式會檢查失敗的管道,以及管道是否在過去一小時內重新執行。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 如果失敗的管道最近未執行,下列範例函式會重新執行失敗的管道。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

後續步驟