使用 Dataflow 流式传输 Pub/Sub Lite 消息

除了编写和运行自己的数据处理程序之外,您还可以将 DataflowApache Beam 的 Pub/Sub Lite I/O 连接器搭配使用。Dataflow 是一种全代管式服务,用于以流式传输(实时)模式和批量模式对数据进行转换并丰富数据内容,同时保持同等的可靠性和表现力。它安全地执行使用 Apache Beam SDK 开发的程序,该程序可将一组强大的有状态处理抽象和 I/O 连接器连接到其他流式传输和批处理系统。

本快速入门介绍如何编写将执行以下操作的 Apache Beam 流水线:

  • 从 Pub/Sub 精简版读取消息
  • 按发布时间戳选取(或组合)消息
  • 将消息写入 Cloud Storage

此外,本文还介绍了如何执行以下操作:

  • 提交流水线以在 Dataflow 上运行
  • 通过流水线创建 Dataflow Flex 模板

本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle

准备工作

  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 安装 Google Cloud CLI。
  3. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  4. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色
    3. 为您的 Google 帐号授予一个可让您使用服务帐号的角色并将服务帐号关联到其他资源的角色:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • USER_EMAIL:您的 Google 帐号的电子邮件地址
  8. 安装 Google Cloud CLI。
  9. 如需初始化 gcloud CLI,请运行以下命令:

    gcloud init
  10. 创建或选择 Google Cloud 项目

    • 创建 Google Cloud 项目:

      gcloud projects create PROJECT_ID

      PROJECT_ID 替换为您要创建的 Google Cloud 项目的名称。

    • 选择您创建的 Google Cloud 项目:

      gcloud config set project PROJECT_ID

      PROJECT_ID 替换为您的 Google Cloud 项目 名称。

  11. 确保您的 Google Cloud 项目已启用结算功能

  12. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  13. 设置身份验证:

    1. 创建服务帐号:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      SERVICE_ACCOUNT_NAME 替换为服务帐号的名称。

    2. 向服务帐号授予角色。对以下每个 IAM 角色运行以下命令一次:roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      请替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • ROLE:要授予的角色
    3. 为您的 Google 帐号授予一个可让您使用服务帐号的角色并将服务帐号关联到其他资源的角色:

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      替换以下内容:

      • SERVICE_ACCOUNT_NAME:服务帐号的名称
      • PROJECT_ID:您在其中创建服务帐号的项目的 ID
      • USER_EMAIL:您的 Google 帐号的电子邮件地址
  14. 为您的 Google 账号创建本地身份验证凭据:

    gcloud auth application-default login

设置您的 Pub/Sub Lite 项目

  1. 为 Cloud Storage 存储桶、项目和 Dataflow 区域创建变量。Cloud Storage 存储桶名称必须是全局唯一的。 Dataflow 区域必须是可以运行作业的有效区域。如需详细了解区域和位置,请参阅 Dataflow 位置

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
    export BUCKET=BUCKET_NAME
    export DATAFLOW_REGION=DATAFLOW_REGION
    
  2. 创建此项目所拥有的 Cloud Storage 存储桶:

       gsutil mb gs://$BUCKET
    

创建 Pub/Sub Lite 可用区级精简版主题和订阅

创建可用区级精简版 Pub/Sub Lite 主题和精简版订阅。

对于精简版位置,请选择支持的 Pub/Sub Lite 位置。您还必须为该区域指定一个可用区。例如 us-central1-a

export TOPIC=LITE_TOPIC_ID
export SUBSCRIPTION=LITE_SUBSCRIPTION_ID
export LITE_LOCATION=LITE_LOCATION
gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
      --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \
      --starting-offset=beginning

将消息流式传输到 Dataflow

下载快速入门示例代码

克隆快速入门代码库并导航到示例代码目录。

git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics

示例代码

此示例代码使用 Dataflow 执行以下操作:

  • 从 Pub/Sub 精简版订阅读取无界限来源的消息
  • 使用固定时间范围默认触发器,根据消息的发布时间戳对消息进行分组。
  • 将已分组的消息写入 Cloud Storage 上的文件。

