监控流水线状态

本页面介绍了如何发布 Cloud Data Fusion 流水线事件,例如 流水线状态。此外,还介绍了如何创建 Cloud Run 函数来处理 Pub/Sub 消息并执行操作,例如识别和重试失败的流水线。

准备工作

所需的角色

为了确保 Cloud Data Fusion 服务账号具备必要的 拥有将流水线事件发布到 Pub/Sub 主题的权限, 请让管理员向 Cloud Data Fusion 服务账号授予 对您在其中创建 Pub/Sub 主题的项目的 Pub/Sub Publisher (roles/pubsub.publisher) IAM 角色。 如需详细了解如何授予角色,请参阅管理对项目、文件夹和组织的访问权限

您的管理员还可以通过自定义角色或其他预定义角色向 Cloud Data Fusion 服务账号授予所需的权限。

在 Cloud Data Fusion 实例中管理事件发布

您可以使用 6.7.0 及更高版本中的 REST API 管理新建和现有 Cloud Data Fusion 实例中的事件发布。

在新实例中发布事件

创建一个新实例并添加 EventPublishConfig 字段。有关 有关新实例的必填字段的信息,请参阅 实例资源 参考。

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances?instanceId=INSTANCE_ID" \
  -d '{
    "version": "VERSION_NUMBER",
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
  }'

替换以下内容:

  • PROJECT_ID:Google Cloud 项目 ID
  • LOCATION:项目的位置
  • INSTANCE_ID:您的 Cloud Data Fusion 实例的 ID
  • VERSION_NUMBER:Cloud Data Fusion 的版本 创建实例的位置,例如 6.10.1
  • TOPIC_ID:Pub/Sub 主题的 ID

在现有 Cloud Data Fusion 实例中启用事件发布

更新 EventPublishConfig 字段:

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config \
  -d '{
    "event_publish_config": {
      "enabled": true,
      "topic": "projects/PROJECT_ID/topics/TOPIC_ID"
    }
}'

替换以下内容:

  • PROJECT_ID:Google Cloud 项目 ID
  • LOCATION: 项目的位置
  • INSTANCE_ID:您的 Cloud Data Fusion 实例的 ID
  • TOPIC_ID:Pub/Sub 主题的 ID

从实例中移除事件发布

如需从实例中移除事件发布功能,请将事件发布 enabled 值更新为 false

curl -X PATCH \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \ "https://datafusion.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/instances/INSTANCE_ID?updateMask=event_publish_config" \
  -d '{ "event_publish_config": { "enabled": false } }'

创建用于读取 Pub/Sub 消息的函数

Cloud Run 函数可以读取 Pub/Sub 消息并执行 例如重试失败的流水线要创建 Cloud Run 函数 执行以下操作:

  1. 在 Google Cloud 控制台中,前往 Cloud Run 函数页面。

    前往 Cloud Run 函数

  2. 点击创建函数

  3. 输入函数名称和区域。

  4. 触发器类型字段中,选择 Cloud Pub/Sub

  5. 输入 Pub/Sub 主题 ID。

  6. 点击下一步

  7. 添加函数以读取 Pub/Sub 消息 操作。例如,您可以针对以下用例添加函数:

    • 发送流水线故障提醒。
    • 针对 KPI(例如记录数或运行信息)发送提醒。
    • 重启未重新运行且失败的流水线。

    如需查看 Cloud Run 函数示例,请参阅 用例部分。

  8. 点击部署。 如需了解详情,请参阅部署 Cloud Run 函数

使用场景:记录流水线状态并重试失败的流水线

以下 Cloud Run 函数示例会读取有关流水线运行状态的 Pub/Sub 消息,然后在 Cloud Data Fusion 中重试失败的流水线。

