템플릿 만들기

Cloud Dataflow 템플릿런타임 매개변수를 통해 파이프라인 실행 중에만 사용 가능한 값을 허용합니다. 이러한 매개변수를 파이프라인 내에서 실행되는 함수(예: DoFn)에 전달하면 템플릿 기반 파이프라인의 실행을 맞춤설정할 수 있습니다.

Apache Beam 파이프라인에서 템플릿을 만들려면 런타임 매개변수를 지원하도록 파이프라인 코드를 수정해야 합니다.

그런 다음 템플릿을 만들고 준비합니다.

런타임 매개변수 및 ValueProvider 인터페이스

ValueProvider 인터페이스를 사용하면 파이프라인이 런타임 매개변수를 허용할 수 있습니다. Apache Beam은 다음 세 가지 유형의 ValueProvider 객체를 제공합니다.

이름 설명
RuntimeValueProvider

RuntimeValueProviderValueProvider 기본 유형입니다. RuntimeValueProvider를 사용하면 파이프라인이 파이프라인 실행 중에만 사용 가능한 값을 허용할 수 있습니다. 파이프라인 생성 중에는 이 값을 사용할 수 없으므로, 이 값을 사용하여 파이프라인의 워크플로 그래프를 변경할 수 없습니다.

isAccessible()을 사용하여 ValueProvider 값을 사용할 수 있는지 확인할 수 있습니다. 파이프라인 실행 전에 get()을 호출하면 Apache Beam이 오류를 반환합니다.
Value only available at runtime, but accessed from a non-runtime context.

사전에 값을 모르면 RuntimeValueProvider를 사용합니다.

StaticValueProvider

StaticValueProvider를 사용하면 파이프라인에 정적 값을 제공할 수 있습니다. 파이프라인 생성 중에 이 값을 사용할 수 있으므로, 이 값을 사용하여 파이프라인의 워크플로 그래프를 변경할 수 있습니다.

미리 값을 알고 있으면 StaticValueProvider를 사용합니다. 예에 대해서는 StaticValueProvider 섹션을 참조하세요.

NestedValueProvider

NestedValueProvider를 사용하면 다른 ValueProvider 객체의 값을 계산할 수 있습니다. NestedValueProviderValueProvider를 래핑하며, 래핑된 ValueProvider 유형에 따라 파이프라인 생성 중에 이 값의 액세스 가능 여부가 결정됩니다.

런타임에서 이 값을 사용하여 다른 값을 계산하려면 NestedValueProvider를 사용합니다. 예에 대해서는 NestedValueProvider 섹션을 참조하세요.

참고: Python용 Apache Beam SDK는 NestedValueProvider를 지원하지 않습니다.

런타임 매개변수를 사용하도록 코드 수정

이 섹션에서는 ValueProvider, StaticValueProvider, NestedValueProvider를 사용하는 방법을 살펴봅니다.

파이프라인 옵션에서 ValueProvider 사용

런타임에 설정 또는 사용할 모든 파이프라인 옵션에 ValueProvider를 사용합니다.

예를 들어, 다음 WordCount 코드 스니펫은 런타임 매개변수를 지원하지 않습니다. 이 코드는 입력 파일 옵션을 추가하고, 파이프라인을 만들고, 입력 파일의 행을 읽습니다.

자바: SDK 2.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

  class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

자바: SDK 1.x

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    String getInputFile();
    void setInputFile(String value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()));
    ...

런타임 매개변수 지원을 추가하려면 ValueProvider를 사용하도록 입력 파일 옵션을 수정합니다.

자바: SDK 2.x

입력 파일 옵션의 유형에 String 대신 ValueProvider<String>을 사용합니다.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply("ReadLines", TextIO.read().from(options.getInputFile()));
    ...

Python

add_argumentadd_value_provider_argument로 바꿉니다.

 class WordcountOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      # Use add_value_provider_argument for arguments to be templatable
      # Use add_argument as usual for non-templatable arguments
      parser.add_value_provider_argument(
          '--input',
          default='gs://dataflow-samples/shakespeare/kinglear.txt',
          help='Path of the file to read from')
      parser.add_argument(
          '--output',
          required=True,
          help='Output file to write results to.')
  pipeline_options = PipelineOptions(['--output', 'some/output_path'])
  p = beam.Pipeline(options=pipeline_options)

  wordcount_options = pipeline_options.view_as(WordcountOptions)
  lines = p | 'read' >> ReadFromText(wordcount_options.input)

자바: SDK 1.x

입력 파일 옵션의 유형에 String 대신 ValueProvider<String>을 사용합니다.

  public interface WordCountOptions extends PipelineOptions {
    @Description("Path of the file to read from")
    @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt")
    ValueProvider<String> getInputFile();
    void setInputFile(ValueProvider<String> value);
  }

  public static void main(String[] args) {
    WordCountOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WordCountOptions.class);
    Pipeline p = Pipeline.create(options);

    // Add .withoutValidation() when you use a RuntimeValueProvider with SDK 1.x.
    // The value may not be available at validation time.
    p.apply("ReadLines", TextIO.Read.from(options.getInputFile()).withoutValidation());
    ...

