创建持续查询

本文档介绍如何在 BigQuery 中运行持续查询

BigQuery 持续查询是持续运行的 SQL 语句。借助持续查询,您可以实时分析 BigQuery 中的传入数据,然后将结果导出到 Bigtable 或 Pub/Sub,或将结果写入 BigQuery 表。

选择账号类型

您可以使用用户账号创建和运行持续查询作业,也可以使用用户账号创建持续查询作业,然后使用服务账号运行该作业。您必须使用服务账号来运行将结果导出到 Pub/Sub 主题的持续查询。

当您使用用户账号时,持续查询会运行两天。当您使用服务账号时,持续查询会一直运行,直到明确取消为止。如需了解详情,请参阅授权

所需权限

本部分介绍创建和运行持续查询所需的权限。作为提到的 Identity and Access Management (IAM) 角色的替代方法,您还可以通过自定义角色获取所需的权限。

使用用户账号时需要的权限

本部分介绍了使用用户账号创建和运行持续查询所需的角色和权限。

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都可授予 bigquery.jobs.create 权限:

如需从 BigQuery 表导出数据,用户账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,用户账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.updateData 权限:

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

使用服务账号时需要的权限

本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。

用户账号权限

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都可授予 bigquery.jobs.create 权限:

如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser) 角色。如果您使用同一用户账号来创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin) 角色。如需了解如何将用户的访问权限限制到单个服务账号(而不是项目中的所有服务账号),请参阅授予单个角色

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

服务账号权限

如需从 BigQuery 表导出数据,服务账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.export 权限:

如需更新 BigQuery 表中的数据,服务账号必须具有 bigquery.tables.updateData IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.updateData 权限:

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

创建预留

创建企业版或企业 Plus 版预留,然后使用 CONTINUOUS 作业类型创建预留分配

为持续查询创建预留分配时,关联的预留不得超过 500 个槽,并且无法配置为使用自动扩缩

导出到 Pub/Sub

将数据导出到 Pub/Sub 需要使用其他 API、IAM 权限和 Google Cloud 资源。如需了解详情,请参阅导出到 Pub/Sub

在 Pub/Sub 消息中嵌入自定义属性作为元数据

您可以使用 Pub/Sub 属性提供有关消息的附加信息,例如优先级、来源、目的地或附加元数据。您还可以使用属性过滤有关订阅的消息

在持续查询结果中,如果某个列名为 _ATTRIBUTES,则其值会复制到 Pub/Sub 消息属性中。_ATTRIBUTES 中提供的字段用作属性键。

_ATTRIBUTES 列必须是 JSON 类型,格式为 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需查看示例,请参阅将数据导出到 Pub/Sub 主题

导出到 Bigtable

将数据导出到 Bigtable 需要使用其他 API、IAM 权限和 Google Cloud资源。如需了解详情,请参阅导出到 Bigtable

将数据写入 BigQuery 表

您可以使用 INSERT 语句将数据写入 BigQuery 表。

使用 AI 函数

在持续查询中使用受支持的 AI 函数需要使用其他 API、IAM 权限和 Google Cloud资源。如需了解详情,请根据您的用例查看以下主题之一:

在持续查询中使用 AI 函数时,请考虑查询输出是否会保持在相应函数的配额范围内。如果您超出配额,则可能需要单独处理未处理的记录。

指定起始点

您必须在持续查询的 FROM 子句中使用 APPENDS 函数来指定开始处理数据的时间点。

以下示例展示了如何在查询接收流式出租车行程信息的 BigQuery 表时,使用 APPENDS 函数从特定时间点启动持续查询:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

指定早于当前时间的起始点

如果您想处理当前时间点之前的数据,可以使用 APPENDS 函数为查询指定较早的起始点。您指定的起始点必须在您要从中进行选择的表的时间旅行窗口内。默认情况下,时间旅行窗口涵盖过去 7 天。

对于向 BigQuery 表插入数据和向 Bigtable 导出数据,您可以使用批量查询来包含超出时间旅行窗口的数据。使用批量查询可插入或导出截至特定时间点的数据,然后从该时间点启动持续查询。

示例

以下示例展示了如何从接收流式出租车行程信息的 BigQuery 表中将截至特定时间点的较旧数据加载到某个表中,然后从较旧数据的截止点启动持续查询。

  1. 使用批量查询回填截至特定时间点的数据:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- This timestamp provides data outside the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. 从批量查询停止的时间点启动持续查询:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp needs to be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

使用用户账号运行持续查询

本部分介绍如何使用用户账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。

