创建持续查询

本文档介绍如何在 BigQuery 中运行持续查询

BigQuery 持续查询是持续运行的 SQL 语句。借助持续查询,您可以实时分析 BigQuery 中的传入数据,然后将结果导出到 Bigtable、Pub/Sub 或 Spanner,或将结果写入 BigQuery 表。

选择账号类型

您可以使用用户账号创建和运行持续查询作业,也可以使用用户账号创建持续查询作业,然后使用服务账号运行该作业。您必须使用服务账号来运行将结果导出到 Pub/Sub 主题的持续查询。

当您使用用户账号时,持续查询最多运行两天。当您使用服务账号时,持续查询最多运行 150 天。如需了解详情,请参阅授权

所需权限

本部分介绍创建和运行持续查询所需的权限。作为提到的 Identity and Access Management (IAM) 角色的替代方法,您还可以通过自定义角色获取所需的权限。

使用用户账号时需要的权限

本部分介绍了使用用户账号创建和运行持续查询所需的角色和权限。

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都可授予 bigquery.jobs.create 权限:

如需从 BigQuery 表导出数据,用户账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,用户账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.updateData 权限:

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

使用服务账号时需要的权限

本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。

用户账号权限

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都可授予 bigquery.jobs.create 权限:

如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser) 角色。如果您使用同一用户账号来创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin) 角色。如需了解如何将用户的访问权限限制到单个服务账号(而不是项目中的所有服务账号),请参阅授予单个角色

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

服务账号权限

如需从 BigQuery 表导出数据,服务账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,服务账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.updateData 权限:

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Roles required to select or create a project

    • Select a project: Selecting a project doesn't require a specific IAM role—you can select any project that you've been granted a role on.
    • Create a project: To create a project, you need the Project Creator (roles/resourcemanager.projectCreator), which contains the resourcemanager.projects.create permission. Learn how to grant roles.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Roles required to enable APIs

    To enable APIs, you need the Service Usage Admin IAM role (roles/serviceusage.serviceUsageAdmin), which contains the serviceusage.services.enable permission. Learn how to grant roles.

    Enable the API

创建预留

创建企业版或企业 Plus 版预留,然后使用 CONTINUOUS 作业类型创建预留分配。此预留可以使用自动扩缩。 持续查询存在适用于预留分配的预留限制

导出到 Pub/Sub

将数据导出到 Pub/Sub 需要使用其他 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Pub/Sub

在 Pub/Sub 消息中嵌入自定义属性作为元数据

您可以使用 Pub/Sub 属性提供有关消息的附加信息,例如优先级、来源、目的地或附加元数据。您还可以使用属性过滤有关订阅的消息

在持续查询结果中,如果某个列名为 _ATTRIBUTES,则其值会复制到 Pub/Sub 消息属性中。_ATTRIBUTES 中提供的字段用作属性键。

_ATTRIBUTES 列必须是 JSON 类型,格式为 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需查看示例,请参阅将数据导出到 Pub/Sub 主题

导出到 Bigtable

将数据导出到 Bigtable 需要使用其他 API、IAM 权限和 Google Cloud资源。如需了解详情,请参阅导出到 Bigtable

导出到 Spanner

将数据导出到 Spanner 需要使用其他 API、IAM 权限和 Google Cloud资源。如需了解详情,请参阅导出到 Spanner(反向 ETL)

将数据写入 BigQuery 表

您可以使用 INSERT 语句将数据写入 BigQuery 表。

使用 AI 函数

在持续查询中使用受支持的 AI 函数需要使用其他 API、IAM 权限和 Google Cloud资源。如需了解详情,请根据您的用例查看以下主题之一:

在持续查询中使用 AI 函数时,请考虑查询输出是否会保持在相应函数的配额范围内。如果您超出配额,则可能需要单独处理未处理的记录。

指定起始点

您必须在持续查询的 FROM 子句中使用 APPENDS 函数,来指定要处理的最早数据。例如,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) 会告知 BigQuery 处理在持续查询开始前最多 10 分钟添加到表 my_table 中的数据。添加到 my_table 中的数据会在添加时进行处理。数据处理不会受到任何人为延迟。 在持续查询中使用 APPENDS 函数时,请勿向该函数提供 end_timestamp 参数。

以下示例展示了如何在查询接收流式出租车行程信息的 BigQuery 表时,使用 APPENDS 函数从特定时间点启动持续查询:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

指定早于当前时间的起始点

如果您想处理当前时间点之前的数据,可以使用 APPENDS 函数为查询指定较早的起始点。您指定的起始点必须在您要从中进行选择的表的时间旅行窗口内。默认情况下,时间旅行窗口涵盖过去 7 天。

如需纳入超出时间旅行窗口的数据,请使用标准查询来插入或导出截至特定时间点的数据,然后从该时间点启动持续查询。

示例

以下示例展示了如何从接收流式出租车行程信息的 BigQuery 表中将截至特定时间点的较旧数据加载到某个表中,然后从较旧数据的截止点启动持续查询。

  1. 运行标准查询,以回填截至特定时间点的数据:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. 从查询停止的时间点运行持续查询:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

使用用户账号运行持续查询

本部分介绍如何使用用户账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。由用户账号运行的持续查询最多可运行两天,然后会自动停止。如需继续处理新的传入数据,请启动新的持续查询并指定起点。 如需自动执行此过程,请参阅重试失败的查询