함수에서 ValueProvider 사용

자체 함수에서 런타임 매개변수 값을 사용하려면 ValueProvider 매개변수를 사용하도록 함수를 업데이트합니다.

다음 예에는 정수 ValueProvider 옵션과 정수를 더하는 간단한 함수가 포함되어 있습니다. 이 함수는 ValueProvider 정수에 따라 달라집니다. 실행 중에 파이프라인은 [1, 2, 3]을 포함하는 PCollection의 모든 정수에 MySumFn을 적용합니다. 런타임 값이 10이면 결과 PCollection[11, 12, 13]이 포함됩니다.

자바: SDK 2.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString()))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

Python

  import apache_beam as beam
  from apache_beam.options.pipeline_options import PipelineOptions
  from apache_beam.utils.value_provider import StaticValueProvider
  from apache_beam.io import WriteToText

  class UserOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
      parser.add_value_provider_argument('--templated_int', type=int)

  class MySumFn(beam.DoFn):
    def __init__(self, templated_int):
      self.templated_int = templated_int

    def process(self, an_int):
      yield self.templated_int.get() + an_int

  pipeline_options = PipelineOptions()
  p = beam.Pipeline(options=pipeline_options)

  user_options = pipeline_options.view_as(UserOptions)
  sum = (p
         | 'ReadCollection' >> beam.io.ReadFromText(
             'gs://some/integer_collection')
         | 'StringToInt' >> beam.Map(lambda w: int(w))
         | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int))
         | 'WriteResultingCollection' >> WriteToText('some/output_path'))

자바: SDK 1.x

  public interface SumIntOptions extends PipelineOptions {
      // New runtime parameter, specified by the --int
      // option at runtime.
      ValueProvider<Integer> getInt();
      void setInt(ValueProvider<Integer> value);
  }

  class MySumFn extends DoFn<Integer, Integer> {
      ValueProvider<Integer> mySumInteger;

      MySumFn(ValueProvider<Integer> sumInt) {
          // Store the value provider
          this.mySumInteger = sumInt;
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
         // Get the value of the value provider and add it to
         // the element's value.
         c.output(c.element() + mySumInteger.get());
      }
  }

  public static void main(String[] args) {
    SumIntOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(SumIntOptions.class);

    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
      // Get the value provider and pass it to MySumFn
     .apply(ParDo.of(new MySumFn(options.getInt())))
     .apply(MapElements.via(
        new SimpleFunction<Integer, String>() {
          public String apply(Integer i) {
            return i.toString();
          }
        }))
     .apply("OutputNums", TextIO.write().to("numvalues"));

    p.run();
  }

StaticValueProvider 사용

파이프라인에 정적 값을 제공하려면 StaticValueProvider를 사용합니다.

이 예에서는 ValueProvider<Integer>를 받아들이는 DoFnMySumFn을 사용합니다. 미리 매개변수 값을 알고 있으면 StaticValueProvider를 사용하여 정적 값을 ValueProvider로 지정할 수 있습니다.

자바: SDK 2.x

다음 코드는 파이프라인 런타임에 값을 가져옵니다.

  .apply(ParDo.of(new MySumFn(options.getInt())))

대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

Python

다음 코드는 파이프라인 런타임에 값을 가져옵니다.

  beam.ParDo(MySumFn(user_options.templated_int))

대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.

  beam.ParDo(MySumFn(StaticValueProvider(int,10)))

자바: SDK 1.x

다음 코드는 파이프라인 런타임에 값을 가져옵니다.

  .apply(ParDo.of(new MySumFn(options.getInt())))

대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.

  .apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))

일반 매개변수와 런타임 매개변수를 모두 지원하는 I/O 모듈을 구현하는 경우에 StaticValueProvider를 사용할 수도 있습니다. StaticValueProvider는 비슷한 두 가지 메소드 구현 시에 코드 중복을 줄여줍니다.

자바: SDK 2.x

이 예의 소스 코드는 GitHub에 있는 Apache Beam의 TextIO.java에서 가져온 것입니다.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

Python

이 예에는 string 또는 ValueProvider 인수를 모두 사용하는 단일 생성자가 있습니다. 인수가 string인 경우에는 StaticValueProvider로 변환됩니다.

class Read():

  def __init__(self, filepattern):
    if isinstance(filepattern, basestring):
      # Create a StaticValueProvider from a regular string parameter
      filepattern = StaticValueProvider(str, filepattern)

    self.filepattern = filepattern

자바: SDK 1.x

