使用 Dataflow 流式传输 Pub/Sub Lite 消息

除了编写和运行自己的数据处理程序之外,您还可以将 DataflowApache Beam 的 Pub/Sub Lite I/O 连接器搭配使用。Dataflow 是一种全代管式服务,用于以流式传输(实时)模式和批量模式对数据进行转换并丰富数据内容,同时保持同等的可靠性和表现力。它安全地执行使用 Apache Beam SDK 开发的程序,该程序可将一组强大的有状态处理抽象和 I/O 连接器连接到其他流式传输和批处理系统。

本快速入门介绍如何编写将执行以下操作的 Apache Beam 流水线:

  • 从 Pub/Sub 精简版读取消息
  • 按发布时间戳选取(或组合)消息
  • 将消息写入 Cloud Storage


  • 提交流水线以在 Dataflow 上运行
  • 通过流水线创建 Dataflow Flex 模板

本教程需要使用 Maven,但您也可以将示例项目从 Maven 转换为 Gradle。如需了解详情,请参阅可选:从 Maven 转换为 Gradle


  1. 登录您的 Google Cloud 账号。如果您是 Google Cloud 新手,请创建一个账号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. 确保您的 Google Cloud 项目已启用结算功能

  6. Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:

    gcloud services enable pubsublite.googleapis.com  dataflow.googleapis.com  storage-api.googleapis.com  logging.googleapis.com
  7. Set up authentication:

    1. Create the service account:

      gcloud iam service-accounts create SERVICE_ACCOUNT_NAME

      Replace SERVICE_ACCOUNT_NAME with a name for the service account.

    2. Grant roles to the service account. Run the following command once for each of the following IAM roles: roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin:

      gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • ROLE: the role to grant
    3. Grant the required role to the principal that will attach the service account to other resources.

      gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser

      Replace the following:

      • SERVICE_ACCOUNT_NAME: the name of the service account
      • PROJECT_ID: the project ID where you created the service account
      • USER_EMAIL: the email address for a Google Account
设置 Pub/Sub Lite 项目

  1. 为 Cloud Storage 存储桶、项目和 Dataflow 区域。Cloud Storage 存储桶名称必须是全局唯一的。 Dataflow 区域必须是您可以运行作业的有效区域。如需详细了解区域和位置,请参阅 Dataflow 位置

    export PROJECT_ID=$(gcloud config get-value project)
    export SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
  2. 创建此项目所拥有的 Cloud Storage 存储分区:

       gcloud storage buckets create gs://$BUCKET

创建 Pub/Sub Lite 可用区级精简版主题和订阅

创建可用区级精简版 Pub/Sub 精简版主题和精简版订阅。

对于 Lite 位置,请选择受支持的 Pub/Sub Lite 位置。您还必须 并为区域指定一个可用区例如 us-central1-a

gcloud pubsub lite-topics create $TOPIC \
      --location=$LITE_LOCATION \
      --partitions=1 \
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \
      --location=$LITE_LOCATION \
      --topic=$TOPIC \

将消息流式传输到 Dataflow



git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
cd java-docs-samples/pubsublite/streaming-analytics


此示例代码使用 Dataflow 执行以下操作:

  • 从 Pub/Sub 精简版订阅读取无界限来源的消息
  • 使用 固定时间范围默认触发器
  • 将已分组的消息写入 Cloud Storage 上的文件。


在运行此示例之前,请按照 Pub/Sub Lite 客户端库中的 Java 设置说明进行操作。

