监控流水线状态

本页面介绍如何将 Cloud Data Fusion 流水线事件(例如流水线状态)发布到 Pub/Sub 主题。本文档还介绍了如何创建处理 Pub/Sub 消息并采取相应行动(例如识别失败的流水线并重试)的 Cloud Run 函数。

准备工作

  • 创建主题,以便 Pub/Sub 可以将 Cloud Data Fusion 流水线事件发布到该主题。

所需的角色

为了确保 Cloud Data Fusion 服务账号拥有将流水线事件发布到 Pub/Sub 主题所需的权限,请让您的管理员为 Cloud Data Fusion 服务账号授予您创建 Pub/Sub 主题的项目中的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。

如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您的管理员也可以通过自定义角色或其他预定义角色向 Cloud Data Fusion 服务账号授予所需的权限。

管理 Cloud Data Fusion 实例中的事件发布

您可以使用 6.7.0 版及更高版本中的 REST API 管理新 Cloud Data Fusion 实例和现有 Cloud Data Fusion 实例中的事件发布。

在新实例中发布活动

创建新实例并添加 EventPublishConfig 字段。如需详细了解新实例的必需字段,请参阅实例资源参考文档。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

替换以下内容:

  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目的位置
  • INSTANCE_ID:Cloud Data Fusion 实例的 ID
  • VERSION_NUMBER:您创建实例时所用的 Cloud Data Fusion 版本,例如 6.10.1
  • TOPIC_ID:Pub/Sub 主题的 ID

在现有 Cloud Data Fusion 实例中启用事件发布

更新现有 Cloud Data Fusion 实例中的 EventPublishConfig 字段:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

替换以下内容:

  • PROJECT_ID: Google Cloud 项目 ID
  • LOCATION:项目位置
  • INSTANCE_ID:Cloud Data Fusion 实例的 ID
  • TOPIC_ID:Pub/Sub 主题的 ID

从实例中移除活动发布

如需从实例中移除事件发布,请将事件发布 enabled 值更新为 false

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

创建用于读取 Pub/Sub 消息的函数

Cloud Run functions 可以读取 Pub/Sub 消息并对其采取行动,例如重试失败的流水线。如需创建 Cloud Run 函数,请执行以下操作:

  1. 在 Google Cloud 控制台中,前往 Cloud Run functions 页面。

    前往 Cloud Run functions

  2. 点击创建函数

  3. 输入函数名称和区域。

  4. 触发器类型字段中,选择 Cloud Pub/Sub

  5. 输入 Pub/Sub 主题 ID。

  6. 点击下一步

  7. 添加函数以读取 Pub/Sub 消息并执行其他操作。例如,您可以针对以下使用情形添加函数:

    • 针对流水线故障发送提醒。
    • 针对 KPI(例如记录数或运行信息)发送提醒。
    • 重新启动尚未重新运行的失败流水线。

    如需查看 Cloud Run 函数示例,请参阅使用场景部分。

  8. 点击部署。 如需了解详情,请参阅部署 Cloud Run 函数

用例:记录流水线状态并重试失败的流水线

以下示例 Cloud Run functions 会读取有关流水线运行状态的 Pub/Sub 消息,然后在 Cloud Data Fusion 中重试失败的流水线。

这些示例函数引用了以下 Google Cloud 组件:

  • Google Cloud project:创建 Cloud Run functions 和 Pub/Sub 主题的项目
  • Pub/Sub 主题:与 Cloud Data Fusion 实例关联的 Pub/Sub 主题
  • Cloud Data Fusion 实例:您在其中设计和执行流水线的 Cloud Data Fusion 实例
  • BigQuery 表:用于捕获流水线状态以及运行和重新运行详细信息的 BigQuery 表
  • Cloud Run 函数:您在其中部署用于重试失败流水线的代码的 Cloud Run 函数
  1. 以下 Cloud Run 函数示例会读取有关 Cloud Data Fusion 状态事件的 Pub/Sub 消息。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 以下示例函数会创建并保存 BigQuery 表,并查询流水线运行详情。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 以下示例函数用于检查失败的流水线以及这些流水线是否在过去一小时内重新运行过。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 如果失败的流水线最近未运行,以下示例函数会重新运行该流水线。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

后续步骤