Cloud Storage Text to Pub/Sub (Batch) 模板

此模板会创建一种批处理流水线,该流水线可从存储在 Cloud Storage 中的文本文件读取记录,并将其发布到 Pub/Sub 主题。使用此模板,您可以将采用换行符分隔的文件中的 JSON 记录或 CSV 文件中的记录发布到 Pub/Sub 主题,以实现实时处理。您可以使用此模板将数据重放到 Pub/Sub。

此模板不会在各个记录上设置任何时间戳。事件时间等于执行期间的发布时间。如果您的流水线依赖准确的事件时间来执行处理,请不要使用此流水线。

流水线要求

  • 需要读取的文件必须采用换行符分隔 JSON 或 CSV 格式。在源文件中占多行的记录可能会导致下游问题,因为文件中的每一行都将以消息形式发布到 Pub/Sub。
  • 在运行此流水线之前,Pub/Sub 主题必须已存在。

模板参数

必需参数

  • inputFilePattern:要从中读取数据的输入文件模式。例如 gs://bucket-name/files/*.json
  • outputTopic:要向其写入数据的 Pub/Sub 输入主题。名称必须采用 projects/<PROJECT_ID>/topics/<TOPIC_NAME> 格式。例如 projects/your-project-id/topics/your-topic-name

运行模板

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Text Files on Cloud Storage to Pub/Sub (Batch) template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Text_to_Cloud_PubSub \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/files/*.json,\
outputTopic=projects/PROJECT_ID/topics/TOPIC_NAME

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Text_to_Cloud_PubSub
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/files/*.json",
       "outputTopic": "projects/PROJECT_ID/topics/TOPIC_NAME"
   },
   "environment": { "zone": "us-central1-f" }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • TOPIC_NAME:您的 Pub/Sub 主题名称
  • BUCKET_NAME:Cloud Storage 存储桶的名称
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.TextToPubsub.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;

/**
 * The {@code TextToPubsub} pipeline publishes records to Cloud Pub/Sub from a set of files. The
 * pipeline reads each file row-by-row and publishes each record as a string message. At the moment,
 * publishing messages with attributes is unsupported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Text_to_Cloud_PubSub.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Text_to_Cloud_PubSub",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Storage Text File to Pub/Sub (Batch)",
    description = {
      "This template creates a batch pipeline that reads records from text files stored in Cloud Storage and publishes them to a Pub/Sub topic. "
          + "The template can be used to publish records in a newline-delimited file containing JSON records or CSV file to a Pub/Sub topic for real-time processing. "
          + "You can use this template to replay data to Pub/Sub.\n",
      "This template does not set any timestamp on the individual records. The event time is equal to the publishing time during execution. "
          + "If your pipeline relies on an accurate event time for processing, you must not use this pipeline."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/cloud-storage-to-pubsub",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The files to read need to be in newline-delimited JSON or CSV format. Records spanning multiple lines in the source files might cause issues downstream because each line within the files will be published as a message to Pub/Sub.",
      "The Pub/Sub topic must exist before running the pipeline."
    })
public class TextToPubsub {

  /** The custom options supported by the pipeline. Inherits standard configuration options. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "Cloud Storage Input File(s)",
        helpText = "The input file pattern to read from.",
        example = "gs://bucket-name/files/*.json")
    @Required
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

    @TemplateParameter.PubsubTopic(
        order = 2,
        groupName = "Target",
        description = "Output Pub/Sub topic",
        helpText =
            "The Pub/Sub input topic to write to. The name must be in the format `projects/<PROJECT_ID>/topics/<TOPIC_NAME>`.",
        example = "projects/your-project-id/topics/your-topic-name")
    @Required
    ValueProvider<String> getOutputTopic();

    void setOutputTopic(ValueProvider<String> value);
  }

  /**
   * Main entry-point for the pipeline. Reads in the command-line arguments, parses them, and
   * executes the pipeline.
   *
   * @param args Arguments passed in from the command-line.
   */
  public static void main(String[] args) {

    // Parse the user options passed from the command-line
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Executes the pipeline with the provided execution parameters.
   *
   * @param options The execution parameters.
   */
  public static PipelineResult run(Options options) {
    // Create the pipeline.
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *  1) Read from the text source.
     *  2) Write each text record to Pub/Sub
     */
    pipeline
        .apply("Read Text Data", TextIO.read().from(options.getInputFilePattern()))
        .apply("Write to PubSub", PubsubIO.writeStrings().to(options.getOutputTopic()));

    return pipeline.run();
  }
}

后续步骤