本页面介绍了如何发布 Cloud Data Fusion 流水线事件,例如 流水线状态。此外,还介绍了如何创建 Cloud Run 函数来处理 Pub/Sub 消息并执行操作,例如识别和重试失败的流水线。
准备工作
- 创建一个主题,供 Pub/Sub 发布 Cloud Data Fusion 流水线事件。
所需的角色
为了确保 Cloud Data Fusion 服务账号具备必要的
拥有将流水线事件发布到 Pub/Sub 主题的权限,
请让管理员向 Cloud Data Fusion 服务账号授予
对您在其中创建 Pub/Sub 主题的项目的 Pub/Sub Publisher (roles/pubsub.publisher
) IAM 角色。
如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限。
您的管理员还可以通过自定义角色或其他预定义角色向 Cloud Data Fusion 服务账号授予所需的权限。
在 Cloud Data Fusion 实例中管理事件发布
您可以使用 6.7.0 及更高版本中的 REST API 管理新建和现有 Cloud Data Fusion 实例中的事件发布。
在新实例中发布事件
创建一个新实例并添加 EventPublishConfig
字段。有关
有关新实例的必填字段的信息,请参阅
实例资源
参考。
curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
"https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
-d '{
"version": "VERSION_NUMBER",
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
替换以下内容:
PROJECT_ID
:Google Cloud 项目 IDLOCATION
:项目的位置INSTANCE_ID
:您的 Cloud Data Fusion 实例的 IDVERSION_NUMBER
:Cloud Data Fusion 的版本 创建实例的位置,例如6.10.1
TOPIC_ID
:Pub/Sub 主题的 ID
在现有 Cloud Data Fusion 实例中启用事件发布
更新
EventPublishConfig
字段:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \
https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
-d '{
"event_publish_config": {
"enabled": true,
"topic": "projects/PROJECT_ID/topics/TOPIC_ID"
}
}'
替换以下内容:
PROJECT_ID
:Google Cloud 项目 IDLOCATION
: 项目的位置INSTANCE_ID
:您的 Cloud Data Fusion 实例的 IDTOPIC_ID
:Pub/Sub 主题的 ID
从实例中移除事件发布
如需从实例中移除事件发布功能,请将事件发布 enabled
值更新为 false
:
curl -X PATCH \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
-d '{ "event_publish_config": { "enabled": false } }'
创建用于读取 Pub/Sub 消息的函数
Cloud Run 函数可以读取 Pub/Sub 消息并执行 例如重试失败的流水线要创建 Cloud Run 函数 执行以下操作:
在 Google Cloud 控制台中,前往 Cloud Run 函数页面。
点击创建函数。
输入函数名称和区域。
在触发器类型字段中,选择 Cloud Pub/Sub。
输入 Pub/Sub 主题 ID。
点击下一步。
添加函数以读取 Pub/Sub 消息 操作。例如,您可以针对以下用例添加函数:
- 发送流水线故障提醒。
- 针对 KPI(例如记录数或运行信息)发送提醒。
- 重启未重新运行且失败的流水线。
如需查看 Cloud Run 函数示例,请参阅 用例部分。
点击部署。 如需了解详情,请参阅部署 Cloud Run 函数。
使用场景:记录流水线状态并重试失败的流水线
以下 Cloud Run 函数示例会读取有关流水线运行状态的 Pub/Sub 消息,然后在 Cloud Data Fusion 中重试失败的流水线。
以下示例函数涉及以下 Google Cloud 组件:
- Google Cloud 项目:创建 Cloud Run 函数和 Pub/Sub 主题的项目
- Pub/Sub 主题:Pub/Sub 主题 您的 Cloud Data Fusion 实例
- Cloud Data Fusion 实例:Cloud Data Fusion 您可以在其中设计和执行流水线
- BigQuery 表:用于捕获流水线状态以及运行和重新运行详情的 BigQuery 表
- Cloud Run 函数:您在 Cloud Run 中部署的 Cloud Run 函数 用于重试失败流水线的代码
以下 Cloud Run 函数示例读取 有关 Cloud Data Fusion 状态的 Pub/Sub 消息 事件。
# Triggered from a message on a Pub/Sub topic. @functions_framework.cloud_event def cdf_event_trigger(cloud_event): decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message. pubsub_message = json.loads(decoded_message) # Extract pipeline run details. projectName = pubsub_message["projectName"] publishTime = pubsub_message["publishTime"] instanceName = pubsub_message["instanceName"] namespace = pubsub_message["programStatusEventDetails"]["namespace"] applicationName = pubsub_message["programStatusEventDetails"]["applicationName"] status = pubsub_message["programStatusEventDetails"]["status"] event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms') print(f"projectName: {projectName}") print(f"publishTime: {publishTime}") print(f"instanceName: {instanceName}") print(f"namespace: {namespace}") print(f"applicationName: {applicationName}") print(f"status: {status}") print(f"event timestamp: {event_timestamp}") try: error = pubsub_message["programStatusEventDetails"]["error"] print(f"error: {error}") except: print(f"Pipeline: {applicationName}'s current status: {status}")
以下示例函数会创建并保存 BigQuery 表,并查询流水线运行详情。
# Global variables. pipeline_rerun_count = 0 has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes. table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information. # Update BigQuery table with the pipeline status and rerun details. schema=[ bigquery.SchemaField("Project_Name", "STRING"), bigquery.SchemaField("Instance_Name", "STRING"), bigquery.SchemaField("Namespace", "STRING"), bigquery.SchemaField("Pipeline_Name", "STRING"), bigquery.SchemaField("Pipeline_Status", "STRING"), bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"), bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"), ] # Prepare DataFrame to load the data in BigQuery. data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]} dataframe = pd.DataFrame(data) # Prepare BigQuery data load job configuration. job_config = bigquery.LoadJobConfig(schema=schema) job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config) job.result() # Wait for the job to complete. table = bq_client.get_table(table_id) # Make an API request. print("BigQuery table: {} updated.".format(table_id))
以下示例函数会检查是否有流水线失败,以及这些流水线是否在上一个小时内重新运行。
bq_client = bigquery.Client() if status == "FAILED": print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.") QUERY = f""" SELECT * FROM `{table_id}` WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED" AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE) AND Pipeline_Rerun_Count > 0 """ query_job = bq_client.query_and_wait(QUERY) # API request. row_count = query_job.total_rows # Waits for query to finish. print(f"Query job result row count: {row_count}") if (row_count > 0): print("Pipeline has FAILED and rerun recently...") global has_pipeline_failed_and_rerun_recently has_pipeline_failed_and_rerun_recently = True
如果失败的流水线最近未运行,则运行以下示例函数 重新运行失败的流水线。
if not has_pipeline_failed_and_rerun_recently: applicationName = applicationName auth_token = get_access_token() post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"} cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api" run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName) # Start the job. response = requests.post(run_pipeline_endpoint,headers=post_headers) print(f"Response for restarting the failed pipeline: {response}") global pipeline_rerun_count pipeline_rerun_count = 1
后续步骤
- 了解如何编写 Cloud Run 函数。