기본 Dataflow 템플릿 만들기

이 문서에서는 Dataflow 파이프라인 코드에서 커스텀 기본 템플릿을 만드는 방법을 알아봅니다. 기본 템플릿은 기존 Dataflow 파이프라인을 패키징하여 재사용 가능한 템플릿을 만들며, 특정 파이프라인 매개변수를 변경하여 이러한 템플릿을 각 작업에 대해 맞춤설정할 수 있습니다. 템플릿을 직접 작성하는 대신 명령어를 사용하여 기존 파이프라인에서 템플릿을 생성합니다.

다음은 이 프로세스에 대한 간단한 개요입니다. 이 프로세스의 세부정보는 이어지는 섹션에서 제공됩니다.

  1. 파이프라인 코드에서 런타임에 설정하거나 사용할 모든 파이프라인 옵션에 ValueProvider 인터페이스를 사용합니다. 런타임 매개변수를 허용하는 DoFn 객체를 사용합니다.
  2. 기본 템플릿이 실행되면 커스텀 매개변수 유효성을 검사하도록 추가 메타데이터로 템플릿을 확장합니다. 이러한 메타데이터의 예시로는 커스텀 기본 템플릿의 이름과 선택적 매개변수가 있습니다.
  3. 파이프라인 I/O 커넥터가 ValueProvider 객체를 지원하는지 확인하고 필요에 따라 변경합니다.
  4. 커스텀 기본 템플릿을 만들고 스테이징합니다.
  5. 커스텀 기본 템플릿을 실행합니다.

다양한 종류의 Dataflow 템플릿, 템플릿의 이점, 기본 템플릿을 선택하는 시기에 대한 자세한 내용은 Dataflow 템플릿을 참조하세요.

기본 템플릿 실행에 필요한 권한

Dataflow 기본 템플릿을 실행하는 데 필요한 권한은 템플릿을 실행하는 위치와 파이프라인의 소스와 싱크가 다른 프로젝트에 있는지 여부에 따라 다릅니다.

Dataflow 파이프라인을 로컬에서 실행하거나 Google Cloud를 사용하여 실행하는 방법에 대한 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.

Dataflow 역할 및 권한 목록은 Dataflow 액세스 제어를 참조하세요.

제한사항

  • 다음 파이프라인 옵션은 기본 템플릿에서 지원되지 않습니다. 작업자 하네스 스레드의 수를 제어해야 하는 경우 Flex 템플릿을 사용하세요.

    자바

    numberOfWorkerHarnessThreads
      

    Python

    number_of_worker_harness_threads
      
  • Dataflow 실행기는 Pub/Sub 주제 및 구독 매개변수의 ValueProvider 옵션을 지원하지 않습니다. 런타임 매개변수에 Pub/Sub 옵션이 필요하면 Flex 템플릿을 사용합니다.

런타임 매개변수 및 ValueProvider 인터페이스에 대한 정보

ValueProvider 인터페이스를 사용하면 파이프라인이 런타임 매개변수를 허용할 수 있습니다. Apache Beam은 다음 세 가지 유형의 ValueProvider 객체를 제공합니다.

이름 설명
RuntimeValueProvider

RuntimeValueProvider는 기본 ValueProvider 유형입니다. RuntimeValueProvider를 사용하면 파이프라인이 파이프라인 실행 중에만 사용 가능한 값을 허용할 수 있습니다. 파이프라인 생성 중에는 이 값을 사용할 수 없으므로, 이 값을 사용하여 파이프라인의 워크플로 그래프를 변경할 수 없습니다.

isAccessible()을 사용하여 ValueProvider 값을 사용할 수 있는지 확인할 수 있습니다. 파이프라인 실행 전에 get()을 호출하면 Apache Beam이 오류를 반환합니다.
Value only available at runtime, but accessed from a non-runtime context.

사전에 값을 모르면 RuntimeValueProvider를 사용합니다. 런타임에 매개변수 값을 변경하려면 템플릿에서 매개변수 값을 설정하지 마세요. 템플릿에서 작업을 만들 때 매개변수 값을 설정하세요.

StaticValueProvider

StaticValueProvider를 사용하면 파이프라인에 정적 값을 제공할 수 있습니다. 파이프라인 생성 중에 이 값을 사용할 수 있으므로, 이 값을 사용하여 파이프라인의 워크플로 그래프를 변경할 수 있습니다.

값을 미리 알면 StaticValueProvider를 사용합니다. 예시는 StaticValueProvider 섹션을 참조하세요.

NestedValueProvider

NestedValueProvider를 사용하면 다른 ValueProvider 객체의 값을 계산할 수 있습니다. NestedValueProviderValueProvider를 래핑하며, 래핑된 ValueProvider 유형에 따라 파이프라인 생성 중에 이 값의 액세스 가능 여부가 결정됩니다.

