监控持续查询

您可以使用以下 BigQuery 工具监控 BigQuery 持续查询

由于 BigQuery 持续查询运行时间较长的性质,因此通常在 SQL 查询完成时生成的指标可能不存在或不准确。

使用 INFORMATION_SCHEMA 视图

您可以使用多个 INFORMATION_SCHEMA 视图来监控持续查询和持续查询预留。

查看作业详情

您可以使用 JOBS 视图获取持续查询作业元数据。

以下查询会返回所有活跃持续查询的元数据。元数据包含输出水印时间戳,该时间戳表示持续查询已成功处理数据的时间点。

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,运行以下查询:

    SELECT
      start_time,
      job_id,
      user_email,
      query,
      state,
      reservation_id,
      continuous_query_info.output_watermark
    FROM `PROJECT_ID.region-REGION.INFORMATION_SCHEMA.JOBS`
    WHERE
      creation_time > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 7 day)
      AND continuous IS TRUE
      AND state = "RUNNING"
    ORDER BY
      start_time DESC

    替换以下内容:

查看预留分配详情

您可以使用 ASSIGNMENTSRESERVATIONS 视图获取持续查询预留分配详细信息。

返回持续查询的预留分配详细信息:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,运行以下查询:

    SELECT
      reservation.reservation_name,
      reservation.slot_capacity
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND job_type = 'CONTINUOUS';

    替换以下内容:

    • ADMIN_PROJECT_ID:拥有预留的管理项目的 ID。
    • LOCATION:预留的位置。
    • PROJECT_ID:分配给预留的项目的 ID。系统只会返回此项目中运行的持续查询的相关信息。

查看槽用量信息

您可以使用 ASSIGNMENTSRESERVATIONSJOBS_TIMELINE 视图获取持续查询槽用量信息。