이 예의 소스 코드는 GitHub에 있는 Apache Beam의 TextIO.java에서 가져온 것입니다.

  // Create a StaticValueProvider from a regular String parameter
  // value, and then call .from() with this new StaticValueProvider.
  public Read from(String filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return from(StaticValueProvider.of(filepattern));
  }

  // This method takes a ValueProvider parameter.
  public Read from(ValueProvider<String> filepattern) {
    checkNotNull(filepattern, "Filepattern cannot be empty.");
    return toBuilder().setFilepattern(filepattern).build();
  }

NestedValueProvider 사용

참고: Python용 Apache Beam SDK는 NestedValueProvider를 지원하지 않습니다.

다른 ValueProvider 객체에서 값을 계산하려면 NestedValueProvider를 사용합니다.

NestedValueProviderValueProviderSerializableFunction 변환기를 입력으로 사용합니다. .get()NestedValueProvider을 호출하면 변환기가 ValueProvider 값을 기준으로 새 값을 만듭니다. 이 변환을 사용하면 ValueProvider 값을 사용하여 원하는 최종 값을 만들 수 있습니다.

  • 예 1: 사용자가 파일 이름 file.txt를 제공합니다. 변환이 파일 이름 앞에 파일 경로 gs://directory_name/을 추가합니다. .get()을 호출하면 gs://directory_name/file.txt가 반환됩니다.
  • 예 2: 사용자가 BigQuery 쿼리의 하위 문자열을 제공합니다(예: 특정 날짜). 변환이 하위 문자열을 사용하여 전체 쿼리를 만듭니다. .get()을 호출하면 전체 쿼리가 반환됩니다.

참고: NestedValueProvider를 통해 값 하나만 입력할 수 있습니다. NestedValueProvider를 사용하여 서로 다른 두 값을 조합할 수 없습니다.

다음 코드는 NestedValueProvider를 사용하여 첫 번째 예를 구현합니다. 사용자가 파일 이름을 제공하고, 변환이 파일 이름 앞에 파일 경로를 추가합니다.

자바: SDK 2.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply("OutputNums", TextIO.write().to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

Python

Python용 Apache Beam SDK는 NestedValueProvider를 지원하지 않습니다.

자바: SDK 1.x

  public interface WriteIntsOptions extends PipelineOptions {
      // New runtime parameter, specified by the --fileName
      // option at runtime.
      ValueProvider<String> getFileName();
      void setFileName(ValueProvider<String> value);
  }

  public static void main(String[] args) {
     WriteIntsOptions options =
          PipelineOptionsFactory.fromArgs(args).withValidation()
            .as(WriteIntsOptions.class);
    Pipeline p = Pipeline.create(options);

    p.apply(Create.of(1, 2, 3))
     // Write to the computed complete file path.
     .apply(TextIO.Write.named("OutputNums").to(NestedValueProvider.of(
        options.getFileName(),
        new SerializableFunction<String, String>() {
          @Override
          public String apply(String file) {
            return "gs://directoryname/" + file;
          }
        })));

    p.run();
  }

메타데이터

템플릿이 실행되면 커스텀 매개변수 유효성을 검사하도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음을 수행해야 합니다.

  1. 아래 표에서 매개변수를 사용하여 <template-name>_metadata라는 JSON 형식의 파일을 만듭니다.

    참고: 만든 파일의 이름을 <template-name>_metadata.json으로 지정하지 마세요. 파일에 JSON이 포함되어 있지만 .json 파일 확장자로 끝날 수 없습니다.

  2. JSON 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.

    참고: 템플릿을 <template-name>에 저장하고 메타데이터를 <template-name>_metadata에 저장해야 합니다.

메타데이터 매개변수

매개변수 키 필수 값 설명
name 템플릿 이름입니다.
description 아니요 템플릿을 설명하는 짧은 텍스트 단락입니다.
parameters 아니요. 기본값은 빈 배열입니다. 템플릿이 사용할 추가 매개변수 배열입니다.
name 템플릿에서 사용된 매개변수 이름입니다.
label 사람이 읽을 수 있는 라벨로, UI에서 매개변수에 라벨을 지정하는 데 사용됩니다.
help_text 매개변수를 설명하는 짧은 텍스트 단락입니다.
is_optional 아니요. 기본값은 false입니다. 매개변수가 필수이면 true이고, 선택사항이면 false입니다.
regexes 아니요. 기본값은 빈 배열입니다. 매개변수 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어, ["^[a-zA-Z][a-zA-Z0-9]+"]는 값이 문자로 시작되고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다.

메타데이터 파일 예

Cloud Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 커스텀 매개변수 유효성을 검사합니다.

{
  "description": "An example pipeline that counts words in the input file.",
  "name": "Word Count",
  "parameters": [
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "inputFile",
      "helpText": "Path of the file pattern glob to read from. ex: gs://dataflow-samples/shakespeare/kinglear.txt",
      "label": "Input GCS file(s)"
    },
    {
      "regexes": [
        "^gs:\\/\\/[^\\n\\r]+$"
      ],
      "name": "output",
      "helpText": "Path and filename prefix for writing output files. ex: gs://MyBucket/counts",
      "label": "Output GCS file(s)"
    }
  ]
}

