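Stream Pub/Sub Lite messages by using Dataflow
As an alternative to writing and running your own data processing programs, you can use Dataflow with the Pub/Sub Lite I/O connector for Apache Beam. Dataflow is a fully managed service for transforming and enriching data in streaming (real-time) and batch modes with equal reliability and expressiveness. It reliably executes programs developed with the Apache Beam SDK, which provides a set of powerful stateful processing abstractions and I/O connectors to other streaming and batch systems.
This quickstart shows you how to write an Apache Beam pipeline that does the following:
- Reads messages from Pub/Sub Lite
- Windows (or groups) the messages by publish timestamp
- Writes the messages to Cloud Storage
It also shows you how to do the following:
- Submit your pipeline to run on Dataflow
- Create a Dataflow Flex Template from your pipeline
This tutorial requires Maven, but you can also convert the example project from Maven to Gradle. To learn more, see Optional: Convert from Maven to Gradle.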
Before you begin
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
- To initialize the gcloud CLI, run the following command:
  gcloud init
- Create or select a Google Cloud project.
  - Create a Google Cloud project:
    gcloud projects create PROJECT_ID
    Replace PROJECT_ID with a name for the Google Cloud project you are creating.
  - Select the Google Cloud project that you created:
    gcloud config set project PROJECT_ID
    Replace PROJECT_ID with your Google Cloud project name.
- Make sure that billing is enabled for your Google Cloud project.
- Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
  gcloud services enable pubsublite.googleapis.com dataflow.googleapis.com storage-api.googleapis.com logging.googleapis.com
- Set up authentication:
  - Create the service account:
    gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
    Replace SERVICE_ACCOUNT_NAME with a name for the service account.
  - Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - ROLE: the role to grant
  - Grant the required role to the principal that will attach the service account to other resources.
    gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
    Replace the following:
    - SERVICE_ACCOUNT_NAME: the name of the service account
    - PROJECT_ID: the project ID where you created the service account
    - USER_EMAIL: the email address for a Google Account
- Create local authentication credentials for your user account:
  gcloud auth application-default login
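Set up your Pub/Sub Lite project
Create variables for your Cloud Storage bucket, project, and Dataflow region. Cloud Storage bucket names must be globally unique. The Dataflow region must be a valid region where you can run your job. For more information about regions and locations, see Dataflow locations.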
export PROJECT_ID=$(gcloud config get-value project)
export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@${PROJECT_ID}.iam.gserviceaccount.com
export BUCKET=BUCKET_NAME
export DATAFLOW_REGION=DATAFLOW_REGION
Create a Cloud Storage bucket owned by this project:
gcloud storage buckets create gs://$BUCKET
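Create a Pub/Sub Lite zonal Lite topic and subscription
Create a zonal Pub/Sub Lite topic and Lite subscription.
For the Lite location, choose a supported Pub/Sub Lite location. You must also specify a zone within the region, for example us-central1-a.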
export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
--location=$LITE_LOCATION \
--partitions=1 \
--per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
--location=$LITE_LOCATION \
--topic=$TOPIC \
--starting-offset=beginning
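The Lite location used above is a zone, while the Dataflow region set earlier is a region. As a rough illustration of how the two relate, the following sketch derives the region from a zone name. It assumes the zone has the shape shown in the example (a region name followed by a dash and a zone suffix, as in us-central1-a); the class and method names are hypothetical and are not part of any Google Cloud library.

```java
// Hypothetical helper for illustration only; not part of the quickstart sample.
final class LiteLocationSketch {

  // Returns the region part of a zone name, assuming the form REGION-SUFFIX,
  // for example "us-central1-a" yields "us-central1".
  static String regionOf(String zone) {
    int lastDash = zone.lastIndexOf('-');
    if (lastDash <= 0 || lastDash == zone.length() - 1) {
      throw new IllegalArgumentException("Not a zone name: " + zone);
    }
    return zone.substring(0, lastDash);
  }

  public static void main(String[] args) {
    System.out.println(regionOf("us-central1-a"));
  }
}
```

Running the Dataflow job in the same region as the Lite topic's zone is a common choice, but the quickstart itself only requires that each value is valid for its own service.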
Stream messages to Dataflow
Download the quickstart sample code
Clone the quickstart repository and navigate to the sample code directory.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics
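Sample code
This sample code uses Dataflow to read messages from a Pub/Sub Lite subscription, group them by publish timestamp, and write the grouped messages to Cloud Storage.
Java
Before running this sample, follow the Java setup instructions in Pub/Sub Lite Client Libraries.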
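The grouping step can be pictured without the Apache Beam SDK. The following is a minimal sketch, not the sample's actual code: it assumes fixed, non-overlapping windows aligned to the Unix epoch, and the class and method names (FixedWindowSketch, windowStart, windowEnd) are hypothetical. It shows how a message's publish timestamp could map to a window whose length corresponds to the windowSize option, expressed in minutes.

```java
import java.time.Duration;
import java.time.Instant;

// Hypothetical illustration of fixed-window assignment; not part of the
// quickstart sample and not an Apache Beam API.
final class FixedWindowSketch {

  // Returns the start of the epoch-aligned fixed window that contains publishTime.
  static Instant windowStart(Instant publishTime, Duration windowSize) {
    long sizeMillis = windowSize.toMillis();
    long millis = publishTime.toEpochMilli();
    // floorMod keeps the arithmetic correct for timestamps before the epoch.
    return Instant.ofEpochMilli(millis - Math.floorMod(millis, sizeMillis));
  }

  // Returns the exclusive end of the same window.
  static Instant windowEnd(Instant publishTime, Duration windowSize) {
    return windowStart(publishTime, windowSize).plus(windowSize);
  }

  public static void main(String[] args) {
    Duration oneMinute = Duration.ofMinutes(1);
    Instant published = Instant.parse("2024-01-01T19:41:30Z");
    System.out.println("start: " + windowStart(published, oneMinute));
    System.out.println("end:   " + windowEnd(published, oneMinute));
  }
}
```

All messages whose publish timestamps fall between the same start and end land in one group, which is what lets the pipeline write one set of files per window.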
Start the Dataflow pipeline
To start the pipeline in Dataflow, run the following command:
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--region=$DATAFLOW_REGION \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
--serviceAccount=$SERVICE_ACCOUNT"
The preceding command launches a Dataflow job. Follow the link in the console output to open the job in the Dataflow monitoring console.
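The --subscription argument in the command is a full resource path rather than a bare subscription ID. The sketch below assembles and checks such a path using only the format visible in the command (projects, locations, subscriptions). The class and method names are hypothetical, and the sample itself may rely on the client library's own path types instead.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of the quickstart sample.
final class SubscriptionPathSketch {

  private static final Pattern PATH =
      Pattern.compile("projects/([^/]+)/locations/([^/]+)/subscriptions/([^/]+)");

  // Builds a path in the same shape as the --subscription argument.
  static String format(String project, String location, String subscription) {
    return String.format(
        "projects/%s/locations/%s/subscriptions/%s", project, location, subscription);
  }

  // Splits a path into {project, location, subscription}; rejects anything else.
  static String[] parse(String path) {
    Matcher m = PATH.matcher(path);
    if (!m.matches()) {
      throw new IllegalArgumentException("Not a subscription path: " + path);
    }
    return new String[] {m.group(1), m.group(2), m.group(3)};
  }

  public static void main(String[] args) {
    String path = format("my-project", "us-central1-a", "my-subscription");
    System.out.println(path);
    System.out.println(String.join(", ", parse(path)));
  }
}
```

Validating the path before launching a job gives an early, readable error when one of the shell variables is empty.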
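Observe job progress
Observe the job's progress in the Dataflow console.
Open the job details view to see the following:
- Job graph
- Execution details
- Job metrics
Publish some messages to your Lite topic.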
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
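You might have to wait a few minutes before the messages appear in your worker logs.
Use the following command to check which files have been written to Cloud Storage.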
gcloud storage ls "gs://$BUCKET/samples/"
The output should look like the following:
gs://$BUCKET/samples/output-19:41-19:42-0-of-1
gs://$BUCKET/samples/output-19:47-19:48-0-of-1
gs://$BUCKET/samples/output-19:48-19:49-0-of-1
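The names in this listing appear to combine the output prefix, the window's start and end times as hours and minutes, and a shard index with the shard count. The sketch below reproduces that pattern under those assumptions. The class name, the use of UTC, and the exact format string are guesses based on the listing, not the sample's confirmed naming policy.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;

// Hypothetical illustration of the file-name pattern seen in the listing;
// not part of the quickstart sample.
final class WindowedFileNameSketch {

  // Formats an instant as hours and minutes; UTC is an assumption.
  private static final DateTimeFormatter HOUR_MINUTE =
      DateTimeFormatter.ofPattern("HH:mm").withZone(ZoneOffset.UTC);

  // Builds PREFIX-START-END-SHARD-of-NUMSHARDS.
  static String fileName(
      String prefix, Instant windowStart, Instant windowEnd, int shard, int numShards) {
    return String.format(
        "%s-%s-%s-%d-of-%d",
        prefix,
        HOUR_MINUTE.format(windowStart),
        HOUR_MINUTE.format(windowEnd),
        shard,
        numShards);
  }

  public static void main(String[] args) {
    Instant start = Instant.parse("2024-01-01T19:41:00Z");
    Instant end = Instant.parse("2024-01-01T19:42:00Z");
    System.out.println(fileName("output", start, end, 0, 1));
  }
}
```

With --windowSize=1, each file covers one minute of publish time, so gaps between file names (such as 19:42 to 19:47 in the listing) would correspond to minutes in which no messages were published.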
Use the following command to view the content of a file:
gcloud storage cat "gs://$BUCKET/samples/your-filename"
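Optional: Create a Dataflow template
You can optionally create a custom Dataflow Flex Template based on your pipeline. Dataflow templates let you run jobs with different input parameters from the Google Cloud console or the command line without setting up a full Java development environment.
Create a fat JAR that includes all the dependencies of your pipeline. After the command runs, you should see target/pubsublite-streaming-bundled-1.0.jar.
mvn clean package -DskipTests=true
Provide names and locations for your template file and template container image.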
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
Build a custom Flex Template. The sample provides the required metadata.json file, which contains the necessary specification to run the job.
gcloud dataflow flex-template build $TEMPLATE_PATH \
--image-gcr-path $TEMPLATE_IMAGE \
--sdk-language "JAVA" \
--flex-template-base-image "JAVA11" \
--metadata-file "metadata.json" \
--jar "target/pubsublite-streaming-bundled-1.0.jar" \
--env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
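Run a job using the custom Flex Template.
Console
Enter a job name.
Enter your Dataflow region.
Choose your custom template.
Enter your template path.
Enter the required parameters.
Click Run job.
gcloud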
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
--region $DATAFLOW_REGION \
--service-account-email=$SERVICE_ACCOUNT
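Clean up
To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.
In the Dataflow console, stop the job. Cancel the pipeline instead of draining it.
Delete the topic and subscription.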
gcloud pubsub lite-topics delete $TOPIC --location=$LITE_LOCATION
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION --location=$LITE_LOCATION
Delete the files created by the pipeline.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
Delete the template image and the template file if they exist.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Remove the Cloud Storage bucket.
gcloud storage rm gs://$BUCKET --recursive
- Delete the service account:
  gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
- Optional: Revoke the authentication credentials that you created, and delete the local credential file.
  gcloud auth application-default revoke
- Optional: Revoke credentials from the gcloud CLI.
  gcloud auth revoke
What's next
Read more about configuring Dataflow Flex Templates.
Learn about Dataflow streaming pipelines.