返回持续查询的槽用量信息:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,运行以下查询:

    SELECT
      jobs.period_start,
      reservation.reservation_name,
      reservation.slot_capacity,
      SUM(jobs.period_slot_ms) / 1000 AS consumed_total_slots
    FROM
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.ASSIGNMENTS`
        AS assignment
    INNER JOIN
      `ADMIN_PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.RESERVATIONS`
        AS reservation
      ON (assignment.reservation_name = reservation.reservation_name)
    INNER JOIN
      `PROJECT_ID.region-LOCATION.INFORMATION_SCHEMA.JOBS_TIMELINE` AS jobs
      ON (
        UPPER(CONCAT('ADMIN_PROJECT_ID:LOCATION.', assignment.reservation_name))
        = UPPER(jobs.reservation_id))
    WHERE
      assignment.assignee_id = 'PROJECT_ID'
      AND assignment.job_type = 'CONTINUOUS'
      AND jobs.period_start
        BETWEEN TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
        AND CURRENT_TIMESTAMP()
    GROUP BY 1, 2, 3
    ORDER BY jobs.period_start DESC;

    替换以下内容:

    • ADMIN_PROJECT_ID:拥有预留的管理项目的 ID。
    • LOCATION:预留的位置。
    • PROJECT_ID:分配给预留的项目的 ID。系统只会返回此项目中运行的持续查询的相关信息。

您还可以使用其他工具(例如 Metrics Explorer管理资源图表)监控持续查询预留。如需了解详情,请参阅监控 BigQuery 预留

使用查询执行图

您可以使用查询执行图获取持续查询的性能数据分析和一般统计信息。如需了解详情,请参阅查看查询性能数据分析

查看作业历史记录

您可以在个人作业历史记录或项目的作业历史记录中查看持续查询作业详细信息。如需了解详情,请参阅查看作业详细信息

请注意,历史作业列表按作业开始时间排序,因此已经运行了一段时间的持续查询可能不会靠近列表的开头。

使用管理作业探索器

在管理作业探索器中,过滤作业以显示持续查询,方法是将作业类别过滤条件设置为持续查询

使用 Cloud Monitoring

您可以使用 Cloud Monitoring 查看特定于 BigQuery 持续查询的指标。如需了解详情,请参阅创建信息中心、图表和提醒,并了解可用于直观呈现数据的指标

针对失败的查询触发提醒

与其定期检查持续查询是否失败,不如创建提醒来通知您失败情况。一种方法是创建自定义 Cloud Logging 基于日志的指标,其中包含作业的过滤条件,以及基于该指标的 Cloud Monitoring 提醒政策

  1. 创建持续查询时,使用自定义作业 ID 前缀。多个持续查询可以共享同一前缀。例如,您可以使用前缀 prod- 来表示生产查询。
  2. 在 Google Cloud 控制台中,前往基于日志的指标页面。

    转到“基于日志的指标”

  3. 点击创建指标。此时会显示创建日志指标面板。

  4. 对于指标类型,选择计数器

  5. 详细信息部分中,为指标指定名称。例如 CUSTOM_JOB_ID_PREFIX-metric

  6. 过滤条件选择部分中,在构建过滤条件编辑器中输入以下内容:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    替换以下内容:

    • PROJECT_ID:您的项目的名称。
    • CUSTOM_JOB_ID_PREFIX:您为持续查询设置的自定义作业 ID 前缀的名称。
  7. 点击创建指标

  8. 在导航菜单中,点击基于日志的指标。您刚创建的指标会显示在用户定义的指标列表中。

  9. 在相应指标的行中,点击 更多操作,然后点击根据指标创建提醒

  10. 点击下一步。您无需更改政策配置模式页面上的默认设置。

  11. 点击下一步。您无需更改配置提醒触发器页面上的默认设置。

  12. 选择通知渠道,然后输入提醒政策的名称。

  13. 点击创建政策

您可以运行具有所选自定义作业 ID 前缀的持续查询,然后取消该查询,以测试您的提醒。提醒可能需要几分钟时间才能到达您的通知渠道。

重试失败的查询

重试失败的持续查询有助于避免持续流水线长时间停机或需要人工干预才能重启的情况。重试失败的持续查询时,请务必考虑以下事项:

  • 是否可以接受重新处理上一个查询失败之前处理过的一些数据量。
  • 如何处理限制重试次数或使用指数退避算法。

以下是自动重试查询的一种可能方法:

  1. 创建 Cloud Logging 接收器,并根据符合以下条件的包含过滤器将日志路由到 Pub/Sub 主题:

    resource.type = "bigquery_project"
    protoPayload.resourceName : "projects/PROJECT_ID/jobs/CUSTOM_JOB_ID_PREFIX"
    severity = ERROR
    

    替换以下内容:

    • PROJECT_ID:您的项目的名称。
    • CUSTOM_JOB_ID_PREFIX:您为持续查询设置的自定义作业 ID 前缀的名称。
  2. 创建 Cloud Run 函数,该函数会在 Pub/Sub 收到与您的过滤条件匹配的日志时触发。

    Cloud Run 函数可以接受来自 Pub/Sub 消息的数据载荷,并尝试使用与失败查询相同的 SQL 语法启动新的持续查询,但开始时间仅在上一个作业停止之后。

例如,您可以使用类似于以下内容的函数:

Python

试用此示例之前,请按照 BigQuery 快速入门:使用客户端库中的 Python 设置说明进行操作。 如需了解详情,请参阅 BigQuery Python API 参考文档

如需向 BigQuery 进行身份验证,请设置应用默认凭证。如需了解详情,请参阅为客户端库设置身份验证

import base64
import json
import logging
import re
import uuid

import google.auth
import google.auth.transport.requests
import requests


def retry_continuous_query(event, context):
    logging.info("Cloud Function started.")

    if "data" not in event:
        logging.info("No data in Pub/Sub message.")
        return

    try:
        # Decode and parse the Pub/Sub message data
        log_entry = json.loads(base64.b64decode(event["data"]).decode("utf-8"))

        # Extract the SQL query and other necessary data
        proto_payload = log_entry.get("protoPayload", {})
        metadata = proto_payload.get("metadata", {})
        job_change = metadata.get("jobChange", {})
        job = job_change.get("job", {})
        job_config = job.get("jobConfig", {})
        query_config = job_config.get("queryConfig", {})
        sql_query = query_config.get("query")
        job_stats = job.get("jobStats", {})
        end_timestamp = job_stats.get("endTime")
        failed_job_id = job.get("jobName")

        # Check if required fields are missing
        if not all([sql_query, failed_job_id, end_timestamp]):
            logging.error("Required fields missing from log entry.")
            return

        logging.info(f"Retrying failed job: {failed_job_id}")

        # Adjust the timestamp in the SQL query
        timestamp_match = re.search(
            r"\s*TIMESTAMP\(('.*?')\)(\s*\+ INTERVAL 1 MICROSECOND)?", sql_query
        )

        if timestamp_match:
            original_timestamp = timestamp_match.group(1)
            new_timestamp = f"'{end_timestamp}'"
            sql_query = sql_query.replace(original_timestamp, new_timestamp)
        elif "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE" in sql_query:
            new_timestamp = f"TIMESTAMP('{end_timestamp}') + INTERVAL 1 MICROSECOND"
            sql_query = sql_query.replace(
                "CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE", new_timestamp
            )

        # Get access token
        credentials, project = google.auth.default(
            scopes=["https://www.googleapis.com/auth/cloud-platform"]
        )
        request = google.auth.transport.requests.Request()
        credentials.refresh(request)
        access_token = credentials.token

        # API endpoint
        url = f"https://bigquery.googleapis.com/bigquery/v2/projects/{project}/jobs"

        # Request headers
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Content-Type": "application/json",
        }

        # Generate a random UUID
        random_suffix = str(uuid.uuid4())[:8]  # Take the first 8 characters of the UUID

        # Combine the prefix and random suffix
        job_id = f"CUSTOM_JOB_ID_PREFIX{random_suffix}"

        # Request payload
        data = {
            "configuration": {
                "query": {
                    "query": sql_query,
                    "useLegacySql": False,
                    "continuous": True,
                    "connectionProperties": [
                        {"key": "service_account", "value": "SERVICE_ACCOUNT"}
                    ],
                    # ... other query parameters ...
                },
                "labels": {"bqux_job_id_prefix": "CUSTOM_JOB_ID_PREFIX"},
            },
            "jobReference": {
                "projectId": project,
                "jobId": job_id,  # Use the generated job ID here
            },
        }

        # Make the API request
        response = requests.post(url, headers=headers, json=data)

        # Handle the response
        if response.status_code == 200:
            logging.info("Query job successfully created.")
        else:
            logging.error(f"Error creating query job: {response.text}")

    except Exception as e:
        logging.error(
            f"Error processing log entry or retrying query: {e}", exc_info=True
        )

    logging.info("Cloud Function finished.")

后续步骤