使用 Dataflow 流式传输来自 Pub/Sub 的消息
Dataflow 是一种全托管式服务,用于以流式传输(实时)和批量模式对数据进行转换并丰富数据内容,同时保持同等的可靠性和表现力。它使用 Apache Beam SDK 提供了一个简化的流水线开发环境;该 SDK 具有一组丰富的数据选取和会话分析基本功能,以及一个包含来源连接器与接收器连接器的生态系统。本快速入门介绍如何使用 Dataflow 执行以下操作:
- 读取发布到 Pub/Sub 主题的消息
- 按时间戳选取(或组合)消息
- 将消息写入 Cloud Storage
本快速入门介绍如何在 Java 和 Python 中使用 Dataflow。SQL 也受支持。此快速入门还作为 Google Cloud Skills Boost 教程提供,它提供了临时凭据来帮助您开始使用。
如果您不打算进行自定义数据处理,也可以通过使用基于界面的 Dataflow 模板开始上手。
准备工作
- 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
创建或选择 Google Cloud 项目。
-
创建 Cloud 项目:
gcloud projects create PROJECT_ID
-
选择您创建的 Cloud 项目:
gcloud config set project PROJECT_ID
-
-
确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
设置身份验证:
-
创建服务帐号:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
将
SERVICE_ACCOUNT_NAME
替换为服务帐号的名称。 -
向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
替换以下内容:
SERVICE_ACCOUNT_NAME
:服务帐号的名称PROJECT_ID
:您在其中创建服务帐号的项目的 IDROLE
:要授予的角色
-
为您的 Google 帐号授予一个可让您使用服务帐号的角色并将服务帐号关联到其他资源的角色:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
替换以下内容:
SERVICE_ACCOUNT_NAME
:服务帐号的名称PROJECT_ID
:您在其中创建服务帐号的项目的 IDUSER_EMAIL
:您的 Google 帐号的电子邮件地址
-
- 安装 Google Cloud CLI。
-
如需初始化 gcloud CLI,请运行以下命令:
gcloud init
-
创建或选择 Google Cloud 项目。
-
创建 Cloud 项目:
gcloud projects create PROJECT_ID
-
选择您创建的 Cloud 项目:
gcloud config set project PROJECT_ID
-
-
确保您的 Cloud 项目已启用结算功能。了解如何检查项目是否已启用结算功能。
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
设置身份验证:
-
创建服务帐号:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
将
SERVICE_ACCOUNT_NAME
替换为服务帐号的名称。 -
向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
替换以下内容:
SERVICE_ACCOUNT_NAME
:服务帐号的名称PROJECT_ID
:您在其中创建服务帐号的项目的 IDROLE
:要授予的角色
-
为您的 Google 帐号授予一个可让您使用服务帐号的角色并将服务帐号关联到其他资源的角色:
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
替换以下内容:
SERVICE_ACCOUNT_NAME
:服务帐号的名称PROJECT_ID
:您在其中创建服务帐号的项目的 IDUSER_EMAIL
:您的 Google 帐号的电子邮件地址
-
-
为您的 Google 帐号创建身份验证凭据:
gcloud auth application-default login
设置 Pub/Sub 项目
-
为您的存储桶、项目和区域创建变量。 Cloud Storage 存储桶名称必须是全局唯一的。选择在本快速入门中运行命令的 Dataflow 区域附近。
REGION
变量的值必须是有效的区域名称。如需详细了解区域和位置,请参阅 Dataflow 位置。BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
创建此项目所拥有的 Cloud Storage 存储桶:
gsutil mb gs://$BUCKET_NAME
-
在此项目中创建 Pub/Sub 主题:
gcloud pubsub topics create $TOPIC_ID
-
在此项目中创建 Cloud Scheduler 作业。作业每隔一分钟向 Pub/Sub 主题发布一条消息。
如果项目不存在 App Engine 应用,则此步骤将创建一个。
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
启动作业。
gcloud scheduler jobs run publisher-job --location=$REGION
-
使用以下命令克隆快速入门代码库并导航到示例代码目录:
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
将消息从 Pub/Sub 流式传输到 Cloud Storage
代码示例
此示例代码使用 Dataflow 执行以下操作:
- 读取 Pub/Sub 消息。
- 按发布时间戳将消息按固定大小间隔选取(或组合)。
将每个窗口中的消息写入 Cloud Storage 中的文件。
Java
Python
启动流水线
如需启动流水线,请运行以下命令:
Java
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
上述命令在本地运行,并启动一个在云端运行的 Dataflow 作业。当命令返回 JOB_MESSAGE_DETAILED: Workers
have started successfully
时,使用 Ctrl+C
退出本地程序。
查看作业和流水线进度
您可以在 Dataflow 控制台中查看作业的进度。
打开作业详细信息视图以查看以下内容:
- 作业结构
- 作业日志
- 阶段指标
您可能需要等待几分钟才能在 Cloud Storage 中看到输出文件。
或者,您可以使用以下命令行查看哪些文件已输出。
gsutil ls gs://${BUCKET_NAME}/samples/
输出应如下所示:
Java
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
清理
为避免因本页中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的 Cloud 项目。
删除 Cloud Scheduler 作业。
gcloud scheduler jobs delete publisher-job --location=$REGION
在 Dataflow 控制台中,停止作业。取消流水线(不排空)。
删除主题。
gcloud pubsub topics delete $TOPIC_ID
删除流水线创建的文件。
gsutil -m rm -rf "gs://${BUCKET_NAME}/samples/output*" gsutil -m rm -rf "gs://${BUCKET_NAME}/temp/*"
移除 Cloud Storage 存储桶。
gsutil rb gs://${BUCKET_NAME}
-
删除服务帐号:
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
可选:撤消您创建的身份验证凭据,并删除本地凭据文件。
gcloud auth application-default revoke
-
可选:从 gcloud CLI 撤消凭据。
gcloud auth revoke
后续步骤
如果您希望按自定义时间戳选取 Pub/Sub 消息,可以在 Pub/Sub 消息中指定时间戳,然后一起使用自定义时间戳和 PubsubIO 的
withTimestampAttribute
部分。请查看 Google 专为流式传输而设计的开源 Dataflow 模板。
详细了解 Dataflow 如何与 Pub/Sub 集成。
查看此教程,了解如何使用 Dataflow Flex 模板从 Pub/Sub 读取并向 BigQuery 写入。
如需详细了解数据选取,请参阅 Apache Beam 移动游戏流水线示例。