이 문서에서는 Dataflow 파이프라인 코드에서 커스텀 기본 템플릿을 만드는 방법을 알아봅니다. 기본 템플릿은 기존 Dataflow 파이프라인을 패키징하여 재사용 가능한 템플릿을 만들며, 특정 파이프라인 파라미터를 변경하여 이러한 템플릿을 각 작업에 대해 맞춤설정할 수 있습니다. 템플릿을 직접 작성하는 대신 명령어를 사용하여 기존 파이프라인에서 템플릿을 생성합니다.
다음은 이 프로세스에 대한 간단한 개요입니다. 이 프로세스의 세부정보는 이어지는 섹션에서 제공됩니다.
- 파이프라인 코드에서 런타임에 설정하거나 사용할 모든 파이프라인 옵션에 ValueProvider인터페이스를 사용합니다. 런타임 파라미터를 허용하는DoFn객체를 사용합니다.
- 기본 템플릿이 실행되면 커스텀 파라미터 유효성을 검사하도록 추가 메타데이터로 템플릿을 확장합니다. 이러한 메타데이터의 예시로는 커스텀 기본 템플릿의 이름과 선택적 파라미터가 있습니다.
- 파이프라인 I/O 커넥터가 ValueProvider객체를 지원하는지 확인하고 필요에 따라 변경합니다.
- 커스텀 기본 템플릿을 만들고 스테이징합니다.
- 커스텀 기본 템플릿을 실행합니다.
다양한 종류의 Dataflow 템플릿, 템플릿의 이점, 기본 템플릿을 선택하는 시기에 대한 자세한 내용은 Dataflow 템플릿을 참조하세요.
기본 템플릿 실행에 필요한 권한
Dataflow 기본 템플릿을 실행하는 데 필요한 권한은 템플릿을 실행하는 위치와 파이프라인의 소스와 싱크가 다른 프로젝트에 있는지 여부에 따라 다릅니다.
Dataflow 파이프라인을 로컬에서 실행하거나 Google Cloud Platform을 사용하여 실행하는 방법에 대한 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.
Dataflow 역할 및 권한 목록은 Dataflow 액세스 제어를 참조하세요.
제한사항
- 다음 파이프라인 옵션은 기본 템플릿에서 지원되지 않습니다. 작업자 하네스 스레드의 수를 제어해야 하는 경우 Flex 템플릿을 사용하세요.
자바numberOfWorkerHarnessThreads Pythonnumber_of_worker_harness_threads
- Dataflow 실행기는 Pub/Sub 주제 및 구독 파라미터의 ValueProvider옵션을 지원하지 않습니다. 런타임 파라미터에 Pub/Sub 옵션이 필요하면 Flex 템플릿을 사용합니다.
런타임 파라미터 및 ValueProvider 인터페이스에 대한 정보
ValueProvider 인터페이스를 사용하면 파이프라인이 런타임 파라미터를 허용할 수 있습니다. Apache Beam은 다음 세 가지 유형의 ValueProvider 객체를 제공합니다.
| 이름 | 설명 | 
|---|---|
| RuntimeValueProvider | 
 
 사전에 값을 모르면  | 
| StaticValueProvider | 
 값을 미리 알면  | 
| NestedValueProvider | 
 런타임에 이 값을 사용하여 다른 값을 계산하려면  | 