如需运行持续查询,请按以下步骤操作:

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,点击更多

  3. 选择查询模式部分中,选择持续查询

  4. 点击确认

  5. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  6. 点击运行

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 在 Cloud Shell 中,使用带有 --continuous 标志的 bq query 命令运行持续查询:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 替换为用于持续查询的 SQL 语句。SQL 语句只能包含支持的操作

API

通过调用 jobs.insert 方法运行持续查询。您必须在传入的 Job 资源的 JobConfigurationQuery 中将 continuous 字段设置为 true

curl --request POST \
  "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
  --header "Authorization: Bearer $(gcloud auth print-access-token)" \
  --header "Content-Type: application/json; charset=utf-8" \
  --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
  --compressed

替换以下内容:

  • PROJECT_ID:您的项目 ID。
  • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作

使用服务账号运行持续查询

本部分介绍如何使用服务账号运行持续查询。持续查询运行后,您可以关闭 Google Cloud 控制台、终端窗口或应用,而不会中断查询执行。

如需使用服务账号运行持续查询,请按以下步骤操作:

控制台

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  4. 在查询编辑器中,点击更多

  5. 选择查询模式部分中,选择持续查询

  6. 点击确认

  7. 在查询编辑器中,依次点击更多 > 查询设置

  8. 持续查询部分中,使用服务账号框选择您创建的服务账号。

  9. 点击保存

  10. 在查询编辑器中,输入持续查询的 SQL 语句。SQL 语句只能包含支持的操作

  11. 点击运行

bq

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. 在命令行上,使用带有以下标志的 bq query 命令运行持续查询:

    • --continuous 标志设置为 true 以使查询持续。
    • 使用 --connection_property 标志指定要使用的服务账号。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作

API

  1. 创建服务账号
  2. 向服务账号授予所需权限
  3. 通过调用 jobs.insert 方法运行持续查询。在您传入的 Job 资源JobConfigurationQuery 资源中设置以下字段:

    • continuous 字段设置为 true 以使查询持续。
    • 使用 connectionProperties 字段指定要使用的服务账号。
    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
      --compressed

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • QUERY:持续查询的 SQL 语句。SQL 语句只能包含支持的操作
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。

示例

以下 SQL 示例展示了持续查询的常见用例。

将数据导出到 Pub/Sub 主题

以下示例展示了一个持续查询,该查询过滤来自接收流式出租车行程信息的 BigQuery 表的数据,并使用消息属性将数据实时发布到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

将数据导出到 Bigtable 表

以下示例展示了一个持续查询,该查询过滤来自接收流式出租车行程信息的 BigQuery 表的数据,并将数据实时导出到 Bigtable 表中:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

将数据写入 BigQuery 表

以下示例展示了一个持续查询,该查询过滤并转换来自接收流式出租车行程信息的 BigQuery 表的数据,然后将数据实时写入另一个 BigQuery 表。这样一来,数据便可用于进一步的下游分析。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型处理数据

以下示例展示了一个持续查询,该查询使用 Vertex AI 模型根据出租车乘客当前的纬度和经度为其生成广告,然后将结果实时导出到 Pub/Sub 主题:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

修改持续查询的 SQL

在持续查询作业运行期间,您无法更新持续查询中使用的 SQL。您必须取消持续查询作业,修改 SQL,然后从停止原始持续查询作业的时间点启动新的持续查询作业。

请按照以下步骤修改持续查询中使用的 SQL:

  1. 对要更新的持续查询作业查看作业详细信息,并记下作业 ID。
  2. 如果可能,请暂停收集上游数据。如果您无法执行此操作,则在重新启动持续查询时可能会出现一些数据重复。
  3. 取消要修改的持续查询
  4. 使用 INFORMATION_SCHEMA JOBS 视图获取原始持续查询作业的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • REGION:项目使用的区域。
    • JOB_ID:您在第 1 步中确定的持续查询作业 ID。
  5. 修改持续查询 SQL 语句,以从特定时间点启动持续查询(使用您在第 5 步中检索的 end_time 值作为起始点)。

  6. 修改持续查询 SQL 语句,以反映您需要进行的更改。

  7. 运行修改后的持续查询。

取消持续查询

您可以像取消任何其他作业一样取消持续查询作业。在作业取消后,查询最多可能需要一分钟才能停止运行。

如果您取消查询,然后重新启动查询,重新启动的查询的行为会类似于新的独立查询。重新启动的查询不会开始处理上一个作业停止处理的数据,并且无法引用上一个查询的结果。请参阅从特定时间点启动持续查询

后续步骤