런타임에 이 값을 사용하여 다른 값을 계산하려면 NestedValueProvider를 사용합니다. 예시는 NestedValueProvider 섹션을 참조하세요.

파이프라인 코드에서 런타임 매개변수 사용

이 섹션에서는 ValueProvider, StaticValueProvider, NestedValueProvider를 사용하는 방법을 살펴봅니다.

파이프라인 옵션에서 ValueProvider 사용

런타임 시 설정하거나 사용할 모든 파이프라인 옵션에 ValueProvider를 사용합니다.

예를 들어 다음 WordCount 코드 스니펫은 런타임 매개변수를 지원하지 않습니다. 이 코드는 입력 파일 옵션을 추가하고, 파이프라인을 만들고, 입력 파일의 행을 읽습니다.

자바

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

런타임 매개변수 지원을 추가하려면 ValueProvider를 사용하도록 입력 파일 옵션을 수정합니다.

자바

입력 파일 옵션의 유형에 String 대신 ValueProvider<String>을 사용합니다.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argumentadd_value_provider_argument로 바꿉니다.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

함수에서 ValueProvider 사용

자체 함수에서 런타임 매개변수 값을 사용하려면 ValueProvider 매개변수를 사용하도록 함수를 업데이트합니다.

다음 예시에는 정수 ValueProvider 옵션과 정수를 더하는 간단한 함수가 포함되어 있습니다. 이 함수는 ValueProvider 정수에 따라 달라집니다. 실행 중에 파이프라인은 [1, 2, 3]을 포함하는 PCollection의 모든 정수에 MySumFn을 적용합니다. 런타임 값이 10이면 결과 PCollection[11, 12, 13]이 포함됩니다.

자바

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.options.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

StaticValueProvider 사용

파이프라인에 정적 값을 제공하려면 StaticValueProvider를 사용합니다.

이 예시에서는 ValueProvider<Integer>를 받아들이는 DoFnMySumFn을 사용합니다. 매개변수의 값을 미리 알고 있으면 StaticValueProvider를 사용하여 정적 값을 ValueProvider로 지정할 수 있습니다.

자바

다음 코드는 파이프라인 런타임에 값을 가져옵니다.

  .apply(ParDo.of(new MySumFn(options.getInt())))

대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

다음 코드는 파이프라인 런타임에 값을 가져옵니다.

  beam.ParDo(MySumFn(user_options.templated_int))

대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

일반 매개변수와 런타임 매개변수를 모두 지원하는 I/O 모듈을 구현하는 경우에 StaticValueProvider를 사용할 수도 있습니다. StaticValueProvider는 비슷한 두 가지 메서드 구현 시에 코드 중복을 줄여줍니다.

자바

이 예의 소스 코드는 GitHub에 있는 Apache Beam의 TextIO.java에서 가져온 것입니다.

  // Create a StaticValueProvider<String> from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

이 예시에서는 string 또는 ValueProvider 인수를 모두 사용하는 단일 생성자가 있습니다. 인수가 string인 경우 StaticValueProvider로 변환됩니다.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, str):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

NestedStaticValueProvider 사용

다른 ValueProvider 객체에서 값을 계산하려면 NestedValueProvider를 사용합니다.

NestedValueProviderValueProviderSerializableFunction변환기를 입력으로 사용합니다. NestedValueProvider.get()을 호출하면 변환기가 ValueProvider 값을 기준으로 새 값을 만듭니다. 이 변환을 사용하면 ValueProvider 값을 사용하여 원하는 최종 값을 만들 수 있습니다.

다음 예시에서 사용자는 파일 이름 file.txt를 제공합니다. 변환이 파일 이름 앞에 gs://directory_name/ 경로를 추가합니다. .get()을 호출하면 gs://directory_name/file.txt가 반환됩니다.

자바

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

파이프라인 코드에서 메타데이터 사용

템플릿을 실행할 때 커스텀 매개변수를 검증할 수 있도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음 단계를 수행합니다.

  1. 메타데이터 매개변수의 매개변수와 메타데이터 예시 파일의 형식을 사용하여 TEMPLATE_NAME_metadata이라는 JSON 형식의 파일을 만듭니다. TEMPLATE_NAME을 템플릿 이름으로 바꿉니다.

    메타데이터 파일에 파일 이름 확장자가 없어야 합니다. 예를 들어 템플릿 이름이 myTemplate이면 메타데이터 파일은 myTemplate_metadata여야 합니다.

  2. 메타데이터 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.

메타데이터 매개변수

