Pub/Sub Topic 또는 Subscription to Text Files on Cloud Storage 템플릿

Pub/Sub Topic 또는 Subscription to Cloud Storage Text 템플릿은 Pub/Sub에서 레코드를 읽고 텍스트 형식으로 된 일련의 Cloud Storage 파일로 저장하는 스트리밍 파이프라인입니다. 나중에 사용하기 위해 Pub/Sub에 데이터를 빠르게 저장하는 수단으로 템플릿을 사용할 수 있습니다. 기본적으로 템플릿은 5분마다 새 파일을 생성합니다.

파이프라인 요구사항

  • 실행하기 전에 Pub/Sub 주제 또는 구독이 있어야 합니다.
  • 주제에 게시되는 메시지는 텍스트 형식이어야 합니다.
  • 주제에 게시되는 메시지에는 줄바꿈을 사용할 수 없습니다. 각 Pub/Sub 메시지는 출력 파일에 한 줄로 저장됩니다.

템플릿 매개변수

필수 매개변수

  • outputDirectory: 출력 파일을 쓸 경로 및 파일 이름 프리픽스입니다. 이 값은 슬래시로 끝나야 합니다. 예를 들면 gs://your-bucket/your-path/입니다.

선택적 매개변수

  • inputTopic: 입력을 읽어올 Pub/Sub 주제입니다. 이 매개변수가 제공되면 inputSubscription을 사용하지 마세요. 예를 들면 projects/<PROJECT_ID>/topics/<TOPIC_NAME>입니다.
  • inputSubscription: 입력을 읽을 Pub/Sub 구독입니다. 이 매개변수가 제공되면 inputTopic을 사용하지 마세요. 예를 들면 projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>입니다.
  • userTempLocation: 임시 파일을 출력할 사용자가 제공한 디렉터리입니다. 슬래시로 끝나야 합니다.
  • outputFilenamePrefix: 윈도우 설정된 각 파일에 넣을 프리픽스입니다. 예를 들면 output-입니다. 기본값은 output입니다.
  • outputFilenameSuffix: 윈도잉된 각 파일에 배치할 서픽스이며 일반적으로 .txt 또는 .csv와 같은 파일 확장자입니다. 예를 들면 .txt입니다. 기본값은 빈 값입니다.
  • outputShardTemplate: 샤드 템플릿은 윈도우 설정된 각 파일의 동적 부분을 정의합니다. 기본적으로, 파이프라인은 각 윈도우 내에서 단일 샤드를 사용하여 파일 시스템에 출력합니다. 따라서 모든 데이터가 윈도우별로 한 파일에 출력됩니다. outputShardTemplate 기본값은 W-P-SS-of-NN입니다. 여기에서 W는 윈도우 기간, P는 창 정보, S는 샤드 번호, N은 샤드 개수입니다. 단일 파일의 경우 outputShardTemplateSS-of-NN 부분이 00-of-01입니다.
  • numShards: 쓰는 동안에 생성되는 최대 출력 샤드 수입니다. 샤드 수가 많을수록 Cloud Storage 쓰기 처리량이 높아지지만 출력 Cloud Storage 파일을 처리할 때 샤드 간에 데이터 집계 비용이 늘어날 수 있습니다. 기본값은 0입니다.
  • windowDuration: 범위 기간은 데이터가 출력 디렉터리에 기록되는 간격입니다. 파이프라인의 처리량을 기준으로 기간을 구성합니다. 예를 들어 처리량이 높을수록 데이터가 메모리에 적합하도록 더 작은 범위가 필요할 수 있습니다. 기본값은 5m(5분)이며 최소 1s(1초)입니다. 허용되는 형식은 [int]s(초 단위, 예: 5s), [int]m(분 단위, 예: 12m), [int]h(시간 단위, 예: 2h)입니다. 예를 들면 5m입니다.
  • yearPattern: 연도 형식을 지정하는 패턴입니다. y 또는 Y 중 하나 이상이어야 합니다. 연도의 대소문자는 차이가 없습니다. 영숫자 또는 디렉터리(/) 외의 문자로 패턴을 선택적으로 래핑할 수 있습니다. 기본값은 YYYY입니다.
  • monthPattern: 월 형식을 지정하는 패턴입니다. M 문자가 1개 이상이어야 합니다. 영숫자 또는 디렉터리(/) 외의 문자로 패턴을 선택적으로 래핑할 수 있습니다. 기본값은 MM입니다.
  • dayPattern: 날짜 형식을 지정하는 패턴입니다. 월의 날짜일 경우 d, 연의 날짜일 경우 D 하나 이상이어야 합니다. 연도의 대소문자는 차이가 없습니다. 영숫자 또는 디렉터리(/) 외의 문자로 패턴을 선택적으로 래핑할 수 있습니다. 기본값은 dd입니다.
  • hourPattern: 시간 형식을 지정하는 패턴입니다. H 문자가 1개 이상이어야 합니다. 영숫자 또는 디렉터리(/) 외의 문자로 패턴을 선택적으로 래핑할 수 있습니다. 기본값은 HH입니다.
  • minutePattern: 분 형식을 지정하는 패턴입니다. m 문자가 1개 이상이어야 합니다. 영숫자 또는 디렉터리(/) 외의 문자로 패턴을 선택적으로 래핑할 수 있습니다. 기본값은 mm입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Pub/Sub Topic or Subscription to Text Files on Cloud Storage template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region REGION_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME,\
outputDirectory=gs://BUCKET_NAME/output/,\
outputFilenamePrefix=output-,\
outputFilenameSuffix=.txt

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/SUBSCRIPTION_NAME"
       "outputDirectory": "gs://BUCKET_NAME/output/",
       "outputFilenamePrefix": "output-",
       "outputFilenameSuffix": ".txt",
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_GCS_Text_Flex",
  }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • SUBSCRIPTION_NAME: Pub/Sub 구독 이름
  • BUCKET_NAME: Cloud Storage 버킷 이름