import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.proto.SequencedMessage;
import org.apache.beam.examples.common.WriteOneFilePerWindow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsublite.PubsubLiteIO;
import org.apache.beam.sdk.io.gcp.pubsublite.SubscriberOptions;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class PubsubliteToGcs {
   * Define your own configuration options. Add your arguments to be processed
   * by the command-line parser.
  public interface PubsubliteToGcsOptions extends StreamingOptions {
    @Description("Your Pub/Sub Lite subscription.")
    String getSubscription();

    void setSubscription(String value);

    @Description("Window size of output files in minutes.")
    Integer getWindowSize();

    void setWindowSize(Integer value);

    @Description("Filename prefix of output files.")
    String getOutput();

    void setOutput(String value);

  private static final Logger LOG = LoggerFactory.getLogger(PubsubliteToGcs.class);

  public static void main(String[] args) throws InterruptedException {
    // The maximum number of shards when writing output files.
    int numShards = 1;

    PubsubliteToGcsOptions options =


    SubscriberOptions subscriberOptions =

    Pipeline pipeline = Pipeline.create(options);
        .apply("Read From Pub/Sub Lite", PubsubLiteIO.read(subscriberOptions))
            "Convert messages",
                    (SequencedMessage sequencedMessage) -> {
                      String data = sequencedMessage.getMessage().getData().toStringUtf8();
                      LOG.info("Received: " + data);
                      long publishTime = sequencedMessage.getPublishTime().getSeconds();
                      return data + "\t" + publishTime;
            "Apply windowing function",
                // Group the elements using fixed-sized time intervals based on the element
                // timestamp (using the default event time trigger). The element timestamp
                // is the publish timestamp associated with a message.
                // NOTE: If data is not being continuously ingested, such as with a batch or
                // intermittent publisher, the final window will never close as the watermark
                // will not advance. If this is a possibility with your pipeline, you should
                // add an additional processing time trigger to force window closure after
                // enough time has passed. See
                // https://beam.apache.org/documentation/programming-guide/#triggers
                // for more information.
        .apply("Write elements to GCS", new WriteOneFilePerWindow(options.getOutput(), numShards));

    // Execute the pipeline. You may add `.waitUntilFinish()` to observe logs in your console, but
    // `waitUntilFinish()` will not work in Dataflow Flex Templates.

启动 Dataflow 流水线

如需在 Dataflow 中启动流水线,请运行以下命令:

mvn compile exec:java \
    -Dexec.mainClass=examples.PubsubliteToGcs \
    -Dexec.args=" \
        --subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
        --output=gs://$BUCKET/samples/output \
        --windowSize=1 \
        --project=$PROJECT_ID \
        --region=$DATAFLOW_REGION \
        --tempLocation=gs://$BUCKET/temp \
        --runner=DataflowRunner \

上述命令会启动 Dataflow 作业。点击控制台输出中的链接以访问 Dataflow 监控控制台中的作业。


在 Dataflow 控制台中查看作业的进度。

转到 Dataflow 控制台


  • 作业图
  • 执行详情
  • 作业指标


gcloud pubsub lite-topics publish $TOPIC \
    --location=$LITE_LOCATION \
    --message="Hello World!"


使用以下命令检查已写入 Cloud Storage 的文件。

gcloud storage ls "gs://$BUCKET/samples/"




gcloud storage cat "gs://$BUCKET/samples/your-filename"

可选:创建 Dataflow 模板

您可以根据需要为流水线创建自定义 Dataflow Flex 模板。利用 Dataflow 模板,您可以使用不同的 从 Google Cloud 控制台或命令行(不含 需要设置完整的 Java 开发环境。

  1. 创建一个包含流水线的所有依赖项的 Fat JAR。运行命令后,您应该会看到 target/pubsublite-streaming-bundled-1.0.jar

    mvn clean package -DskipTests=true
  2. 为您的模板文件和模板容器映像提供名称和位置。

    export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
    export TEMPLATE_IMAGE="gcr.io/$PROJECT_ID/your-template-image:latest"
  3. 构建自定义 Flex 模板。示例提供了必需的 metadata.json 文件,其中包含运行作业的必要规范。

    gcloud dataflow flex-template build $TEMPLATE_PATH \
        --image-gcr-path $TEMPLATE_IMAGE \
        --sdk-language "JAVA" \
        --flex-template-base-image "JAVA11" \
        --metadata-file "metadata.json" \
        --jar "target/pubsublite-streaming-bundled-1.0.jar" \
        --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
  4. 使用自定义 Flex 模板运行作业。


  1. 基于模板创建作业

  2. 输入作业名称

  3. 输入您的 Dataflow 区域

  4. 选择您的自定义模板

  5. 输入您的模板路径

  6. 输入必需参数。

  7. 点击运行作业


gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
     --template-file-gcs-location $TEMPLATE_PATH \
     --parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
     --parameters output="gs://$BUCKET/samples/template-output" \
     --parameters windowSize=1 \
     --region $DATAFLOW_REGION \ 


为避免因本页面中使用的资源导致您的 Google Cloud 账号产生费用,请删除包含这些资源的 Google Cloud 项目。

  1. 在 Dataflow 控制台中,停止作业。取消流水线,而不是取消流水线。

  2. 删除主题和订阅。

    gcloud pubsub lite-topics delete $TOPIC
    gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
  3. 删除流水线创建的文件。

    gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
    gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
  4. 删除模板映像和模板文件(如果存在)。

    gcloud container images delete $TEMPLATE_IMAGE
    gcloud storage rm $TEMPLATE_PATH
  5. 移除 Cloud Storage 存储分区。

    gcloud storage rm gs://$BUCKET --recursive

  6. 删除服务账号:
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  7. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  8. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