파이프라인 코드에서 런타임 파라미터 사용
이 섹션에서는 ValueProvider, StaticValueProvider, NestedValueProvider를 사용하는 방법을 살펴봅니다.
파이프라인 옵션에서 ValueProvider 사용
런타임 시 설정하거나 사용할 모든 파이프라인 옵션에 ValueProvider를 사용합니다.
예를 들어 다음 WordCount 코드 스니펫은 런타임 파라미터를 지원하지 않습니다. 이 코드는 입력 파일 옵션을 추가하고, 파이프라인을 만들고, 입력 파일의 행을 읽습니다.
자바
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
런타임 파라미터 지원을 추가하려면 ValueProvider를 사용하도록 입력 파일 옵션을 수정합니다.
자바
입력 파일 옵션의 유형에 String 대신 ValueProvider<String>을 사용합니다.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
add_argument를 add_value_provider_argument로 바꿉니다.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
함수에서 ValueProvider 사용
자체 함수에서 런타임 파라미터 값을 사용하려면 ValueProvider 파라미터를 사용하도록 함수를 업데이트합니다.
다음 예시에는 정수 ValueProvider 옵션과 정수를 더하는 간단한 함수가 포함되어 있습니다. 이 함수는 ValueProvider 정수에 따라 달라집니다. 실행 중에 파이프라인은 [1, 2, 3]을 포함하는 PCollection의 모든 정수에 MySumFn을 적용합니다. 런타임 값이 10이면 결과 PCollection에 [11, 12, 13]이 포함됩니다.
자바
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
StaticValueProvider 사용
파이프라인에 정적 값을 제공하려면 StaticValueProvider를 사용합니다.
이 예시에서는 ValueProvider<Integer>를 받아들이는 DoFn인 MySumFn을 사용합니다. 파라미터의 값을 미리 알고 있으면 StaticValueProvider를 사용하여 정적 값을 ValueProvider로 지정할 수 있습니다.
자바
다음 코드는 파이프라인 런타임에 값을 가져옵니다.
.apply(ParDo.of(new MySumFn(options.getInt())))
대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
다음 코드는 파이프라인 런타임에 값을 가져옵니다.
beam.ParDo(MySumFn(user_options.templated_int))
대신 StaticValueProvider를 정적 값과 함께 사용할 수 있습니다.
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
일반 파라미터와 런타임 파라미터를 모두 지원하는 I/O 모듈을 구현하는 경우에 StaticValueProvider를 사용할 수도 있습니다.
  StaticValueProvider는 비슷한 두 가지 메서드 구현 시에 코드 중복을 줄여줍니다.
자바
이 예의 소스 코드는 GitHub에 있는 Apache Beam의 TextIO.java에서 가져온 것입니다.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
이 예시에서는 string 또는 ValueProvider 인수를 모두 사용하는 단일 생성자가 있습니다. 인수가 string인 경우 StaticValueProvider로 변환됩니다.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
NestedStaticValueProvider 사용
  다른 ValueProvider 객체에서 값을 계산하려면 NestedValueProvider를 사용합니다.
NestedValueProvider는 ValueProvider 및 SerializableFunction변환기를 입력으로 사용합니다. NestedValueProvider에 .get()을 호출하면 변환기가 ValueProvider 값을 기준으로 새 값을 만듭니다. 이 변환을 사용하면 ValueProvider 값을 사용하여 원하는 최종 값을 만들 수 있습니다.
다음 예시에서 사용자는 파일 이름 file.txt를 제공합니다. 변환이 파일 이름 앞에 gs://directory_name/ 경로를 추가합니다. .get()을 호출하면 gs://directory_name/file.txt가 반환됩니다.
자바
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
파이프라인 코드에서 메타데이터 사용
템플릿을 실행할 때 커스텀 파라미터를 검증할 수 있도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음 단계를 수행합니다.
- 메타데이터 파라미터의 파라미터와 메타데이터 예시 파일의 형식을 사용하여 TEMPLATE_NAME_metadata이라는 JSON 형식의 파일을 만듭니다.TEMPLATE_NAME을 템플릿 이름으로 바꿉니다.메타데이터 파일에 파일 이름 확장자가 없어야 합니다. 예를 들어 템플릿 이름이 myTemplate이면 메타데이터 파일은myTemplate_metadata여야 합니다.
- 메타데이터 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.
메타데이터 매개변수
| 매개변수 키 | 필수 | 값 설명 | |
|---|---|---|---|
| name | 예 | 템플릿 이름입니다. | |
| description | 아니요 | 템플릿을 설명하는 짧은 텍스트 단락입니다. | |
| streaming | 아니요 | true면 이 템플릿은 스트리밍을 지원합니다. 기본값은false입니다. | |
| supportsAtLeastOnce | 아니요 | true면 이 템플릿은 적어도 한 번 처리를 지원합니다. 기본값은false입니다. 템플릿이 적어도 한 번 스트리밍 모드로 작동하도록 설계된 경우 이 파라미터를true로 설정합니다. | |
| supportsExactlyOnce | 아니요 | true면 이 템플릿은 정확히 한 번 처리를 지원합니다. 기본값은true입니다. | |
| defaultStreamingMode | 아니요 | 적어도 한 번 모드와 정확히 한 번 모드를 모두 지원하는 템플릿의 기본 스트리밍 모드입니다. 다음 값 중 하나를 사용합니다( "AT_LEAST_ONCE"또는"EXACTLY_ONCE") 지정하지 않으면 기본 스트리밍 모드는 정확히 한 번입니다. | |
| parameters | 아니요 | 템플릿에 사용되는 추가 파라미터 배열입니다. 기본적으로 빈 배열이 사용됩니다. | |
| name | 예 | 템플릿에 사용되는 파라미터 이름입니다. | |
| label | 예 | Google Cloud 콘솔에서 파라미터 라벨을 지정하는 데 사용되는 인간이 읽을 수 있는 문자열입니다. | |
| helpText | 예 | 파라미터를 설명하는 짧은 텍스트 단락입니다. | |
| isOptional | 아니요 | 파라미터가 필수이면 false이고, 파라미터가 선택사항이면true입니다. 특정 값으로 설정하지 않으면isOptional이 기본적으로false로 설정됩니다.
        메타데이터에 이 파라미터 키를 포함하지 않으면 메타데이터가 필수 파라미터가 됩니다. | |