如需运行持续查询,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,点击 更多

    1. 选择查询模式部分中,选择持续查询
    2. 点击确认
    3. 可选:如需控制查询的运行时长,请点击查询设置,然后设置作业超时(以毫秒为单位)。
  3. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  4. 点击运行

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. 在 Cloud Shell 中,使用带有 --continuous 标志的 bq query 命令运行持续查询:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 替换为用于持续查询的 SQL 语句。SQL 语句只能包含支持的操作。 您可以使用 --job_timeout_ms 标志控制查询的运行时长。

  3. API

    通过调用 jobs.insert 方法运行持续查询。您必须在传入的 Job 资源的 JobConfigurationQuery 中将 continuous 字段设置为 true。 您可以选择通过设置 jobTimeoutMs 字段来控制查询的运行时长。

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作

使用服务账号运行持续查询

本部分介绍如何使用服务账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。使用服务账号运行的持续查询最多可运行 150 天,然后会自动停止。如需继续处理新的传入数据,请启动新的持续查询并指定起点。 如需自动执行此过程,请参阅重试失败的查询

如需使用服务账号运行持续查询,请按以下步骤操作:

控制台

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  4. 在查询编辑器中,点击更多

  5. 选择查询模式部分中,选择持续查询

  6. 点击确认

  7. 在查询编辑器中,依次点击更多 > 查询设置

  8. 持续查询部分中,使用服务账号框选择您创建的服务账号。

  9. 可选:如需控制查询的运行时长,请设置作业超时(以毫秒为单位)。

  10. 点击保存

  11. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  12. 点击运行

bq

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. 在命令行上,使用带有以下标志的 bq query 命令运行持续查询:

    • --continuous 标志设置为 true 以使查询持续。
    • 使用 --connection_property 标志指定要使用的服务账号。
    • 可选:设置 --job_timeout_ms 标志以限制查询运行时。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号邮箱。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作
  5. API

    1. 创建服务账号
    2. 向服务账号授予所需权限
    3. 通过调用 jobs.insert 方法运行持续查询。在您传入的 Job 资源JobConfigurationQuery 资源中设置以下字段:

      • continuous 字段设置为 true 以使查询持续。
      • 使用 connectionProperties 字段指定要使用的服务账号。

      您可以选择通过在 JobConfiguration 资源中设置 jobTimeoutMs 字段来控制查询的运行时长。

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      替换以下内容:

      • PROJECT_ID:您的项目 ID。
      • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作
      • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号邮箱。

创建自定义作业 ID

每个查询作业都会分配一个作业 ID,您可以使用该 ID 来搜索和管理作业。默认情况下,作业 ID 是随机生成的。为了更轻松地使用作业历史记录作业探索器搜索持续查询的作业 ID,您可以分配自定义作业 ID 前缀:

  1. 在 Google Cloud 控制台中,前往 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,点击更多

  3. 选择查询模式部分中,选择持续查询

  4. 点击确认

  5. 在查询编辑器中,依次点击更多 > 查询设置

  6. 自定义作业 ID 前缀部分中,输入自定义名称前缀。

  7. 点击保存

示例

以下 SQL 示例展示了持续查询的常见用例。

将数据导出到 Pub/Sub 主题

以下示例展示了一个持续查询,该查询过滤来自接收流式出租车行程信息的 BigQuery 表的数据,并使用消息属性将数据实时发布到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

将数据导出到 Bigtable 表

以下示例展示了一个持续查询,该查询过滤来自接收流式出租车行程信息的 BigQuery 表的数据,并将数据实时导出到 Bigtable 表中:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

将数据导出到 Spanner 表

以下示例展示了一个持续查询,该查询过滤来自接收流式出租车行程信息的 BigQuery 表的数据,并将数据实时导出到 Spanner 表中:

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the 
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function 
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

将数据写入 BigQuery 表

以下示例展示了一个持续查询,该查询过滤并转换来自接收流式出租车行程信息的 BigQuery 表的数据,然后将数据实时写入另一个 BigQuery 表。这样一来,数据便可用于进一步的下游分析。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型处理数据

以下示例展示了一个持续查询,该查询使用 Vertex AI 模型根据出租车乘客当前的纬度和经度为其生成广告,然后将结果实时导出到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

修改持续查询的 SQL

在持续查询作业运行期间,您无法更新持续查询中使用的 SQL。您必须取消持续查询作业,修改 SQL,然后从停止原始持续查询作业的时间点启动新的持续查询作业。

请按照以下步骤修改持续查询中使用的 SQL:

  1. 对要更新的持续查询作业查看作业详细信息,并记下作业 ID。
  2. 如果可能,请暂停收集上游数据。如果您无法执行此操作,则在重新启动持续查询时可能会出现一些数据重复。
  3. 取消要修改的持续查询
  4. 使用 INFORMATION_SCHEMA JOBS 视图获取原始持续查询作业的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • REGION:项目使用的区域。
    • JOB_ID:您在第 1 步中确定的持续查询作业 ID。
  5. 修改持续查询 SQL 语句,以从特定时间点启动持续查询(使用您在第 5 步中检索的 end_time 值作为起始点)。

  6. 修改持续查询 SQL 语句,以反映您需要进行的更改。

  7. 运行修改后的持续查询。

取消持续查询

您可以像取消任何其他作业一样取消持续查询作业。在作业取消后,查询最多可能需要一分钟才能停止运行。

如果您取消查询,然后重新启动查询,重新启动的查询的行为会类似于新的独立查询。重新启动的查询不会开始处理上一个作业停止处理的数据,并且无法引用上一个查询的结果。请参阅从特定时间点启动持续查询

监控查询和处理错误

持续查询可能会因数据不一致、架构更改、临时服务中断或维护等因素而中断。虽然 BigQuery 可以处理一些暂时性错误,但以下最佳实践有助于提高作业的弹性:

后续步骤