Java

在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。


import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
  /*
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
   */
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    @Required
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    @Default.Integer(1)
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    @Required
    String getOutput();

    void setOutput(String value);
  }

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubsubliteToGcsOptions.class);

    options.setStreaming(true);

    SubscriberOptions subscriberOptions =
        SubscriberOptions.newBuilder()
            .setSubscriptionPath(SubscriptionPath.parse(options.getSubscription()))
            .build();

    Pipeline pipeline = Pipeline.create(options);
    pipeline
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
        .apply(
            "Convert messages",
            MapElements.into(TypeDescriptors.strings())
                .via(
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
                    }))
        .apply(
            "Apply windowing function",
            Window
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                //
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
                .<String>into(FixedWindows.of(Duration.standardMinutes(options.getWindowSize()))))
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.
    pipeline.run();
  }
}

启动 Dataflow 流水线

如需在 Dataflow 中启动流水线,请运行以下命令:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \
        --serviceAccount=$SERVICE_ACCOUNT"

上述命令会启动 Dataflow 作业。点击控制台输出中的链接以访问 Dataflow 监控控制台中的作业。

观察作业进度

在 Dataflow 控制台中查看作业的进度。

转到 Dataflow 控制台

打开作业详细信息视图以查看以下内容:

  • 作业图
  • 执行详情
  • 作业指标

将部分消息发布到精简版主题。

gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"

您可能需要等待几分钟才能在工作器日志中看到这些消息。

使用以下命令检查已写入 Cloud Storage 的文件。

gsutil ls "gs://$BUCKET/samples/"

输出应如下所示:

 gs://$BUCKET/samples/output-19:41-19:42-0-of-1
 gs://$BUCKET/samples/output-19:47-19:48-0-of-1
 gs://$BUCKET/samples/output-19:48-19:49-0-of-1

使用以下命令查看文件中的内容:

gsutil cat "gs://$BUCKET/samples/your-filename"

可选:创建 Dataflow 模板

您可以根据需要为流水线创建自定义 Dataflow Flex 模板。借助 Dataflow 模板,您可以通过 Google Cloud 控制台或命令行使用不同输入参数运行作业,而无需设置完整的 Java 开发环境。

  1. 创建一个包含流水线的所有依赖项的 Fat JAR。运行命令后,您应该会看到 target/pubsublite-streaming-bundled-1.0.jar

    mvn clean package -DskipTests=true
    
  2. 为您的模板文件和模板容器映像提供名称和位置。

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
    
  3. 构建自定义 Flex 模板。示例提供了必需的 metadata.json 文件,其中包含运行作业的必要规范。

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
    
  4. 使用自定义 Flex 模板运行作业。

控制台

  1. 基于模板创建作业

  2. 输入作业名称

  3. 输入您的 Dataflow 区域

  4. 选择您的自定义模板

  5. 输入您的模板路径

  6. 输入必需参数。

  7. 点击运行作业

gcloud

gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \
     --serviceAccount=$SERVICE_ACCOUNT

清理

为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

  1. 在 Dataflow 控制台中,停止作业。取消流水线,而不是取消流水线。

  2. 删除主题和订阅。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
    
  3. 删除流水线创建的文件。

    gsutil -m rm -rf "gs://$BUCKET/samples/*"
    gsutil -m rm -rf "gs://$BUCKET/temp/*"
    
  4. 删除模板映像和模板文件(如果存在)。

    gcloud container images delete $TEMPLATE_IMAGE
    gsutil rm $TEMPLATE_PATH
    
  5. 移除 Cloud Storage 存储桶。

    gsutil rb gs://$BUCKET
    

  6. 删除服务账号:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. 可选:撤消您创建的身份验证凭据,并删除本地凭据文件。

    gcloud auth application-default revoke
  8. 可选:从 gcloud CLI 撤消凭据。

    gcloud auth revoke

后续步骤