| regexes | 아니요 | 파라미터 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어 ["^[a-zA-Z][a-zA-Z0-9]+"]는 값이 문자로 시작하고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다. 기본적으로 빈 배열이 사용됩니다. | 
메타데이터 파일 예시
자바
Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 파라미터 유효성을 검사합니다.
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 파라미터 유효성을 검사합니다.
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Dataflow 템플릿 디렉터리에서 Google 제공 템플릿의 메타데이터 파일을 다운로드할 수 있습니다.
지원되는 파이프라인 I/O 커넥터 및 ValueProvider
자바
일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메서드가 포함되어 있습니다. 특정 커넥터와 메서드에 대한 지원을 확인하려면 I/O 커넥터의 API 참고 문서를 확인하세요. 지원되는 메서드에는 ValueProvider가 포함된 오버로드가 있습니다. 메서드에 오버로드가 없으면 메서드가 런타임 파라미터를 지원하지 않습니다. 다음 I/O 커넥터는 최소한 ValueProvider를 부분적으로 지원합니다.
- 파일 기반 IO: TextIO,AvroIO,FileIO,TFRecordIO,XmlIO
- BigQueryIO*
- BigtableIO(SDK 2.3.0 이상 필요)
- PubSubIO
- SpannerIO
Python
일부 I/O 커넥터에는 ValueProvider 객체를 허용하는 메서드가 포함되어 있습니다. I/O 커넥터 및 해당 메서드에 대한 지원을 확인하려면 커넥터의 API 참고 문서를 참조하세요. 다음 I/O 커넥터는 런타임 파라미터를 허용합니다.
- 파일 기반 IO: textio,avroio,tfrecordio
기본 템플릿 만들기 및 스테이징
파이프라인을 작성한 후에는 템플릿 파일을 만들고 스테이징해야 합니다. 템플릿을 만들고 스테이징하면 템플릿을 실행하는 데 필요한 추가 파일이 스테이징 위치에 포함됩니다. 스테이징 위치를 삭제하면 템플릿 실행이 실패합니다. Dataflow 작업은 템플릿을 스테이징한 후 바로 실행되지 않습니다. 커스텀 템플릿 기반 Dataflow 작업을 실행하려면 Google Cloud 콘솔, Dataflow REST API 또는 gcloud CLI를 사용하면 됩니다.
다음 예시에서는 템플릿 파일을 스테이징하는 방법을 보여줍니다.
자바
이 Maven 명령어는 --templateLocation으로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다.
    mvn compile exec:java \
     -Dexec.mainClass=com.example.myclass \
     -Dexec.args="--runner=DataflowRunner \
                  --project=PROJECT_ID \
                  --stagingLocation=gs://BUCKET_NAME/staging \
                  --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \
                  --region=REGION" \
     -P dataflow-runner
    templateLocation 경로가 올바른지 확인합니다. 다음을 바꿉니다.
- com.example.myclass: Java 클래스입니다.
- PROJECT_ID: 프로젝트 ID입니다.
- BUCKET_NAME: Cloud Storage 버킷 이름입니다.
- TEMPLATE_NAME: 템플릿 이름입니다.
- REGION: Dataflow 작업을 배포할 리전입니다.
Python
이 Python 명령어는 --template_location으로 지정한 Cloud Storage 위치에 템플릿을 만들고 스테이징합니다.
  python -m examples.mymodule \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --staging_location gs://BUCKET_NAME/staging \
    --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \
    --region REGION
template_location 경로가 올바른지 확인합니다. 다음을 바꿉니다.
- examples.mymodule: Python 모듈입니다.
- PROJECT_ID: 프로젝트 ID입니다.
- BUCKET_NAME: Cloud Storage 버킷 이름입니다.
- TEMPLATE_NAME: 템플릿 이름입니다.
- REGION: Dataflow 작업을 배포할 리전입니다.
템플릿을 만들고 스테이징한 후 다음 단계는 템플릿 실행입니다.