Java
/*
 * Copyright (C) 2022 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates.pubsubtotext;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.io.WindowedFilenamePolicy;
import com.google.cloud.teleport.v2.options.WindowedFilenamePolicyOptions;
import com.google.cloud.teleport.v2.templates.pubsubtotext.PubsubToText.Options;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileBasedSink;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;

/**
 * This pipeline ingests incoming data from a Cloud Pub/Sub topic and outputs the raw data into
 * windowed files at the specified output directory.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Cloud_PubSub_to_GCS_Text_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_GCS_Text_Flex",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub Subscription or Topic to Text Files on Cloud Storage",
    description =
        "The Pub/Sub Topic or Subscription to Cloud Storage Text template is a streaming pipeline that reads records "
            + "from Pub/Sub and saves them as a series of Cloud Storage files in text format. The template can be used as a quick way to save data in Pub/Sub for future use. By default, the template generates a new file every 5 minutes.",
    optionsClass = Options.class,
    flexContainerName = "pubsub-to-text",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-topic-subscription-to-text",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Pub/Sub topic or subscription must exist prior to execution.",
      "The messages published to the topic must be in text format.",
      "The messages published to the topic must not contain any newlines. Note that each Pub/Sub message is saved as a single line in the output file."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubsubToText {

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options
      extends PipelineOptions, StreamingOptions, WindowedFilenamePolicyOptions {

    @TemplateParameter.PubsubTopic(
        order = 1,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input topic",
        helpText =
            "The Pub/Sub topic to read the input from. If this parameter is provided "
                + "don't use `inputSubscription`.",
        example = "projects/<PROJECT_ID>/topics/<TOPIC_NAME>")
    String getInputTopic();

    void setInputTopic(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        groupName = "Source",
        optional = true,
        description = "Pub/Sub input subscription",
        helpText =
            "The Pub/Sub subscription to read the input from. If this parameter is "
                + "provided, don't use `inputTopic`.",
        example = "projects/<PROJECT_ID>/subscription/<SUBSCRIPTION_NAME>")
    String getInputSubscription();

    void setInputSubscription(String value);

    @TemplateParameter.GcsWriteFolder(
        order = 3,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix to write write output files to. "
                + "This value must end in a slash.",
        example = "gs://your-bucket/your-path/")
    @Required
    String getOutputDirectory();

    void setOutputDirectory(String value);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        optional = true,
        description = "User provided temp location",
        helpText =
            "The user provided directory to output temporary files to. Must end with a slash.")
    String getUserTempLocation();

    void setUserTempLocation(String value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "Output filename prefix of the files to write",
        helpText = "The prefix to place on each windowed file.",
        example = "output-")
    @Default.String("output")
    @Required
    String getOutputFilenamePrefix();

    void setOutputFilenamePrefix(String value);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Output filename suffix of the files to write",
        helpText =
            "The suffix to place on each windowed file, typically a file extension such as `.txt` or `.csv`.",
        example = ".txt")
    @Default.String("")
    String getOutputFilenameSuffix();

    void setOutputFilenameSuffix(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    boolean useInputSubscription = !Strings.isNullOrEmpty(options.getInputSubscription());
    boolean useInputTopic = !Strings.isNullOrEmpty(options.getInputTopic());
    if (useInputSubscription == useInputTopic) {
      throw new IllegalArgumentException(
          "Either input topic or input subscription must be provided, but not both.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<String> messages = null;

    /*
     * Steps:
     *   1) Read string messages from PubSub
     *   2) Window the messages into minute intervals specified by the executor.
     *   3) Output the windowed files to GCS
     */
    if (useInputSubscription) {
      messages =
          pipeline.apply(
              "Read PubSub Events",
              PubsubIO.readStrings().fromSubscription(options.getInputSubscription()));
    } else {
      messages =
          pipeline.apply(
              "Read PubSub Events", PubsubIO.readStrings().fromTopic(options.getInputTopic()));
    }
    messages
        .apply(
            options.getWindowDuration() + " Window",
            Window.into(FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))))

        // Apply windowed file writes
        .apply(
            "Write File(s)",
            TextIO.write()
                .withWindowedWrites()
                .withNumShards(options.getNumShards())
                .to(
                    WindowedFilenamePolicy.writeWindowedFiles()
                        .withOutputDirectory(options.getOutputDirectory())
                        .withOutputFilenamePrefix(options.getOutputFilenamePrefix())
                        .withShardTemplate(options.getOutputShardTemplate())
                        .withSuffix(options.getOutputFilenameSuffix())
                        .withYearPattern(options.getYearPattern())
                        .withMonthPattern(options.getMonthPattern())
                        .withDayPattern(options.getDayPattern())
                        .withHourPattern(options.getHourPattern())
                        .withMinutePattern(options.getMinutePattern()))
                .withTempDirectory(
                    FileBasedSink.convertToFileResourceIfPossible(
                        maybeUseUserTempLocation(
                            options.getUserTempLocation(), options.getOutputDirectory()))));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }

  /**
   * Utility method for using optional parameter userTempLocation as TempDirectory. This is useful
   * when output bucket is locked and temporary data cannot be deleted.
   *
   * @param userTempLocation user provided temp location
   * @param outputLocation user provided outputDirectory to be used as the default temp location
   * @return userTempLocation if available, otherwise outputLocation is returned.
   */
  private static String maybeUseUserTempLocation(String userTempLocation, String outputLocation) {
    return !Strings.isNullOrEmpty(userTempLocation) ? userTempLocation : outputLocation;
  }
}

다음 단계