Cloud Dataflow 템플릿 디렉터리에서 이 메타데이터 파일을 다운로드할 수 있습니다.

파이프라인 I/O 및 런타임 매개변수

자바: SDK 2.x

일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메소드가 포함되어 있습니다. 특정 커넥터와 메소드에 대한 지원을 확인하려면 I/O 커넥터의 API 참조 문서를 참조하세요. 지원되는 메소드에는 ValueProvider가 포함된 오버로드가 있습니다. 메소드에 오버로드가 없으면 메소드가 런타임 매개변수를 지원하지 않습니다. 다음 I/O 커넥터는 최소한 ValueProvider를 부분적으로 지원합니다.

  • 파일 기반 IO: TextIO, AvroIO, FileIO, TFRecordIO, XmlIO
  • BigQueryIO*
  • BigtableIO(SDK 2.3.0 이상 필요)
  • PubSubIO
  • SpannerIO

* 참고: BigQuery에서 읽는 일괄 처리 파이프라인을 실행하려면 모든 BigQuery 읽기에서 .withTemplateCompatibility()를 사용해야 합니다.

Python

일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메소드가 포함되어 있습니다. I/O 커넥터 및 해당 메소드에 대한 지원을 확인하려면 커넥터의 API 참조 문서를 참조하세요. 다음 I/O 커넥터는 런타임 매개변수를 허용합니다.

  • 파일 기반 IO: textio, avroio, tfrecordio

자바: SDK 1.x

다음 표에는 런타임 매개변수를 허용하는 메소드 전체 목록이 포함되어 있습니다.

I/O 메소드
BigQuery* BigQueryIO.Read.from()*
BigQueryIO.Read.fromQuery()*
BigQueryIO.Write.to()*
BigQueryIO.Write.withSchema()*
Cloud Pub/Sub PubsubIO.Read.subscription()
PubsubIO.Read.topic()
PubsubIO.Write.topic()
TextIO TextIO.Read.from()
TextIO.Write.to()

* BigQuery 작업 ID는 템플릿 생성 시 설정되므로, BigQuery 배치 파이프라인 템플릿을 한 번만 실행할 수 있습니다.

템플릿 만들기 및 스테이징

파이프라인을 작성한 후에는 템플릿 파일을 만들고 준비해야 합니다. 해당 SDK 버전에 대한 명령어를 사용합니다.

참고: 템플릿을 만들고 준비한 후 템플릿을 실행하는 데 필요한 추가 파일이 스테이징 위치에 포함됩니다. 준비 위치를 삭제하면 템플릿 실행이 실패합니다.

자바: SDK 2.x

이 Maven 명령어는 --templateLocation으로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.

  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.

  • YOUR_TEMPLATE_NAME을 템플릿 이름으로 바꿉니다.

  • com.example.myclass를 자바 클래스로 바꿉니다.

  • templateLocation 경로가 올바른지 확인합니다.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --templateLocation=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

Python

이 Python 명령어는 --template_location으로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다. 명령어를 다음과 같이 변경합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.

  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.

  • YOUR_TEMPLATE_NAME을 템플릿 이름으로 바꿉니다.

  • examples.mymodule을 Python 모듈로 바꿉니다.

  • template_location 경로가 올바른지 확인합니다.

  python -m examples.mymodule \
    --runner DataflowRunner \
    --project YOUR_PROJECT_ID \
    --staging_location gs://YOUR_BUCKET_NAME/staging \
    --temp_location gs://YOUR_BUCKET_NAME/temp \
    --template_location gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME

자바: SDK 1.x

이 Maven 명령어는 --dataflowJobFile로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다.

  • YOUR_PROJECT_ID를 프로젝트 ID로 바꿉니다.

  • YOUR_BUCKET_NAME을 Cloud Storage 버킷 이름으로 바꿉니다.

  • YOUR_TEMPLATE_NAME을 템플릿 이름으로 바꿉니다.

  • com.example.myclass를 자바 클래스로 바꿉니다.

  • dataflowJobFile 경로가 올바른지 확인합니다.

    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=TemplatingDataflowPipelineRunner \
                  --project=YOUR_PROJECT_ID \
                  --stagingLocation=gs://YOUR_BUCKET_NAME/staging \
                  --dataflowJobFile=gs://YOUR_BUCKET_NAME/templates/YOUR_TEMPLATE_NAME"
    

템플릿을 만들고 스테이징한 후 다음 단계는 템플릿 실행입니다.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.