매개변수 키 필수 값 설명
name 템플릿 이름입니다.
description 아니요 템플릿을 설명하는 짧은 텍스트 단락입니다.
streaming 아니요 true면 이 템플릿은 스트리밍을 지원합니다. 기본값은 false입니다.
supportsAtLeastOnce 아니요 true면 이 템플릿은 적어도 한 번 처리를 지원합니다. 기본값은 false입니다. 템플릿이 적어도 한 번 스트리밍 모드로 작동하도록 설계된 경우 이 매개변수를 true로 설정합니다.
supportsExactlyOnce 아니요 true면 이 템플릿은 정확히 한 번 처리를 지원합니다. 기본값은 true입니다.
defaultStreamingMode 아니요 적어도 한 번 모드와 정확히 한 번 모드를 모두 지원하는 템플릿의 기본 스트리밍 모드입니다. 다음 값 중 하나를 사용합니다("AT_LEAST_ONCE" 또는 "EXACTLY_ONCE") 지정하지 않으면 기본 스트리밍 모드는 정확히 한 번입니다.
parameters 아니요 템플릿에 사용되는 추가 매개변수 배열입니다. 기본적으로 빈 배열이 사용됩니다.
name 템플릿에 사용되는 매개변수 이름입니다.
label Google Cloud 콘솔에서 매개변수 라벨을 지정하기 위해 사용되는 사람이 읽을 수 있는 문자열입니다.
helpText 매개변수를 설명하는 짧은 텍스트 단락입니다.
isOptional 아니요 매개변수가 필수이면 false이고, 매개변수가 선택사항이면 true입니다. 특정 값으로 설정하지 않으면 isOptional이 기본적으로 false로 설정됩니다. 메타데이터에 이 매개변수 키를 포함하지 않으면 메타데이터가 필수 매개변수가 됩니다.
regexes 아니요 매개변수 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어 ["^[a-zA-Z][a-zA-Z0-9]+"]는 값이 문자로 시작하고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다. 기본적으로 빈 배열이 사용됩니다.

메타데이터 파일 예시

자바

Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 매개변수 유효성을 검사합니다.

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Python

Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 매개변수 유효성을 검사합니다.

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "streaming": false,
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "input",
      "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input Cloud Storage file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts",
      "label": "Output Cloud Storage file(s)"
    }
  ]
}

Dataflow 템플릿 디렉터리에서 Google 제공 템플릿의 메타데이터 파일을 다운로드할 수 있습니다.

지원되는 파이프라인 I/O 커넥터 및 ValueProvider

자바

일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메서드가 포함되어 있습니다. 특정 커넥터와 메서드에 대한 지원을 확인하려면 I/O 커넥터의 API 참고 문서를 확인하세요. 지원되는 메서드에는 ValueProvider가 포함된 오버로드가 있습니다. 메서드에 오버로드가 없으면 메서드가 런타임 매개변수를 지원하지 않습니다. 다음 I/O 커넥터는 최소한 ValueProvider를 부분적으로 지원합니다.

  • 파일 기반 IO: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO(SDK 2.3.0 이상 필요)
  • PubSubIO
  • SpannerIO

Python

일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메서드가 포함되어 있습니다. I/O 커넥터 및 해당 메서드에 대한 지원을 확인하려면 커넥터의 API 참고 문서를 참조하세요. 다음 I/O 커넥터는 런타임 매개변수를 허용합니다.

  • 파일 기반 IO: textio, avroio, tfrecordio

기본 템플릿 만들기 및 스테이징

파이프라인을 작성한 후에는 템플릿 파일을 만들고 스테이징해야 합니다. 템플릿을 만들고 스테이징하면 템플릿을 실행하는 데 필요한 추가 파일이 스테이징 위치에 포함됩니다. 스테이징 위치를 삭제하면 템플릿 실행이 실패합니다. Dataflow 작업은 템플릿을 스테이징한 후 바로 실행되지 않습니다. 커스텀 템플릿 기반 Dataflow 작업을 실행하려면 Google Cloud 콘솔, Dataflow REST API 또는 gcloud CLI를 사용하면 됩니다.

다음 예시에서는 템플릿 파일을 스테이징하는 방법을 보여줍니다.

자바

이 Maven 명령어는 --templateLocation으로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    

templateLocation 경로가 올바른지 확인합니다. 다음을 바꿉니다.

  • com.example.myclass: Java 클래스입니다.
  • PROJECT_ID: 프로젝트 ID입니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름입니다.
  • TEMPLATE_NAME: 템플릿 이름입니다.
  • REGION: Dataflow 작업을 배포할 리전입니다.

Python

이 Python 명령어는 --template_location으로 지정한 Cloud Storage 위치에 템플릿을 만들고 스테이징합니다.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION

template_location 경로가 올바른지 확인합니다. 다음을 바꿉니다.

  • examples.mymodule: Python 모듈입니다.
  • PROJECT_ID: 프로젝트 ID입니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름입니다.
  • TEMPLATE_NAME: 템플릿 이름입니다.
  • REGION: Dataflow 작업을 배포할 리전입니다.

템플릿을 만들고 스테이징한 후 다음 단계는 템플릿 실행입니다.