本頁說明如何將 Cloud Data Fusion 管道事件 (例如管道狀態) 發布至 Pub/Sub 主題。本文也說明如何建立 Cloud Run 函式,處理 Pub/Sub 訊息並採取行動,例如識別及重試失敗的管道。
事前準備
- 建立主題,讓 Pub/Sub 發布 Cloud Data Fusion 管道事件。
必要的角色
為確保 Cloud Data Fusion 服務帳戶具備將管道事件發布至 Pub/Sub 主題的必要權限,
請管理員在您建立 Pub/Sub 主題的專案中,授予 Cloud Data Fusion 服務帳戶「Pub/Sub 發布者」 (roles/pubsub.publisher
) 身分與存取權管理角色。
管理員或許也能透過自訂角色或其他預先定義的角色,將必要權限授予 Cloud Data Fusion 服務帳戶。
管理 Cloud Data Fusion 執行個體中的事件發布作業
在 6.7.0 以上版本中,您可以使用 REST API 管理新舊 Cloud Data Fusion 執行個體的事件發布作業。
在新執行個體中發布事件
建立新執行個體並加入 EventPublishConfig
欄位。如要進一步瞭解新執行個體的必填欄位,請參閱執行個體資源參考資料。
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
更改下列內容:
PROJECT_ID
:專案 ID Google CloudLOCATION
:專案位置INSTANCE_ID
:Cloud Data Fusion 執行個體的 IDVERSION_NUMBER
:建立執行個體的 Cloud Data Fusion 版本,例如6.10.1
TOPIC_ID
:Pub/Sub 主題的 ID
在現有 Cloud Data Fusion 執行個體中啟用事件發布功能
更新現有 Cloud Data Fusion 執行個體中的
EventPublishConfig
欄位:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
更改下列內容:
PROJECT_ID
:專案 ID Google CloudLOCATION
:專案的位置INSTANCE_ID
:Cloud Data Fusion 執行個體的 IDTOPIC_ID
:Pub/Sub 主題的 ID
從執行個體中移除活動發布功能
如要從執行個體中移除事件發布功能,請將事件發布 enabled
值更新為 false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
建立函式來讀取 Pub/Sub 訊息
Cloud Run 函式可以讀取 Pub/Sub 訊息並採取行動,例如重試失敗的管道。如要建立 Cloud Run 函式,請按照下列指示操作:
前往 Google Cloud 控制台的「Cloud Run functions」頁面。
按一下「建立函式」。
輸入函式名稱和區域。
在「觸發條件類型」欄位中,選取「Cloud Pub/Sub」。
輸入 Pub/Sub 主題 ID。
點選「下一步」。
新增函式來讀取 Pub/Sub 訊息,並採取其他動作。舉例來說,您可以為下列用途新增函式:
- 在管道失敗時傳送快訊。
- 傳送 KPI 的快訊,例如記錄數或執行資訊。
- 重新啟動尚未重新執行的失敗管道。
如需 Cloud Run 函式範例,請參閱用途一節。
按一下「Deploy」(部署)。 詳情請參閱「部署 Cloud Run 函式」。
使用案例:查看文件管道狀態,並重試失敗的管道
下列 Cloud Run 函式範例會讀取有關管道執行狀態的 Pub/Sub 訊息,然後在 Cloud Data Fusion 中重試失敗的管道。
這些範例函式是指下列 Google Cloud 元件:
- Google Cloud project:建立 Cloud Run 函式和 Pub/Sub 主題的專案
- Pub/Sub 主題:連結至 Cloud Data Fusion 執行個體的 Pub/Sub 主題
- Cloud Data Fusion 執行個體:您可在這個 Cloud Data Fusion 執行個體中設計及執行管道
- BigQuery 資料表:擷取管道狀態,以及執行和重新執行詳細資料的 BigQuery 資料表
- Cloud Run 函式:您部署程式碼的 Cloud Run 函式,用於重試失敗的管道
下列 Cloud Run 函式範例會讀取有關 Cloud Data Fusion 狀態事件的 Pub/Sub 訊息。
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
下列函式範例會建立及儲存 BigQuery 資料表,並查詢管道執行詳細資料。
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
下列範例函式會檢查失敗的管道,以及管道是否在過去一小時內重新執行。
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
如果失敗的管道最近未執行,下列範例函式會重新執行失敗的管道。
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
後續步驟
- 瞭解如何編寫 Cloud Run 函式。