
本页介绍了如何将 Cloud Data Fusion 流水线事件(例如流水线状态)发布到 Pub/Sub 主题。此外,还介绍了如何创建 Cloud Run 函数来处理 Pub/Sub 消息并执行操作,例如识别和重试失败的流水线。



为了确保 Cloud Data Fusion 服务账号拥有将流水线事件发布到 Pub/Sub 主题的必要权限,请让您的管理员为 Cloud Data Fusion 服务账号授予您创建 Pub/Sub 主题的项目的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。


您的管理员也可以通过自定义角色或其他预定义角色向 Cloud Data Fusion 服务账号授予所需的权限。

在 Cloud Data Fusion 实例中管理事件发布

您可以使用 6.7.0 及更高版本中的 REST API 管理新建和现有 Cloud Data Fusion 实例中的事件发布。


创建一个新实例并添加 EventPublishConfig 字段。如需详细了解新实例的必需字段,请参阅实例资源参考。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"


  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目的位置
  • INSTANCE_ID:您的 Cloud Data Fusion 实例的 ID
  • VERSION_NUMBER:您创建实例时所用的 Cloud Data Fusion 版本,例如 6.10.1
  • TOPIC_ID:Pub/Sub 主题的 ID

在现有 Cloud Data Fusion 实例中启用事件发布

更新现有 Cloud Data Fusion 实例中的 EventPublishConfig 字段:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"


  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目的位置
  • INSTANCE_ID:您的 Cloud Data Fusion 实例的 ID
  • TOPIC_ID:Pub/Sub 主题的 ID


如需从实例中移除事件发布功能,请将事件发布 enabled 值更新为 false

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

创建用于读取 Pub/Sub 消息的函数

Cloud Run 函数可以读取 Pub/Sub 消息并对其执行操作,例如重试失败的流水线。如需创建 Cloud Run 函数,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往 Cloud Run 函数页面。

    前往 Cloud Run 函数

  2. 点击创建函数

  3. 输入函数名称和区域。

  4. 触发器类型字段中,选择 Cloud Pub/Sub

  5. 输入 Pub/Sub 主题 ID。

  6. 点击下一步

  7. 添加函数以读取 Pub/Sub 消息并执行其他操作。例如,您可以针对以下用例添加函数:

    • 针对流水线故障发送提醒。
    • 针对 KPI(例如记录数或运行信息)发送提醒。
    • 重启尚未重新运行的失败流水线。

    如需查看 Cloud Run 函数示例,请参阅使用场景部分。

  8. 点击部署。 如需了解详情,请参阅部署 Cloud Run 函数


以下 Cloud Run functions 示例会读取有关流水线运行状态的 Pub/Sub 消息,然后在 Cloud Data Fusion 中重试失败的流水线。

以下示例函数引用了以下 Google Cloud 组件:

  • Google Cloud project:创建 Cloud Run 函数和 Pub/Sub 主题的项目
  • Pub/Sub 主题:与您的 Cloud Data Fusion 实例关联的 Pub/Sub 主题
  • Cloud Data Fusion 实例:您设计和执行流水线的 Cloud Data Fusion 实例
  • BigQuery 表:用于捕获流水线状态以及运行和重新运行详情的 BigQuery 表
  • Cloud Run 函数:您部署用于重试失败流水线的代码的 Cloud Run 函数
  1. 以下 Cloud Run 函数示例会读取有关 Cloud Data Fusion 状态事件的 Pub/Sub 消息。

    # Triggered from a message on a Pub/Sub topic.
    def cdf_event_trigger(cloud_event):
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
        print(f"Pipeline: {applicationName}'s current status: {status}")
  2. 以下示例函数会创建并保存 BigQuery 表,并查询流水线运行详情。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    # Update BigQuery table with the pipeline status and rerun details.
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
  3. 以下示例函数会检查是否有失败的流水线,以及这些流水线是否在上一个小时内重新运行。

    bq_client = bigquery.Client()
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
  4. 如果最近未运行失败的流水线,以下示例函数会重新运行失败的流水线。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
