将数据导出到 Pub/Sub(反向 ETL)
将数据导出到 Pub/Sub 需要使用 BigQuery 持续查询。如需注册持续查询预览,请填写请求表单。如需就此功能提供反馈或请求支持,请联系 bq-continuous-queries-feedback@google.com。
本文档介绍如何设置从 BigQuery 到 Pub/Sub 的反向提取、转换和加载 (RETL)。为此,您可以在持续查询中使用 EXPORT DATA
语句,以将数据从 BigQuery 导出到 Pub/Sub 主题。
您可以将 RETL 工作流用于 Pub/Sub,将 BigQuery 的分析功能与 Pub/Sub 的异步可伸缩全球消息传递服务相结合。借助此工作流,您可以以事件驱动的方式向下游应用和服务提供数据。
前提条件
您必须创建服务账号。您需要使用服务账号运行将结果导出到 Pub/Sub 主题的持续查询。
您必须创建一个 Pub/Sub 主题以将持续查询结果作为消息来接收,并且必须创建一个 Pub/Sub 订阅以供目标应用用来接收这些消息。
所需的角色
本部分介绍了创建持续查询的用户账号和运行持续查询的服务账号所需的角色和权限。
用户账号权限
如需在 BigQuery 中创建作业,用户账号必须具有 bigquery.jobs.create
IAM 权限。以下每个 IAM 角色均会授予 bigquery.jobs.create
权限:
- BigQuery User (
roles/bigquery.user
) - BigQuery Job User (
roles/bigquery.jobUser
) - BigQuery Admin (
roles/bigquery.admin
)
如需提交使用服务账号运行的作业,用户账号必须具有 Service Account User (roles/iam.serviceAccountUser
) 角色。如果您使用同一用户账号创建服务账号,则该用户账号必须具有 Service Account Admin (roles/iam.serviceAccountAdmin
) 角色。如需了解如何限制用户对单个服务账号(而非项目中的所有服务账号)的访问权限,请参阅授予单个角色。
如果用户账号必须启用持续查询用例所需的 API,则该用户账号必须具有 Service Usage Admin (roles/serviceusage.serviceUsageAdmin
) 角色。
服务账号权限
如需从 BigQuery 表导出数据,服务账号必须拥有 bigquery.tables.export
IAM 权限。以下每个 IAM 角色均会授予 bigquery.tables.export
权限:
- BigQuery Data Viewer (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - BigQuery Data Owner (
roles/bigquery.dataOwner
) - BigQuery Admin (
roles/bigquery.admin
)
要使该服务账号能够访问 Pub/Sub,您必须向该服务账号授予以下两个 IAM 角色:
您也可以通过自定义角色来获取所需的权限。
准备工作
Enable the BigQuery and Pub/Sub APIs.
导出到 Pub/Sub
使用 EXPORT DATA
语句将数据导出到 Pub/Sub 主题:
控制台
在 Google Cloud 控制台中,前往 BigQuery 页面。
在查询编辑器中,点击更多 > 查询设置。
在持续查询部分中,选中使用持续查询模式复选框。
在服务账号方框中,选择您创建的服务账号。
点击保存。
在查询编辑器中,输入以下语句:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID' ) AS ( QUERY );
替换以下内容:
点击运行。
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
在命令行上,使用带有以下标志的
bq query
命令运行持续查询:- 将
--continuous
标志设置为true
以使查询持续。 - 使用
--connection_property
标志指定要使用的服务账号。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'
替换以下内容:
- 将
API
通过调用
jobs.insert
方法运行持续查询。在传入的Job
资源的JobConfigurationQuery
资源中设置以下字段:- 将
continuous
字段设置为true
以使查询持续。 - 使用
connection_property
字段指定要使用的服务账号。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs' --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
替换以下内容:
- 将
将多列导出到 Pub/Sub
如果要在输出中包含多列,可以创建一个结构列来包含列值,然后使用 TO_JSON_STRING
函数将结构值转换为 JSON 字符串。以下示例以 JSON 字符串格式导出四列中的数据:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
导出优化
如果您的持续查询作业性能似乎受到可用计算资源的限制,请尝试增加 BigQuery CONTINUOUS
槽预留分配的大小。
限制
- 导出的数据必须由单个
STRING
或BYTES
列组成。列名称可以是您选择的任何名称。 - 您必须使用持续查询才能导出到 Pub/Sub。
- 您无法将架构传递给持续查询中的 Pub/Sub 主题。
- 您无法将数据导出到使用架构的 Pub/Sub 主题。
- 您无法导出包含
NULL
值的数据。您可以通过在持续查询中添加WHERE message IS NOT NULL
过滤条件,从查询结果中排除NULL
值。 - 导出的数据不得超出 Pub/Sub 配额。
价格
当您从持续查询中导出数据时,您需要按照 BigQuery 容量计算价格付费。如需运行持续查询,您必须拥有使用企业版或企业 Plus 版的预留以及使用 CONTINUOUS
作业类型的预留分配。
导出数据后,您需要为使用 Pub/Sub 付费。如需了解详情,请参阅 Pub/Sub 价格。