이 문서에서는 Dataflow 파이프라인 코드에서 커스텀 기본 템플릿을 만드는 방법을 알아봅니다. 기본 템플릿은 기존 Dataflow 파이프라인을 패키징하여 재사용 가능한 템플릿을 만들며, 특정 파이프라인 매개변수를 변경하여 이러한 템플릿을 각 작업에 대해 맞춤설정할 수 있습니다. 템플릿을 직접 작성하는 대신 명령어를 사용하여 기존 파이프라인에서 템플릿을 생성합니다.
다음은 이 프로세스에 대한 간단한 개요입니다. 이 프로세스의 세부정보는 이어지는 섹션에서 제공됩니다.
- 파이프라인 코드에서 런타임에 설정하거나 사용할 모든 파이프라인 옵션에
ValueProvider
인터페이스를 사용합니다. 런타임 매개변수를 허용하는DoFn
객체를 사용합니다. - 기본 템플릿이 실행되면 커스텀 매개변수 유효성을 검사하도록 추가 메타데이터로 템플릿을 확장합니다. 이러한 메타데이터의 예시로는 커스텀 기본 템플릿의 이름과 선택적 매개변수가 있습니다.
- 파이프라인 I/O 커넥터가
ValueProvider
객체를 지원하는지 확인하고 필요에 따라 변경합니다. - 커스텀 기본 템플릿을 만들고 스테이징합니다.
- 커스텀 기본 템플릿을 실행합니다.
다양한 종류의 Dataflow 템플릿, 템플릿의 이점, 기본 템플릿을 선택하는 시기에 대한 자세한 내용은 Dataflow 템플릿을 참조하세요.
기본 템플릿 실행에 필요한 권한
Dataflow 기본 템플릿을 실행하는 데 필요한 권한은 템플릿을 실행하는 위치와 파이프라인의 소스와 싱크가 다른 프로젝트에 있는지 여부에 따라 다릅니다.
Dataflow 파이프라인을 로컬에서 실행하거나 Google Cloud를 사용하여 실행하는 방법에 대한 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.
Dataflow 역할 및 권한 목록은 Dataflow 액세스 제어를 참조하세요.
제한사항
- 기본 템플릿에서는 다음 파이프라인 옵션이 지원되지 않습니다. 작업자 하네스 스레드 수를 제어해야 하는 경우 Flex 템플릿을 사용하세요.
Java
numberOfWorkerHarnessThreads
Python
number_of_worker_harness_threads
- Dataflow 실행기는 Pub/Sub 주제와 구독 매개변수의
ValueProvider
옵션을 지원하지 않습니다. 런타임 매개변수에 Pub/Sub 옵션이 필요하면 Flex 템플릿을 사용합니다.
런타임 매개변수 및 ValueProvider
인터페이스에 대한 정보
ValueProvider
인터페이스를 사용하면 파이프라인이 런타임 매개변수를 허용할 수 있습니다. Apache Beam은 다음 세 가지 유형의 ValueProvider
객체를 제공합니다.
이름 | 설명 |
---|---|
RuntimeValueProvider |
사전에 값을 모르면 |
StaticValueProvider |
값을 미리 알면 |
NestedValueProvider |
런타임에 이 값을 사용하여 다른 값을 계산하려면 |
파이프라인 코드에서 런타임 매개변수 사용
이 섹션에서는 ValueProvider
, StaticValueProvider
, NestedValueProvider
를 사용하는 방법을 살펴봅니다.
파이프라인 옵션에서 ValueProvider
사용
런타임 시 설정하거나 사용할 모든 파이프라인 옵션에 ValueProvider
를 사용합니다.
예를 들어 다음 WordCount
코드 스니펫은 런타임 매개변수를 지원하지 않습니다. 이 코드는 입력 파일 옵션을 추가하고, 파이프라인을 만들고, 입력 파일의 행을 읽습니다.
Java
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") String getInputFile(); void setInputFile(String value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
런타임 매개변수 지원을 추가하려면 ValueProvider
를 사용하도록 입력 파일 옵션을 수정합니다.
Java
입력 파일 옵션의 유형에 String
대신 ValueProvider<String>
을 사용합니다.
public interface WordCountOptions extends PipelineOptions { @Description("Path of the file to read from") @Default.String("gs://dataflow-samples/shakespeare/kinglear.txt") ValueProvider<String> getInputFile(); void setInputFile(ValueProvider<String> value); } public static void main(String[] args) { WordCountOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WordCountOptions.class); Pipeline p = Pipeline.create(options); p.apply("ReadLines", TextIO.read().from(options.getInputFile())); ...
Python
add_argument
를 add_value_provider_argument
로 바꿉니다.
class WordcountOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): # Use add_value_provider_argument for arguments to be templatable # Use add_argument as usual for non-templatable arguments parser.add_value_provider_argument( '--input', default='gs://dataflow-samples/shakespeare/kinglear.txt', help='Path of the file to read from') parser.add_argument( '--output', required=True, help='Output file to write results to.') pipeline_options = PipelineOptions(['--output', 'some/output_path']) p = beam.Pipeline(options=pipeline_options) wordcount_options = pipeline_options.view_as(WordcountOptions) lines = p | 'read' >> ReadFromText(wordcount_options.input)
함수에 ValueProvider
사용
자체 함수에서 런타임 매개변수 값을 사용하려면 ValueProvider
매개변수를 사용하도록 함수를 업데이트합니다.
다음 예시에는 정수 ValueProvider
옵션과 정수를 더하는 간단한 함수가 포함되어 있습니다. 이 함수는 ValueProvider
정수에 따라 달라집니다. 실행 중에 파이프라인은 [1, 2, 3]
을 포함하는 PCollection
의 모든 정수에 MySumFn
을 적용합니다. 런타임 값이 10이면 결과 PCollection
에 [11, 12, 13]
이 포함됩니다.
Java
public interface SumIntOptions extends PipelineOptions { // New runtime parameter, specified by the --int // option at runtime. ValueProvider<Integer> getInt(); void setInt(ValueProvider<Integer> value); } class MySumFn extends DoFn<Integer, Integer> { ValueProvider<Integer> mySumInteger; MySumFn(ValueProvider<Integer> sumInt) { // Store the value provider this.mySumInteger = sumInt; } @ProcessElement public void processElement(ProcessContext c) { // Get the value of the value provider and add it to // the element's value. c.output(c.element() + mySumInteger.get()); } } public static void main(String[] args) { SumIntOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(SumIntOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Get the value provider and pass it to MySumFn .apply(ParDo.of(new MySumFn(options.getInt()))) .apply("ToString", MapElements.into(TypeDescriptors.strings()).via(x -> x.toString())) .apply("OutputNums", TextIO.write().to("numvalues")); p.run(); }
Python
import apache_beam as beam from apache_beam.options.pipeline_options import PipelineOptions from apache_beam.options.value_provider import StaticValueProvider from apache_beam.io import WriteToText class UserOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_value_provider_argument('--templated_int', type=int) class MySumFn(beam.DoFn): def __init__(self, templated_int): self.templated_int = templated_int def process(self, an_int): yield self.templated_int.get() + an_int pipeline_options = PipelineOptions() p = beam.Pipeline(options=pipeline_options) user_options = pipeline_options.view_as(UserOptions) sum = (p | 'ReadCollection' >> beam.io.ReadFromText( 'gs://some/integer_collection') | 'StringToInt' >> beam.Map(lambda w: int(w)) | 'AddGivenInt' >> beam.ParDo(MySumFn(user_options.templated_int)) | 'WriteResultingCollection' >> WriteToText('some/output_path'))
StaticValueProvider
사용
파이프라인에 정적 값을 제공하려면 StaticValueProvider
를 사용합니다.
이 예시에서는 ValueProvider<Integer>
를 받아들이는 DoFn
인 MySumFn
을 사용합니다. 매개변수의 값을 미리 알고 있으면 StaticValueProvider
를 사용하여 정적 값을 ValueProvider
로 지정할 수 있습니다.
Java
다음 코드는 파이프라인 런타임에 값을 가져옵니다.
.apply(ParDo.of(new MySumFn(options.getInt())))
대신 StaticValueProvider
를 정적 값과 함께 사용할 수 있습니다.
.apply(ParDo.of(new MySumFn(StaticValueProvider.of(10))))
Python
다음 코드는 파이프라인 런타임에 값을 가져옵니다.
beam.ParDo(MySumFn(user_options.templated_int))
대신 StaticValueProvider
를 정적 값과 함께 사용할 수 있습니다.
beam.ParDo(MySumFn(StaticValueProvider(int,10)))
일반 매개변수와 런타임 매개변수를 모두 지원하는 I/O 모듈을 구현하는 경우에 StaticValueProvider
를 사용할 수도 있습니다.
StaticValueProvider
는 비슷한 두 가지 메서드 구현 시에 코드 중복을 줄여줍니다.
Java
이 예의 소스 코드는 GitHub에 있는 Apache Beam의 TextIO.java에서 가져온 것입니다.
// Create a StaticValueProvider<String> from a regular String parameter // value, and then call .from() with this new StaticValueProvider. public Read from(String filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return from(StaticValueProvider.of(filepattern)); } // This method takes a ValueProvider parameter. public Read from(ValueProvider<String> filepattern) { checkNotNull(filepattern, "Filepattern cannot be empty."); return toBuilder().setFilepattern(filepattern).build(); }
Python
이 예시에서는 string
또는 ValueProvider
인수를 모두 사용하는 단일 생성자가 있습니다. 인수가 string
인 경우 StaticValueProvider
로 변환됩니다.
class Read(): def __init__(self, filepattern): if isinstance(filepattern, str): # Create a StaticValueProvider from a regular string parameter filepattern = StaticValueProvider(str, filepattern) self.filepattern = filepattern
NestedStaticValueProvider
사용
다른 ValueProvider
객체에서 값을 계산하려면 NestedValueProvider
를 사용합니다.
NestedValueProvider
는 ValueProvider
및 SerializableFunction
변환기를 입력으로 사용합니다. NestedValueProvider
에 .get()
을 호출하면 변환기가 ValueProvider
값을 기준으로 새 값을 만듭니다. 이 변환을 사용하면 ValueProvider
값을 사용하여 원하는 최종 값을 만들 수 있습니다.
다음 예시에서 사용자는 파일 이름 file.txt
를 제공합니다. 변환이 파일 이름 앞에 gs://directory_name/
경로를 추가합니다. .get()
을 호출하면 gs://directory_name/file.txt
가 반환됩니다.
Java
public interface WriteIntsOptions extends PipelineOptions { // New runtime parameter, specified by the --fileName // option at runtime. ValueProvider<String> getFileName(); void setFileName(ValueProvider<String> value); } public static void main(String[] args) { WriteIntsOptions options = PipelineOptionsFactory.fromArgs(args).withValidation() .as(WriteIntsOptions.class); Pipeline p = Pipeline.create(options); p.apply(Create.of(1, 2, 3)) // Write to the computed complete file path. .apply("OutputNums", TextIO.write().to(NestedValueProvider.of( options.getFileName(), new SerializableFunction<String, String>() { @Override public String apply(String file) { return "gs://directoryname/" + file; } }))); p.run(); }
파이프라인 코드에서 메타데이터 사용
템플릿을 실행할 때 커스텀 매개변수를 검증할 수 있도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음 단계를 수행합니다.
- 메타데이터 매개변수의 매개변수와 메타데이터 예시 파일의 형식을 사용하여
TEMPLATE_NAME_metadata
이라는 JSON 형식의 파일을 만듭니다.TEMPLATE_NAME
을 템플릿 이름으로 바꿉니다.메타데이터 파일에 파일 이름 확장자가 없어야 합니다. 예를 들어 템플릿 이름이
myTemplate
이면 메타데이터 파일은myTemplate_metadata
여야 합니다. - 메타데이터 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.
메타데이터 매개변수
매개변수 키 | 필수 | 값 설명 | |
---|---|---|---|
name |
예 | 템플릿 이름입니다. | |
description |
아니요 | 템플릿을 설명하는 짧은 텍스트 단락입니다. | |
streaming |
아니요 | true 면 이 템플릿은 스트리밍을 지원합니다. 기본값은 false 입니다. |
|
supportsAtLeastOnce |
아니요 | true 면 이 템플릿은 적어도 한 번 처리를 지원합니다. 기본값은 false 입니다. 템플릿이 적어도 한 번 스트리밍 모드로 작동하도록 설계된 경우 이 매개변수를 true 로 설정합니다.
|
|
supportsExactlyOnce |
아니요 | true 면 이 템플릿은 정확히 한 번 처리를 지원합니다. 기본값은 true 입니다. |
|
defaultStreamingMode |
아니요 | 최소 1회 모드와 정확히 1회 모드를 모두 지원하는 템플릿의 기본 스트리밍 모드입니다. 다음 값 중 하나를 사용합니다("AT_LEAST_ONCE" 또는 "EXACTLY_ONCE" ) 지정하지 않으면 기본 스트리밍 모드는 정확히 한 번입니다.
|
|
parameters |
아니요 | 템플릿에 사용되는 추가 매개변수 배열입니다. 기본적으로 빈 배열이 사용됩니다. | |
name |
예 | 템플릿에 사용되는 매개변수 이름입니다. | |
label |
예 | Google Cloud 콘솔에서 매개변수 라벨을 지정하기 위해 사용되는 사람이 읽을 수 있는 문자열입니다. | |
helpText |
예 | 매개변수를 설명하는 짧은 텍스트 단락입니다. | |
isOptional |
아니요 | 매개변수가 필수이면 false 이고, 매개변수가 선택사항이면 true 입니다. 특정 값으로 설정하지 않으면 isOptional 이 기본적으로 false 로 설정됩니다.
메타데이터에 이 매개변수 키를 포함하지 않으면 메타데이터가 필수 매개변수가 됩니다. |
|
regexes |
아니요 | 매개변수 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어 ["^[a-zA-Z][a-zA-Z0-9]+"] 는 값이 문자로 시작하고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다. 기본적으로 빈 배열이 사용됩니다. |
메타데이터 파일 예시
Java
Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 매개변수 유효성을 검사합니다.
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "inputFile", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Python
Dataflow 서비스는 다음 메타데이터를 사용하여 WordCount 템플릿의 맞춤 매개변수 유효성을 검사합니다.
{ "description": "An example pipeline that counts words in the input file.", "name": "Word Count", "streaming": false, "parameters": [ { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "input", "helpText": "Path of the file pattern glob to read from - for example, gs://dataflow-samples/shakespeare/kinglear.txt", "label": "Input Cloud Storage file(s)" }, { "regexes": [ "^gs:\\/\\/[^\\n\\r]+$" ], "name": "output", "helpText": "Path and filename prefix for writing output files - for example, gs://MyBucket/counts", "label": "Output Cloud Storage file(s)" } ] }
Dataflow 템플릿 디렉터리에서 Google 제공 템플릿의 메타데이터 파일을 다운로드할 수 있습니다.
지원되는 파이프라인 I/O 커넥터 및 ValueProvider
Java
일부 I/O 커넥터에는 ValueProvider
객체를 허용하는 메서드가 포함되어 있습니다. 특정 커넥터와 메서드에 대한 지원을 확인하려면 I/O 커넥터의 API 참고 문서를 확인하세요. 지원되는 메서드에는 ValueProvider
가 포함된 오버로드가 있습니다. 메서드에 오버로드가 없으면 메서드가 런타임 매개변수를 지원하지 않습니다. 다음 I/O 커넥터는 최소한 ValueProvider
를 부분적으로 지원합니다.
- 파일 기반 IO:
TextIO
,AvroIO
,FileIO
,TFRecordIO
,XmlIO
BigQueryIO
*BigtableIO
(SDK 2.3.0 이상 필요)PubSubIO
SpannerIO
Python
일부 I/O 커넥터에는 ValueProvider
객체를 허용하는 메서드가 포함되어 있습니다. I/O 커넥터 및 해당 메서드에 대한 지원을 확인하려면 커넥터의 API 참고 문서를 참조하세요. 다음 I/O 커넥터는 런타임 매개변수를 허용합니다.
- 파일 기반 IO:
textio
,avroio
,tfrecordio
기본 템플릿 만들기 및 스테이징
파이프라인을 작성한 후에는 템플릿 파일을 만들고 스테이징해야 합니다. 템플릿을 만들고 스테이징하면 템플릿을 실행하는 데 필요한 추가 파일이 스테이징 위치에 포함됩니다. 스테이징 위치를 삭제하면 템플릿 실행이 실패합니다. Dataflow 작업은 템플릿을 스테이징한 후 바로 실행되지 않습니다. 커스텀 템플릿 기반 Dataflow 작업을 실행하려면 Google Cloud 콘솔, Dataflow REST API 또는 gcloud CLI를 사용하면 됩니다.
다음 예시에서는 템플릿 파일을 스테이징하는 방법을 보여줍니다.
Java
이 Maven 명령어는 --templateLocation
으로 지정한 Cloud Storage 위치에서 템플릿을 만들고 스테이징합니다.
mvn compile exec:java \ -Dexec.mainClass=com.example.myclass \ -Dexec.args="--runner=DataflowRunner \ --project=PROJECT_ID \ --stagingLocation=gs://BUCKET_NAME/staging \ --templateLocation=gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region=REGION" \ -P dataflow-runner
templateLocation
경로가 올바른지 확인합니다. 다음을 바꿉니다.
com.example.myclass
: Java 클래스입니다.PROJECT_ID
: 프로젝트 ID입니다.BUCKET_NAME
: Cloud Storage 버킷 이름입니다.TEMPLATE_NAME
: 템플릿 이름입니다.REGION
: Dataflow 작업을 배포할 리전입니다.
Python
이 Python 명령어는 --template_location
으로 지정한 Cloud Storage 위치에 템플릿을 만들고 스테이징합니다.
python -m examples.mymodule \ --runner DataflowRunner \ --project PROJECT_ID \ --staging_location gs://BUCKET_NAME/staging \ --template_location gs://BUCKET_NAME/templates/TEMPLATE_NAME \ --region REGION
template_location
경로가 올바른지 확인합니다. 다음을 바꿉니다.
examples.mymodule
: Python 모듈입니다.PROJECT_ID
: 프로젝트 ID입니다.BUCKET_NAME
: Cloud Storage 버킷 이름입니다.TEMPLATE_NAME
: 템플릿 이름입니다.REGION
: Dataflow 작업을 배포할 리전입니다.
템플릿을 만들고 스테이징한 후 다음 단계는 템플릿 실행입니다.