将数据导出到 Pub/Sub(反向 ETL)

将数据导出到 Pub/Sub 需要使用 BigQuery 持续查询。如需注册持续查询预览,请填写请求表单。如需就此功能提供反馈或请求支持,请联系 bq-continuous-queries-feedback@google.com

本文档介绍如何设置从 BigQuery 到 Pub/Sub 的反向提取、转换和加载 (RETL)。为此,您可以在持续查询中使用 EXPORT DATA 语句,以将数据从 BigQuery 导出到 Pub/Sub 主题

您可以使用到 Pub/Sub 的 RETL 工作流,将 BigQuery 的分析功能与 Pub/Sub 的异步且可扩缩的全球性消息传递服务相结合。此工作流可让您以事件驱动方式向下游应用和服务传送数据。

前提条件

您必须创建服务账号。您需要使用服务账号才能运行将结果导出到 Pub/Sub 主题的持续查询。

您必须创建一个 Pub/Sub 主题以将持续查询结果作为消息来接收,并且必须创建一个 Pub/Sub 订阅以供目标应用用来接收这些消息。

所需的角色

本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。

用户账号权限

如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create IAM 权限。以下每个 IAM 角色都可授予 bigquery.jobs.create 权限:

如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser) 角色。如果您使用同一用户账号来创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin) 角色。如需了解如何将用户的访问权限限制到单个服务账号(而不是项目中的所有服务账号),请参阅授予单个角色

如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin) 角色。

服务账号权限

如需从 BigQuery 表导出数据,服务账号必须具有 bigquery.tables.export IAM 权限。以下每个 IAM 角色都可授予 bigquery.tables.export 权限:

要使该服务账号能够访问 Pub/Sub,您必须向该服务账号授予以下两个 IAM 角色:

您也可以通过自定义角色来获取所需的权限。

准备工作

Enable the BigQuery and Pub/Sub APIs.

Enable the APIs

导出到 Pub/Sub

使用 EXPORT DATA 语句将数据导出到 Pub/Sub 主题:

控制台

  1. 在 Google Cloud 控制台中,转到 BigQuery 页面。

    转到 BigQuery

  2. 在查询编辑器中,依次点击更多 > 查询设置

  3. 持续查询部分中,选中使用持续查询模式复选框。

  4. 服务账号方框中,选择您创建的服务账号。

  5. 点击保存

  6. 在查询编辑器中,输入以下语句:

    EXPORT DATA
    OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID'
    ) AS
    (
    QUERY
    );

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • TOPIC_ID:Pub/Sub 主题 ID。您可以从 Google Cloud 控制台的主题页面中获取主题 ID。
    • QUERY:用于选择要导出的数据的 SQL 语句。SQL 语句只能包含支持的操作。 您必须在持续查询的 FROM 子句中使用 APPENDS 函数来指定开始处理数据的时间点。
  7. 点击运行

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. 在命令行上,使用带有以下标志的 bq query 命令运行持续查询:

    • --continuous 标志设置为 true 以使查询持续。
    • 使用 --connection_property 标志指定要使用的服务账号。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。
    • QUERY:用于选择要导出的数据的 SQL 语句。SQL 语句只能包含支持的操作。 您必须在持续查询的 FROM 子句中使用 APPENDS 函数来指定开始处理数据的时间点。

API

  1. 通过调用 jobs.insert 方法运行持续查询。在您传入的 Job 资源JobConfigurationQuery 资源中设置以下字段:

    • continuous 字段设置为 true 以使查询持续。
    • 使用 connection_property 字段指定要使用的服务账号。
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs'
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    替换以下内容:

    • PROJECT_ID:您的项目 ID。
    • QUERY:用于选择要导出的数据的 SQL 语句。SQL 语句只能包含支持的操作。 您必须在持续查询的 FROM 子句中使用 APPENDS 函数来指定开始处理数据的时间点。
    • SERVICE_ACCOUNT_EMAIL:服务账号电子邮件地址。您可以在 Google Cloud 控制台的服务账号页面上获取服务账号电子邮件地址。

将多列导出到 Pub/Sub

如果要在输出中包含多列,可以创建一个结构列来包含列值,然后使用 TO_JSON_STRING 函数将结构值转换为 JSON 字符串。以下示例以 JSON 字符串格式导出四列中的数据:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

导出优化

如果您的持续查询作业性能似乎受到可用计算资源的限制,请尝试增加 BigQuery CONTINUOUS 槽预留分配的大小。

限制

  • 导出的数据必须由单个 STRINGBYTES 列组成。列名可以是您选择的任何名称。
  • 您必须使用持续查询将数据导出到 Pub/Sub。
  • 您无法将架构传递给持续查询中的 Pub/Sub 主题。
  • 您无法将数据导出到使用架构的 Pub/Sub 主题。
  • 您无法导出包含 NULL 值的数据。您可以通过在持续查询中添加 WHERE message IS NOT NULL 过滤条件,从查询结果中排除 NULL 值。
  • 导出的数据不得超过 Pub/Sub 配额

价格

当您从持续查询中导出数据时,您需要按照 BigQuery 容量计算价格付费。如需运行持续查询,您必须拥有使用企业版或企业 Plus 版预留以及使用 CONTINUOUS 作业类型的预留分配

导出数据后,您需要为使用 Pub/Sub 付费。如需了解详情,请参阅 Pub/Sub 价格