以下示例函数涉及以下 Google Cloud 组件:

  • Google Cloud 项目:创建 Cloud Run 函数和 Pub/Sub 主题的项目
  • Pub/Sub 主题:Pub/Sub 主题 您的 Cloud Data Fusion 实例
  • Cloud Data Fusion 实例:Cloud Data Fusion 您可以在其中设计和执行流水线
  • BigQuery 表:用于捕获流水线状态以及运行和重新运行详情的 BigQuery 表
  • Cloud Run 函数:您在 Cloud Run 中部署的 Cloud Run 函数 用于重试失败流水线的代码
  1. 以下 Cloud Run 函数示例读取 有关 Cloud Data Fusion 状态的 Pub/Sub 消息 事件。

    # Triggered from a message on a Pub/Sub topic.
    @functions_framework.cloud_event
    def cdf_event_trigger(cloud_event):
    
    decoded_message = base64.b64decode(cloud_event.data["message"]["data"]).decode('utf-8') # Decode Pub/Sub message.
    pubsub_message = json.loads(decoded_message)
    
    # Extract pipeline run details.
    projectName = pubsub_message["projectName"]
    publishTime = pubsub_message["publishTime"]
    instanceName = pubsub_message["instanceName"]
    namespace = pubsub_message["programStatusEventDetails"]["namespace"]
    applicationName = pubsub_message["programStatusEventDetails"]["applicationName"]
    status = pubsub_message["programStatusEventDetails"]["status"]
    event_timestamp = pd.to_datetime(pubsub_message["programStatusEventDetails"]["eventTime"], unit = 'ms')
    
    print(f"projectName: {projectName}")
    print(f"publishTime: {publishTime}")
    print(f"instanceName: {instanceName}")
    print(f"namespace: {namespace}")
    print(f"applicationName: {applicationName}")
    print(f"status: {status}")
    print(f"event timestamp: {event_timestamp}")
    try:
        error = pubsub_message["programStatusEventDetails"]["error"]
        print(f"error: {error}")
    except:
        print(f"Pipeline: {applicationName}'s current status: {status}")
    
  2. 以下示例函数会创建并保存 BigQuery 表,并查询流水线运行详情。

    # Global variables.
    pipeline_rerun_count = 0
    has_pipeline_failed_and_rerun_recently = False # Timeframe: within last 60 minutes.
    table_id = "bigquery-table-1" # The BigQuery target table for storing pipeline run information.
    
    # Update BigQuery table with the pipeline status and rerun details.
    schema=[
        bigquery.SchemaField("Project_Name", "STRING"),
        bigquery.SchemaField("Instance_Name", "STRING"),
        bigquery.SchemaField("Namespace", "STRING"),
        bigquery.SchemaField("Pipeline_Name", "STRING"),
        bigquery.SchemaField("Pipeline_Status", "STRING"),
        bigquery.SchemaField("Event_Timestamp", "TIMESTAMP"),
        bigquery.SchemaField("Pipeline_Rerun_Count", "INTEGER"),
    ]
    
    # Prepare DataFrame to load the data in BigQuery.
    data = {'Project_Name':[projectName], 'Instance_Name':[instanceName], 'Namespace':[namespace], 'Pipeline_Name':[applicationName], 'Pipeline_Status':[status], 'Event_Timestamp':[event_timestamp], 'Pipeline_Rerun_Count':[pipeline_rerun_count]}
    dataframe = pd.DataFrame(data)
    
    # Prepare BigQuery data load job configuration.
    job_config = bigquery.LoadJobConfig(schema=schema)
    
    job = bq_client.load_table_from_dataframe(dataframe, table_id, job_config=job_config)
    job.result()  # Wait for the job to complete.
    
    table = bq_client.get_table(table_id)  # Make an API request.
    print("BigQuery table: {} updated.".format(table_id))
    
  3. 以下示例函数会检查是否有流水线失败,以及这些流水线是否在上一个小时内重新运行。

    bq_client = bigquery.Client()
    
    if status == "FAILED":
        print(f"ALERT -- Pipeline: {applicationName} has failed. Checking for rerun: pipeline hasn't failed and rerun in the last 60 minutes.")
    
        QUERY = f"""
            SELECT * FROM `{table_id}`
            WHERE Pipeline_Name = "{applicationName}" AND Pipeline_Status = "FAILED"
            AND "{event_timestamp}" < DATETIME_ADD(Event_Timestamp, INTERVAL 60 MINUTE)
            AND Pipeline_Rerun_Count > 0
            """
    
        query_job = bq_client.query_and_wait(QUERY)  # API request.
        row_count = query_job.total_rows  # Waits for query to finish.
        print(f"Query job result row count: {row_count}")
    
        if (row_count > 0):
            print("Pipeline has FAILED and rerun recently...")
            global has_pipeline_failed_and_rerun_recently
            has_pipeline_failed_and_rerun_recently = True
    
  4. 如果失败的流水线最近未运行,则运行以下示例函数 重新运行失败的流水线。

    if not has_pipeline_failed_and_rerun_recently:
        applicationName = applicationName
        auth_token = get_access_token()
        post_headers = {"Authorization": "Bearer " + auth_token,"Accept": "application/json"}
        cdap_endpoint = "https://instance1-project1-dot-location1.datafusion.googleusercontent.com/api"
        run_pipeline_endpoint = cdap_endpoint + "/v3/namespaces/{}/apps/{}/workflows/DataPipelineWorkflow/start".format(namespace, applicationName)
    
        # Start the job.
        response = requests.post(run_pipeline_endpoint,headers=post_headers)
        print(f"Response for restarting the failed pipeline: {response}")
        global pipeline_rerun_count
        pipeline_rerun_count = 1
    

后续步骤