Pub/Sub Topic to Text Files on Cloud Storage

The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from a Pub/Sub topic and saves them as a series of Cloud Storage files in text format. You can use this template as a quick way to save data from Pub/Sub for future use. By default, the template generates a new file every 5 minutes.

Pipeline requirements

  • The Pub/Sub topic must exist prior to execution.
  • The messages published to the topic must be in text format.
  • The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file; see the publishing sketch after this list.
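
To illustrate the newline requirement, here is a minimal sketch that publishes a text message with the google-cloud-pubsub Java client library, flattening any embedded newlines first. The project ID, topic name, and payload are placeholder values.

import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;

public class PublishTextMessage {
  public static void main(String[] args) throws Exception {
    // Placeholders: substitute your own project and topic IDs.
    TopicName topic = TopicName.of("your-project-id", "your-topic-name");
    Publisher publisher = Publisher.newBuilder(topic).build();
    try {
      // The template writes each message as one line, so strip any
      // newlines the payload might contain before publishing.
      String payload = "record with\nembedded newline".replace("\n", " ");
      PubsubMessage message =
          PubsubMessage.newBuilder().setData(ByteString.copyFromUtf8(payload)).build();
      publisher.publish(message).get(); // block until the publish succeeds
    } finally {
      publisher.shutdown();
    }
  }
}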

Template parameters

Required parameters

  • outputDirectory: The path and filename prefix for writing output files. For example, gs://bucket-name/path/. This value must end in a slash.
  • outputFilenamePrefix: The prefix to place on each windowed file. For example, output-. Defaults to: output.

Optional parameters

  • inputTopic: The Pub/Sub topic to read the input from. The topic name should be in the format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • userTempLocation: The user-provided directory to output temporary files to. Must end with a slash.
  • outputFilenameSuffix: The suffix to place on each windowed file. Typically a file extension such as .txt or .csv. Defaults to empty.
  • outputShardTemplate: The shard template defines the dynamic portion of each windowed file. By default, the pipeline uses a single shard for output to the file system within each window, which means all data for a window lands in a single file. The outputShardTemplate defaults to W-P-SS-of-NN, where W is the window date range, P is the pane info, S is the shard number, and N is the number of shards. In the case of a single file, the SS-of-NN portion of the outputShardTemplate is 00-of-01. The sketch after this list illustrates how the SS-of-NN portion expands.
  • yearPattern: Pattern for formatting the year. Must be one or more of y or Y. Case makes no difference in the year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to YYYY.
  • monthPattern: Pattern for formatting the month. Must be one or more of the M character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to MM.
  • dayPattern: Pattern for formatting the day. Must be one or more of d for the day of the month, or D for the day of the year. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to dd.
  • hourPattern: Pattern for formatting the hour. Must be one or more of the H character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to HH.
  • minutePattern: Pattern for formatting the minute. Must be one or more of the m character. Optionally, wrap the pattern with characters that aren't alphanumeric or the directory ('/') character. Defaults to mm.
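
To make the SS-of-NN convention concrete, the following is an illustrative sketch only (the helper name expandShardTemplate is hypothetical, and this is not Beam's actual DefaultFilenamePolicy code): each run of S expands to the zero-padded shard number and each run of N to the zero-padded shard count.

public class ShardTemplateDemo {
  // Illustrative only: mirrors the SS-of-NN convention described above.
  static String expandShardTemplate(String template, int shardNumber, int shardCount) {
    StringBuilder out = new StringBuilder();
    int i = 0;
    while (i < template.length()) {
      char c = template.charAt(i);
      if (c == 'S' || c == 'N') {
        int start = i;
        while (i < template.length() && template.charAt(i) == c) {
          i++;
        }
        int width = i - start; // zero-pad to the length of the run
        int value = (c == 'S') ? shardNumber : shardCount;
        out.append(String.format("%0" + width + "d", value));
      } else {
        out.append(c);
        i++;
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    // Prints "00-of-01": the single-shard default described above.
    System.out.println(expandShardTemplate("SS-of-NN", 0, 1));
  }
}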

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Text Files on Cloud Storage template.
  5. In the provided parameter fields, enter your parameter values.
  6. Optional: To switch from exactly-once processing to at-least-once streaming mode, select At Least Once.
  7. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_GCS_Text \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
      • the version name, for example 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
    },
   "parameters": {
       "inputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
   }
}
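
As a sketch of what that POST looks like from Java, the following uses the JDK's built-in java.net.http.HttpClient (Java 11+). It assumes you obtain an OAuth 2.0 access token out of band (for example, from gcloud auth print-access-token) and it reuses the same placeholders as the request above.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class LaunchTemplate {
  public static void main(String[] args) throws Exception {
    // Placeholders: substitute your project, location, version, and parameters.
    String url =
        "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION"
            + "/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_GCS_Text";
    String body =
        "{\n"
            + "  \"jobName\": \"JOB_NAME\",\n"
            + "  \"parameters\": {\n"
            + "    \"inputTopic\": \"projects/PROJECT_ID/topics/TOPIC_NAME\",\n"
            + "    \"outputDirectory\": \"gs://BUCKET_NAME/output/\",\n"
            + "    \"outputFilenamePrefix\": \"output-\",\n"
            + "    \"outputFilenameSuffix\": \".txt\"\n"
            + "  }\n"
            + "}";
    // Assumed to hold a valid token, e.g. from `gcloud auth print-access-token`.
    String accessToken = System.getenv("ACCESS_TOKEN");

    HttpRequest request =
        HttpRequest.newBuilder()
            .uri(URI.create(url))
            .header("Authorization", "Bearer " + accessToken)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(body))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body()); // the launched Job resource on success
  }
}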

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
      • the version name, for example 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/

  • STAGING_LOCATION: the location for staging local files (for example, gs://your-bucket/staging)
  • TOPIC_NAME: your Pub/Sub topic name
  • BUCKET_NAME: the name of your Cloud Storage bucket

Template source code

Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.templates.PubsubToText.Options;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.DurationUtils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed files at the specified output directory.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_GCS_Text.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_GCS_Text",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Text Files on Cloud Storage",
    description =
        "The Pub/Sub to Cloud Storage Text template is a streaming pipeline that reads records from Pub/Sub topic and "
            + "saves them as a series of Cloud Storage files in text format. "
            + "The template can be used as a quick way to save data in Pub/Sub for future use. "
            + "By default, the template generates a new file every 5 minutes.",
    optionsClass = Options.class,
    skipOptions = {"inputSubscription"},
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-to-text",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Pub/Sub topic must exist prior to execution.",
      "The messages published to the topic must be in text format.",
      "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubsubToText {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "Pub/Sub subscription to read the input from, in the format of"
                + " 'projects/your-project-id/subscriptions/your-subscription-name'",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    ValueProvider<String> getInputSubscription();

    void setInputSubscription(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to read the input from. The topic name should be in the format "
                + "`projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.")
    ValueProvider<String> getInputTopic();

    void setInputTopic(ValueProvider<String> value);

    @TemplateCreationParameter(value = "false")
    @Description(
        "This determines whether the template reads from a Pub/Sub subscription or a topic")
    @Default.Boolean(false)
    Boolean getUseSubscription();

    void setUseSubscription(Boolean value);

    @TemplateParameter.GcsWriteFolder(
        order = 3,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. For example, `gs://bucket-name/path/`. This value must end in a slash.")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        optional = true,
        description = "User provided temp location",
        helpText =
            "The user provided directory to output temporary files to. Must end with a slash.")
    ValueProvider<String> getUserTempLocation();

    void setUserTempLocation(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file. For example, `output-`.")
    @Default.String("output")
    @Required
    ValueProvider<String> getOutputFilenamePrefix();

    void setOutputFilenamePrefix(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText =
            "The suffix to place on each windowed file. Typically a file extension such as `.txt` or `.csv`.")
    @Default.String("")
    ValueProvider<String> getOutputFilenameSuffix();

    void setOutputFilenameSuffix(ValueProvider<String> value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> messages = null;

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    if (options.getUseSubscription()) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
    }
    messages
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes. Use a NestedValueProvider because the filename
        // policy requires a resourceId generated from the input value at runtime.
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    NestedValueProvider.of(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()),
                        (SerializableFunction<String, ResourceId>)
                            input -> FileBasedSink.convertToFileResourceIfPossible(input))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
   * when output bucket is locked and temporary data cannot be deleted.
   *
   * @param userTempLocation user provided temp location
   * @param outputLocation user provided outputDirectory to be used as the default temp location
   * @return userTempLocation if available, otherwise outputLocation is returned.
   */
  private static ValueProvider<String> maybeUseUserTempLocation(
      ValueProvider<String> userTempLocation, ValueProvider<String> outputLocation) {
    return DualInputNestedValueProvider.of(
        userTempLocation,
        outputLocation,
        new SerializableFunction<TranslatorInput<String, String>, String>() {
          @Override
          public String apply(TranslatorInput<String, String> input) {
            return (input.getX() != null) ? input.getX() : input.getY();
          }
        });
  }
}
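
Because the class exposes a standard main method, you can also launch the pipeline programmatically after building the template project yourself. The following is a minimal sketch under that assumption; all argument values are placeholders, and the windowDuration option (inherited from WindowedFilenamePolicyOptions) is left at its default so the pipeline windows output into 5-minute files as described above.

public class RunPubsubToText {
  public static void main(String[] args) {
    // Placeholders throughout; DataflowRunner also requires that the project
    // has the Dataflow API enabled and that suitable credentials are available.
    PubsubToText.main(
        new String[] {
          "--runner=DataflowRunner",
          "--project=PROJECT_ID",
          "--region=us-central1",
          "--tempLocation=gs://BUCKET_NAME/temp/",
          "--inputTopic=projects/PROJECT_ID/topics/TOPIC_NAME",
          "--outputDirectory=gs://BUCKET_NAME/output/",
          "--outputFilenamePrefix=output-",
          "--outputFilenameSuffix=.txt"
        });
  }
}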

What's next