本教程使用 Pub/Sub Subscription to BigQuery 模板,通过 Google Cloud 控制台或 Google Cloud CLI 创建并运行 Dataflow 模板作业。本教程介绍了一个流处理流水线示例,该示例从 Pub/Sub 中读取 JSON 编码的消息,使用用户定义的函数 (UDF) 来扩展 Google 提供的流处理模板,使用 Apache Beam SDK 转换消息数据,然后将结果写入 BigQuery 表中。
流式分析和数据集成流水线使用 Pub/Sub 提取和分发数据。通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者将事件异步发送到 Pub/Sub 服务,Pub/Sub 将事件传递给需要响应事件的所有服务。
Dataflow 是一种全代管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容。它提供了一个简化的流水线开发环境,该环境使用 Apache Beam SDK 转换传入的数据,然后输出转换后的数据。
此工作流的优势在于,您可以在将消息数据写入 BigQuery 之前使用 UDF 对其进行转换。另一种方法是使用 BigQuery 订阅,它会直接将 Pub/Sub 消息写入 BigQuery,而无需使用 Dataflow。此选项仅支持“至少一次”传送;不支持“正好一次”处理。
目标
- 创建 Pub/Sub 主题。
- 使用表和架构创建 BigQuery 数据集。
- 使用 Google 提供的流处理模板,通过 Dataflow 将数据从 Pub/Sub 订阅流式传输到 BigQuery。
- 创建用户定义的函数 (UDF) 以扩展 Google 提供的流处理模板。
费用
在本文档中,您将使用 Google Cloud 的以下收费组件:
- Dataflow
- Pub/Sub
- Cloud Storage
- Cloud Scheduler
- BigQuery
您可使用价格计算器根据您的预计使用情况来估算费用。
完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理。
准备工作
本部分介绍了如何选择项目、启用 API 以及向您的用户账号和工作器服务账号授予适当的角色。
控制台
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs.
如需完成本教程中的步骤,您的用户账号必须具有 Service Account User 角色。Compute Engine 默认服务账号必须具有以下角色:Dataflow Worker、Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor。如需在 Google Cloud 控制台中添加所需的角色,请执行以下操作:
在 Google Cloud 控制台中,转到 IAM 页面。
转到 IAM- 选择您的项目。
- 在用户账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Service Account User 角色。
- 在 Compute Engine 默认服务账号所在的行中,点击 修改主账号,然后点击 添加其他角色。
- 在下拉列表中,选择 Dataflow Worker 角色。
对 Dataflow Admin、Pub/Sub Editor、Storage Object Admin 和 BigQuery Data Editor 角色重复上述步骤,然后点击保存。
如需详细了解如何授予角色,请参阅使用控制台授予 IAM 角色。
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable compute.googleapis.com
dataflow.googleapis.com logging.googleapis.com bigquery.googleapis.com pubsub.googleapis.com storage.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
If you're using a local shell, then create local authentication credentials for your user account:
gcloud auth application-default login
You don't need to do this if you're using Cloud Shell.
-
Grant roles to your user account. Run the following command once for each of the following IAM roles:
roles/iam.serviceAccountUser
gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
- Replace
PROJECT_ID
with your project ID. -
Replace
USER_IDENTIFIER
with the identifier for your user account. For example,user:myemail@example.com
. - Replace
ROLE
with each individual role.
- Replace
-
向您的 Compute Engine 默认服务账号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
替换以下内容:
PROJECT_ID
:您的项目 ID。PROJECT_NUMBER
:您的项目编号。 如需查找您的项目编号,请使用gcloud projects describe
命令。SERVICE_ACCOUNT_ROLE
:每个角色。
创建示例来源和接收器
本部分介绍如何创建以下内容:
- 使用 Pub/Sub 的流式处理数据源
- 将数据加载到 BigQuery 中的数据集
创建 Cloud Storage 存储桶
首先使用 Google Cloud 控制台或 Google Cloud CLI 创建 Cloud Storage 存储桶。Dataflow 流水线将此存储桶用作临时存储位置。
控制台
gcloud
使用 gcloud storage buckets create
命令:
gcloud storage buckets create gs://BUCKET_NAME
将 BUCKET_NAME
替换为符合存储桶命名要求的 Cloud Storage 存储桶的名称。Cloud Storage 存储桶名称必须是全局唯一的。
创建 Pub/Sub 主题和订阅
创建 Pub/Sub 主题,然后创建对该主题的订阅。
控制台
如需创建主题,请完成以下步骤。
gcloud
如需创建主题,请运行 gcloud pubsub topics create
命令。如需了解如何命名订阅,请参阅主题或订阅命名指南。
gcloud pubsub topics create TOPIC_ID
将 TOPIC_ID
替换为您的 Pub/Sub 主题的名称。
如需创建对主题的订阅,请运行 gcloud pubsub subscriptions create
命令:
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
将 SUBSCRIPTION_ID
替换为您的 Pub/Sub 订阅的名称。
创建并运行 Cloud Scheduler 作业
创建并运行两个 Cloud Scheduler 作业,一个发布正面评分,另一个将负面评分发布到 Pub/Sub 主题。
控制台
创建 Cloud Scheduler 作业以获取正面评分。
访问控制台中的 Cloud Scheduler 页面:
点击创建作业按钮。
输入名称
positive-ratings-publisher
。选择靠近您在本教程中运行命令的位置的 Dataflow 区域。
REGION
变量的值必须是有效的区域名称。如需详细了解区域和位置,请参阅 Dataflow 位置。使用 unix-cron 格式
* * * * *
指定作业的频率。如需了解详情,请参阅配置 Cron 作业时间表。
选择您所在的时区。
点击继续。
在目标列表中,选择 Pub/Sub。
从列表中选择主题名称。
添加以下消息字符串以发送到您的目标:
{"url": "https://beam.apache.org/", "review": "positive"}
点击创建。
您现在将获得每分钟向您的 Pub/Sub 主题发送一条包含正面评分的消息的 cron 作业。该主题的 Cloud Functions 函数订阅。
创建 Cloud Scheduler 作业以发布负面评分。
在控制台的 Cloud Scheduler 页面上,点击创建作业按钮。
输入名称
negative-ratings-publisher
。选择要在其中运行作业的区域。
使用 unix-cron 格式
*/2 * * * *
指定作业的频率。如需了解详情,请参阅配置 Cron 作业时间表。
选择您所在的时区。
点击继续。
在目标列表中,选择 Pub/Sub。
从列表中选择主题名称。
添加以下消息字符串以发送到您的目标:
{"url": "https://beam.apache.org/", "review": "negative"}
点击创建。
您现在将获得每两分钟向 Pub/Sub 主题发送一条具有负面评分的消息的 Cron 作业。该主题的 Cloud Functions 函数订阅。
gcloud
如需为本教程创建 Cloud Scheduler 作业,请使用
gcloud scheduler jobs create
命令。此步骤创建一个发布者以发布“正面评分”,该发布者每分钟发布一条消息。gcloud scheduler jobs create pubsub positive-ratings-publisher \ --schedule="* * * * *" \ --location=DATAFLOW_REGION \ --topic="TOPIC_ID" \ --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
将
DATAFLOW_REGION
替换为要在其中部署 Dataflow 作业的区域。选择靠近您在本教程中运行命令的位置的 Dataflow 区域。REGION
变量的值必须是有效的区域名称。如需启动 Cloud Scheduler 作业,请使用
gcloud scheduler jobs run
命令。gcloud scheduler jobs run --location=DATAFLOW_REGION positive-ratings-publisher
再创建并运行一个类似的“负面评分”发布者,该发布者每两分钟发布一条消息。此步骤会创建一个发布者以发布“负面评分”,该发布者每两分钟发布一条消息。
gcloud scheduler jobs create pubsub negative-ratings-publisher \ --schedule="*/2 * * * *" \ --location=DATAFLOW_REGION \ --topic="TOPIC_ID" \ --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
启动第二个 Cloud Scheduler 作业。
gcloud scheduler jobs run --location=DATAFLOW_REGION negative-ratings-publisher
创建 BigQuery 数据集
为您的 Pub/Sub 主题创建具有适当架构的 BigQuery 数据集和表。
控制台
创建 BigQuery 数据集。
在 Google Cloud 控制台中打开 BigQuery 页面。
在探索器面板中,选择您要在其中创建数据集的项目。
展开
操作选项,然后点击创建数据集。在创建数据集页面中执行以下操作:
- 在数据集 ID 部分,输入
tutorial_dataset
。 对于数据位置,为数据集选择一个地理位置。创建数据集后,就无法再更改此位置。
请勿选择其他选项。
点击创建数据集。
- 在数据集 ID 部分,输入
创建具有架构的 BigQuery 表。
在浏览器面板中,展开您的项目并选择您的
tutorial_dataset
数据集。展开
操作选项,然后点击打开。在详情面板中,点击创建表
。在创建表页面的来源部分,选择空白表。
在创建表页面的目标部分,执行以下操作:
- 验证数据集名称是否设置为
tutorial_dataset
。 - 在表名称字段中,输入
tutorial
。 - 确认表类型设置为原生表。
- 验证数据集名称是否设置为
在 Schema 部分中,输入架构定义。点击以文字形式修改,并以 JSON 数组格式输入以下表架构:
[ { "mode": "NULLABLE", "name": "url", "type": "STRING" }, { "mode": "NULLABLE", "name": "review", "type": "STRING" } ]
对于分区和聚簇设置,保留默认值
No partitioning
。对于高级选项部分的加密,保留默认值
Google-managed key
。默认情况下,Dataflow 会对以静态方式存储的客户内容进行加密。点击创建表。
gcloud
使用 bq mk
命令创建数据集。
bq --location=DATAFLOW_REGION mk \
PROJECT_ID:tutorial_dataset
将 PROJECT_ID
替换为项目的项目 ID。
搭配使用 bq mk
命令和 --table
或 -t
标志以在数据集中创建表。
bq mk \
--table \
PROJECT_ID:tutorial_dataset.tutorial \
url:STRING,review:STRING
创建用户定义的函数 (UDF)
您可以视需要创建 JavaScript UDF 来扩展 Google 提供的 Pub/Sub Subscription to BigQuery 模板。借助 UDF,您可以定义模板中不存在的数据转换,并将其注入到模板中。
以下 UDF 会验证传入评分的网址。没有网址或网址错误的评分将转发到同一项目和数据集中带有 _error_records
后缀的另一个输出表(也称为死信表)。
JavaScript
将此 JavaScript 代码段保存到之前创建的 Cloud Storage 存储桶中。
运行流水线
使用 Google 提供的 Pub/Sub Subscription to BigQuery 模板运行流处理流水线。该流水线从 Pub/Sub 主题获取传入数据,并将该数据输出到 BigQuery 数据集。
控制台
在 Google Cloud 控制台中,转到 Dataflow 作业页面。
点击基于模板创建作业。
为 Dataflow 作业输入作业名称。
在区域端点部分中,为 Dataflow 作业选择一个区域。
在 Dataflow 模板部分中,选择 Pub/Sub Subscription to BigQuery 模板。
对于 BigQuery 输出表,输入以下内容:
PROJECT_ID:tutorial_dataset.tutorial
在 Pub/Sub input subscription 部分中,输入以下内容:
projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID
将
PROJECT_ID
替换为在其中创建了 BigQuery 数据集的项目的 ID,将SUBSCRIPTION_ID
替换为 Pub/Sub 订阅的名称。对于临时位置,输入以下内容:
gs://BUCKET_NAME/temp/
将
BUCKET_NAME
替换为您的 Cloud Storage 存储桶的名称。temp
文件夹存储临时文件,如暂存的流水线作业。可选:如需为作业添加 UDF,请展开可选参数。
在 Cloud Storage 中的 JavaScript UDF 路径部分,输入以下内容:
gs://BUCKET_NAME/dataflow_udf_transform.js
在 JavaScript UDF 名称部分,输入以下内容:
process
点击运行作业。
如需检查模板是否能够将消息转发到死信表,请发布一些没有网址或网址错误的评分。
转到 Pub/Sub 主题页面。
点击您的 TOPIC_ID。
转到消息部分。
点击发布消息。
在消息正文部分输入一些没有网址或网址错误的评分。例如:
{"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}
点击发布。
gcloud
如需在 shell 或终端中运行模板,请使用 gcloud dataflow jobs run
命令。
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial
将 JOB_NAME
替换为您选择的唯一名称。
(可选)如需运行包含 UDF 的模板,请使用以下命令:
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:tutorial_dataset.tutorial,\
javascriptTextTransformGcsPath=gs://BUCKET_NAME/dataflow_udf_transform.js,\
javascriptTextTransformFunctionName=process
如需检查模板是否能够将消息转发到死信表,请发布一些没有网址或网址错误的评分。例如:
gcloud pubsub topics publish TOPIC_ID \
--message='{"url": "https://beam.apache.org/documentation/sdks/java/", "review": "positive"}'
查看结果
查看写入 BigQuery 表的数据。
控制台
在 Google Cloud 控制台中,转到 BigQuery 页面。
转到 BigQuery 页面在查询编辑器中,运行以下查询:
SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial` LIMIT 1000
数据最多可能需要一分钟才会开始显示在表中。
查询将返回在过去 24 小时内添加到表中的行。您还可以使用标准 SQL 运行查询。
如果您希望将某些错误记录写入死信表,请在查询中使用表名称
tutorial_error_records
。例如:SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records` LIMIT 1000
gcloud
通过运行以下查询来查看 BigQuery 中的结果:
bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.tutorial_dataset.tutorial"'`'
在此流水线的运行期间,您可以看到 BigQuery 表中每分钟都在附加新行。
如果您希望将某些错误记录写入死信表,请在查询中使用表名称 tutorial_error_records
。例如:
SELECT * FROM `PROJECT_ID.tutorial_dataset.tutorial_error_records`
LIMIT 1000
清理
为避免因本教程中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。
删除项目
为了避免产生费用,最简单的方法是删除您为本教程创建的 Google Cloud 项目。
控制台
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
逐个删除资源
如果您希望以后重复使用该项目,可以保留该项目,但删除在本教程中创建的资源。
停止 Dataflow 流水线
控制台
在 Google Cloud 控制台中,转到 Dataflow 作业页面。
点击要停止的作业。
要停止作业,作业状态必须为正在运行。
在作业详情页面上,点击停止。
点击取消。
要确认您的选择,请点击停止作业。
gcloud
如需取消 Dataflow 作业,请使用 gcloud dataflow jobs
命令。
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
清理 Google Cloud 项目资源
控制台
删除 Cloud Scheduler 作业。
在 Google Cloud 控制台中,转到 Cloud Scheduler 页面。
选择您的作业。
点击页面顶部的删除按钮并确认删除操作。
删除 Pub/Sub 主题和订阅。
删除 BigQuery 表和数据集。
在 Google Cloud 控制台中,转到 BigQuery 页面。
在探索器面板中,展开您的项目。
在要删除的数据集旁边,点击
查看操作,然后点击删除。
删除 Cloud Storage 存储桶。
在 Google Cloud 控制台中,进入 Cloud Storage 存储桶页面。
选择要删除的存储t桶,点击
删除,然后按照说明操作。
gcloud
如需删除 Cloud Scheduler 作业,请使用
gcloud scheduler jobs delete
命令。gcloud scheduler jobs delete negative-ratings-publisher --location=DATAFLOW_REGION
gcloud scheduler jobs delete positive-ratings-publisher --location=DATAFLOW_REGION
如需删除 Pub/Sub 订阅和主题,请使用
gcloud pubsub subscriptions delete
和gcloud pubsub topics delete
命令。gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
如需删除 BigQuery 表,请使用
bq rm
命令。bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
删除 BigQuery 数据集。单独的数据集不会产生任何费用。
bq rm -r -f -d PROJECT_ID:tutorial_dataset
如需删除 Cloud Storage 存储桶,请使用
gcloud storage rm
命令。单独的存储桶不会产生任何费用。gcloud storage rm gs://BUCKET_NAME --recursive
撤销凭据
控制台
如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。
- 在 Google Cloud 控制台中,转到 IAM 页面。
选择一个项目、文件夹或组织。
找到包含要撤消其访问权限的主账号的行。 在该行中,点击
修改主账号。点击要撤消的每个角色对应的删除
按钮,然后点击保存。
gcloud
- 如果您保留项目,请撤消授予 Compute Engine 默认服务账号的角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
后续步骤
- 使用 UDF 扩展 Dataflow 模板。
- 详细了解如何使用 Dataflow 模板。
- 查看 Google 提供的所有模板。
- 了解如何使用 Pub/Sub 创建和使用主题以及如何创建拉取订阅。
- 详细了解如何使用 Cloud Scheduler 安排和运行 Cron 作业。
- 了解如何使用 BigQuery 创建数据集。
- 了解 Pub/Sub 订阅。
- 探索有关 Google Cloud 的参考架构、图表和最佳实践。查看我们的 Cloud 架构中心。