파이프라인 실행 매개변수 지정

Apache Beam 프로그램이 파이프라인을 구성하면 사용자는 파이프라인을 실행해야 합니다. 파이프라인 실행은 Apache Beam 프로그램 실행과는 별개입니다. Apache Beam 프로그램에서 파이프라인이 생성되며, 작성한 코드를 통해 파이프라인 실행기에서 실행할 일련의 단계가 생성됩니다. 파이프라인 실행기는 Google Cloud Platform(GCP)의 Cloud Dataflow 관리형 서비스, 타사 실행기 서비스 또는 로컬 환경에서 직접 단계를 실행하는 로컬 파이프라인 실행기일 수 있습니다.

Apache Beam SDK 클래스 PipelineOptions를 사용하여 파이프라인 실행기와 기타 실행 옵션을 지정할 수 있습니다. PipelineOptions를 사용하여 파이프라인이 실행되는 방식과 위치, 사용되는 리소스를 구성합니다.

대부분 Cloud Dataflow 실행기 서비스를 사용하여 관리형 GCP 리소스에서 파이프라인을 실행하려고 합니다. Cloud Dataflow 서비스로 파이프라인을 실행하면 Cloud Dataflow 작업이 만들어지며, 여기에는 GCP 프로젝트의 Compute Engine과 Cloud Storage 리소스가 사용됩니다.

또한 파이프라인을 로컬로 실행할 수 있습니다. 파이프라인을 로컬로 실행하면 파이프라인 변환이 Cloud Dataflow 프로그램이 실행되는 동일한 머신에서 실행됩니다. 특히 파이프라인이 적은 양의 메모리 내 데이터세트를 사용하는 경우, 로컬 실행은 테스트와 디버깅용으로 유용합니다.

PipelineOptions 설정

Cloud Dataflow 프로그램에서 Pipeline 객체 생성 시 PipelineOptions를 전달합니다. Cloud Dataflow 서비스가 파이프라인을 실행하면 각 작업자 인스턴스로 PipelineOptions 복사본을 전송합니다.

자바: SDK 2.x

참고: ProcessContext.getPipelineOptions 메소드를 사용하여 ParDoDoFn 인스턴스에 있는 PipelineOptions에 액세스할 수 있습니다.

Python

이 기능은 아직 Python용 Apache Beam SDK에서 지원되지 않습니다.

자바: SDK 1.x

참고: ProcessContext.getPipelineOptions 메소드를 사용하여 ParDoDoFn 인스턴스에 있는 PipelineOptions에 액세스할 수 있습니다.

명령줄 인수에서 PipelineOptions 설정

PipelineOptions 객체를 만들고 필드를 직접 설정하여 파이프라인을 구성할 수 있지만 Apache Beam SDK에는 명령줄 인수를 사용하여 PipelineOptions 필드를 설정하는 데 사용할 수 있는 명령줄 파서가 포함되어 있습니다.

자바: SDK 2.x

명령줄에서 옵션을 읽으려면 다음 예제 코드처럼 PipelineOptionsFactory.fromArgs 메소드를 사용하여 PipelineOptions 객체를 구성합니다.

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

참고: .withValidation 메소드를 추가하면 Cloud Dataflow가 필수 명령줄 인수를 확인하고 인수 값 유효성을 검사합니다.

PipelineOptionsFactory.fromArgs를 사용하면 다음 형식을 따르는 명령줄 인수가 해석됩니다.

--<option>=<value>

이러한 방식으로 PipelineOptions를 빌드하면 org.apache.beam.sdk.options.PipelineOptions의 모든 하위 인터페이스에서 옵션을 명령줄 인수로 지정할 수 있습니다.

Python

명령줄에서 옵션을 읽으려면 다음 예제 코드처럼 PipelineOptions 객체를 만듭니다.

options = PipelineOptions(flags=argv)

인수 (flags=argv)부터 PipelineOptions까지는 다음 형식을 따르는 명령줄 인수를 해석합니다.

--<option>=<value>

이런 방식으로 PipelineOptions를 빌드하면 PipelineOptions에서 하위 클래스 처리를 통해 옵션을 지정할 수 있습니다.

자바: SDK 1.x

명령줄에서 옵션을 읽으려면 다음 예제 코드처럼 PipelineOptionsFactory.fromArgs 메소드를 사용하여 PipelineOptions 객체를 구성합니다.

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

참고: .withValidation 메소드를 추가하면 Cloud Dataflow가 필수 명령줄 인수를 확인하고 인수 값 유효성을 검사합니다.

PipelineOptionsFactory.fromArgs를 사용하면 다음 형식을 따르는 명령줄 인수가 해석됩니다.

--<option>=<value>

이러한 방식으로 PipelineOptions를 빌드하면 com.google.cloud.dataflow.sdk.options.PipelineOptions의 모든 하위 인터페이스에서 옵션을 명령줄 인수로 지정할 수 있습니다.

커스텀 옵션 만들기

표준 PipelineOptions 외에 자체 커스텀 옵션을 추가할 수 있습니다. 또한 Cloud Dataflow의 명령줄 파서는 동일한 형식으로 지정된 명령줄 인수를 사용하여 커스텀 옵션을 설정할 수 있습니다.

자바: SDK 2.x

자체 옵션을 추가하려면 다음 예처럼 각 옵션에 대해 getter와 setter 메소드를 사용하여 인터페이스를 정의합니다.

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

자체 옵션을 추가하려면 다음 예와 같이 add_argument() 메소드(Python의 표준 argparse 모듈과 정확히 동일하게 동작)를 사용합니다.

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

자바: SDK 1.x

자체 옵션을 추가하려면 다음 예처럼 각 옵션에 대해 getter와 setter 메소드를 사용하여 인터페이스를 정의합니다.

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

사용자가 명령줄 인수로 --help를 전달하면 나타나는 설명과 기본값을 지정할 수도 있습니다.

자바: SDK 2.x

다음과 같이 주석을 사용하여 설명과 기본값을 설정합니다.

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory로 인터페이스를 등록한 후 PipelineOptions 객체를 만들 때 인터페이스를 전달하는 것이 좋습니다. PipelineOptionsFactory로 인터페이스를 등록하면 --help는 커스텀 옵션 인터페이스를 찾고 --help 명령어 출력에 추가할 수 있습니다. 또한 PipelineOptionsFactory는 커스텀 옵션이 등록된 모든 옵션과 호환되는지 확인합니다.

다음 예제 코드는 PipelineOptionsFactory로 커스텀 옵션 인터페이스를 등록하는 방법을 보여줍니다.

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

이제 파이프라인이 명령줄 인수로 --myCustomOption=value를 수용할 수 있습니다.

Python

다음과 같이 설명과 기본값을 설정합니다.

class MyOptions(PipelineOptions):

  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input',
                        help='Input for the pipeline',
                        default='gs://my-bucket/input')
    parser.add_argument('--output',
                        help='Output for the pipeline',
                        default='gs://my-bucket/output')

자바: SDK 1.x

다음과 같이 주석을 사용하여 설명과 기본값을 설정합니다.

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory로 인터페이스를 등록한 후 PipelineOptions 객체를 만들 때 인터페이스를 전달하는 것이 좋습니다. PipelineOptionsFactory로 인터페이스를 등록하면 --help는 커스텀 옵션 인터페이스를 찾고 --help 명령어 출력에 추가할 수 있습니다. 또한 PipelineOptionsFactory는 커스텀 옵션이 등록된 모든 옵션과 호환되는지 확인합니다.

다음 예제 코드는 PipelineOptionsFactory로 커스텀 옵션 인터페이스를 등록하는 방법을 보여줍니다.

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

이제 파이프라인이 명령줄 인수로 --myCustomOption=value를 수용할 수 있습니다.

Cloud Dataflow 서비스에서 실행되도록 PipelineOptions 구성

Cloud Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려면 PipelineOptions에서 다음 필드를 설정해야 합니다.

자바: SDK 2.x

  • project - GCP 프로젝트 ID
  • runner - 프로그램을 파싱하고 파이프라인을 구성하는 파이프라인 실행기. 클라우드 실행의 경우에는 DataflowRunner여야 합니다.
  • gcpTempLocation - 임시 파일을 스테이징할 Cloud Dataflow의 Cloud Storage 경로. 파이프라인을 실행하기 전에 먼저 이 버킷을 만들어야 합니다. gcpTempLocation을 지정하지 않는 경우, 파이프라인 옵션 tempLocation을 지정할 수 있습니다. 그러면 gcpTempLocationtempLocation 값으로 설정됩니다. 아무것도 지정하지 않으면 기본 gcpTempLocation이 생성됩니다.
  • stagingLocation - 바이너리 파일을 스테이징할 Cloud Dataflow의 Cloud Storage 버킷. 이 옵션을 설정하지 않으면 tempLocation에 지정한 내용이 스테이징 위치에서도 사용됩니다.
  • tempLocation을 지정하지 않으면 기본 gcpTempLocation이 생성됩니다. tempLocation은 지정되고 gcpTempLocation이 지정되지 않으면 tempLocation은 Cloud Storage 경로여야 하고 gcpTempLocation의 기본값이 됩니다. tempLocation이 지정되지 않고 gcpTempLocation이 지정되면 tempLocation은 채워지지 않습니다.

Python

  • job_name - 실행 중인 Cloud Dataflow 작업 이름
  • project - GCP 프로젝트 ID
  • runner - 프로그램을 파싱하고 파이프라인을 구성하는 파이프라인 실행기. 클라우드 실행의 경우에는 DataflowRunner여야 합니다.
  • staging_location - 작업을 실행하는 작업자에게 필요한 코드 패키지를 스테이징할 Cloud Dataflow의 Cloud Storage 경로
  • temp_location - 파이프라인 실행 중에 생성된 임시 작업 파일을 스테이징할 Cloud Dataflow의 Cloud Storage 경로

자바: SDK 1.x

  • project - GCP 프로젝트 ID
  • runner - 프로그램을 파싱하고 파이프라인을 구성하는 파이프라인 실행기. 클라우드 실행을 위해 DataflowPipelineRunner 또는 BlockingDataflowPipelineRunner 중 하나여야 합니다.
  • stagingLocation - 바이너리 및 임시 파일을 스테이징할 Cloud Dataflow의 Cloud Storage 버킷. 파이프라인을 실행하기 전에 먼저 이 버킷을 만들어야 합니다.

이러한 옵션을 프로그래매틱 방식으로 설정하거나 명령줄을 사용하여 지정할 수 있습니다. 다음 예제 코드는 Cloud Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하기 위해 실행기와 기타 필요 옵션을 프로그래매틱 방식으로 설정하여 파이프라인을 구성하는 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Create and set your PipelineOptions.
options = PipelineOptions(flags=argv)

# For Cloud execution, set the Cloud Platform project, job_name,
# staging location, temp_location and specify DataflowRunner.
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = 'my-project-id'
google_cloud_options.job_name = 'myjob'
google_cloud_options.staging_location = 'gs://my-bucket/binaries'
google_cloud_options.temp_location = 'gs://my-bucket/temp'
options.view_as(StandardOptions).runner = 'DataflowRunner'

# Create the Pipeline with the specified options.
p = Pipeline(options=options)

자바: SDK 1.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowPipelineRunner or BlockingDataflowPipelineRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowPipelineRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

다음 예제 코드는 명령줄을 사용하여 Cloud Dataflow 서비스를 실행하는 데 필요한 옵션 설정 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
known_args, pipeline_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=pipeline_args) as p:
  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)

자바: SDK 1.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().as(MyOptions.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

자바: SDK 2.x

명령줄에서 필수 옵션을 전달하는 경우, --project, --runner, --tempLocation, 선택사항으로 --stagingLocation 옵션을 사용합니다.

Python

명령줄에서 필수 옵션을 전달하는 경우, --project, --runner, --stagingLocation 옵션을 사용합니다.

자바: SDK 1.x

명령줄에서 필수 옵션을 전달하는 경우, --project, --runner, --stagingLocation 옵션을 사용합니다.

비동기 실행

자바: SDK 2.x

DataflowRunner를 사용하면 파이프라인이 Google 클라우드에서 비동기적으로 실행됩니다. 파이프라인이 실행되는 동안 Cloud Dataflow 모니터링 인터페이스 또는 Cloud Dataflow 명령줄 인터페이스를 사용하여 작업 진행 상태를 모니터링하고, 실행 세부정보를 확인하고, 파이프라인 결과 업데이트를 수신할 수 있습니다.

Python

DataflowRunner를 사용하면 파이프라인이 Google 클라우드에서 비동기적으로 실행됩니다. 파이프라인이 실행되는 동안 Cloud Dataflow 모니터링 인터페이스 또는 Cloud Dataflow 명령줄 인터페이스를 사용하여 작업 진행 상태를 모니터링하고, 실행 세부정보를 확인하고, 파이프라인 결과 업데이트를 수신할 수 있습니다.

자바: SDK 1.x

DataflowPipelineRunner를 사용하면 파이프라인이 Google 클라우드에서 비동기적으로 실행됩니다. 파이프라인이 실행되는 동안 Cloud Dataflow 모니터링 인터페이스 또는 Cloud Dataflow 명령줄 인터페이스를 사용하여 작업 진행 상태를 모니터링하고, 실행 세부정보를 확인하고, 파이프라인 결과 업데이트를 수신할 수 있습니다.

실행 차단

자바: SDK 2.x

파이프라인 실행기로 DataflowRunner를 지정하고 pipeline.run().waitUntilFinish()를 명시적으로 호출합니다.

DataflowRunner를 사용하고 pipeline.run()에서 반환한 PipelineResult 객체에서 waitUntilFinish()를 호출하면 파이프라인은 DataflowRunner와 같은 방식으로 클라우드에서 실행되지만 시작된 작업이 끝날 때까지 기다렸다가 최종 DataflowPipelineJob 객체를 반환합니다. 작업이 실행되는 동안 Cloud Dataflow 서비스는 대기 중에 작업 상태 업데이트와 콘솔 메시지를 인쇄합니다.

자바 SDK 1.x 사용자이고 명령줄에서 --runner BlockingDataflowPipelineRunner를 사용하여 파이프라인이 종료될 때까지 기본 프로그램이 차단되도록 대화식으로 유도한 경우, 기본 프로그램이 자바 2.x로 waitUntilFinish()를 명시적으로 호출해야 합니다.

Python

파이프라인이 완료될 때까지 차단하려면 실행기의 run() 메소드에서 반환된 PipelineResult 객체의 wait_until_finish() 메소드를 사용합니다.

자바: SDK 1.x

BlockingDataflowPipelineRunner를 파이프라인 실행기로 지정합니다.

BlockingDataflowPipelineRunner를 사용하는 경우, 파이프라인이 DataflowPipelineRunner와 같은 방식으로 클라우드에서 실행되지만 시작된 작업이 끝날 때까지 기다립니다. 작업이 실행되는 동안 Cloud Dataflow 서비스는 대기 중에 작업 상태 업데이트와 콘솔 메시지를 인쇄합니다. BlockingDataflowPipelineRunner를 사용하면 최종 DataflowPipelineJob 객체가 반환됩니다.

참고: 명령줄에서 Ctrl+C를 입력해도 작업이 취소되지 않습니다. Cloud Dataflow 서비스가 계속 GCP에서 작업을 실행합니다. 작업을 취소하려면 Cloud Dataflow 모니터링 인터페이스 또는 Cloud Dataflow 명령줄 인터페이스를 사용해야 합니다.

스트리밍 실행

자바: SDK 2.x

파이프라인이 제한되지 않은 데이터 소스(예: Cloud Pub/Sub)에서 읽는 경우, 파이프라인이 자동으로 스트리밍 모드에서 실행됩니다.

파이프라인이 제한되지 않은 데이터 소스와 싱크를 사용하는 경우 GroupByKey와 같은 집계를 사용하기 전에 unbounded PCollections에 기간 설정 전략을 선택해야 합니다.

Python

파이프라인이 제한되지 않은 데이터 소스나 싱크(예: Cloud Pub/Sub)를 사용하는 경우, streaming 옵션을 true로 설정해야 합니다.

스트리밍 작업은 기본적으로 n1-standard-2 이상의 Compute Engine 머신 유형을 사용합니다. n1-standard-2는 스트리밍 작업을 실행하는 데 필요한 최소 시스템 유형이므로, 이를 재정의해서는 안 됩니다.

파이프라인이 제한되지 않은 데이터 소스와 싱크를 사용하는 경우 GroupByKey와 같은 집계를 사용하기 전에 unbounded PCollections에 기간 설정 전략을 선택해야 합니다.

자바: SDK 1.x

파이프라인이 제한되지 않은 데이터 소스나 싱크(예: Cloud Pub/Sub)를 사용하는 경우 PipelineOptionsstreaming 옵션을 true로 설정해야 합니다. 다음 예와 같이 streaming 옵션을 프로그래매틱 방식으로 설정할 수 있습니다.

  DataflowPipelineOptions dataflowOptions = options.as(DataflowPipelineOptions.class);
  dataflowOptions.setStreaming(true);

파이프라인이 제한되지 않은 데이터 소스와 싱크를 사용하는 경우 GroupByKey와 같은 집계를 사용하기 전에 unbounded PCollections기간 설정 전략을 선택해야 합니다.

다른 클라우드 파이프라인 옵션 설정

PipelineOptions 객체에서 다음 필드를 프로그래매틱 방식으로 설정하여 클라우드에서 파이프라인을 실행할 수 있습니다.

자바: SDK 2.x

필드 유형 설명 기본값
runner Class (NameOfRunner) 사용할 PipelineRunner입니다. 이 필드를 사용하면 런타임에 PipelineRunner를 결정할 수 있습니다. DirectRunner(로컬 모드)
streaming boolean 스트리밍 모드를 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. 파이프라인이 제한되지 않은 소스에서 읽는 경우 기본값은 true입니다. 그 이외의 경우는 false입니다.
project String GCP 프로젝트의 프로젝트 ID입니다. Cloud Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려는 경우에 필요합니다. 설정하지 않으면 Cloud SDK에서 현재 구성된 프로젝트가 기본값입니다.
gcpTempLocation String 임시 파일의 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다.
stagingLocation String 로컬 파일을 스테이징할 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 tempLocation에 지정한 값이 기본값입니다.
autoscalingAlgorithm String Cloud Dataflow 작업에 사용할 자동 확장 모드입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다. Cloud Dataflow 관리형 서비스에서 자동 확장의 작동 방식에 대해서는 자동 확장 기능을 참조하세요. 자바용 Cloud Dataflow SDK 버전 1.6.0 이상을 사용하는 모든 배치 Cloud Dataflow 작업에 대해서는 THROUGHPUT_BASED를 기본값으로 사용합니다. 이전 버전의 자바용 Cloud Dataflow SDK를 사용하는 스트리밍 작업 또는 일괄 작업에 대해서는 NONE을 기본값으로 사용합니다.
numWorkers int 파이프라인 실행 시 사용할 초기 Google Compute Engine 인스턴스의 수입니다. 이 옵션은 작업 시작 시 Dataflow 서비스가 시작하는 작업자 수를 결정합니다. 지정하지 않은 경우, Cloud Dataflow 서비스는 적절한 작업자 수를 결정합니다.
maxNumWorkers int 실행 중에 파이프라인을 사용할 수 있는 Compute Engine 인스턴스의 최대 수입니다. 이 수는 초기 작업자 수보다 높을 수 있습니다(작업이 자동으로, 또는 다른 식으로 확장되도록 numWorkers에서 지정됨). 지정하지 않은 경우, Cloud Dataflow 서비스가 적절한 작업자 수를 결정합니다.
region String 영역 엔드포인트를 지정하면 Cloud Dataflow 작업을 배포하기 위한 영역을 정의할 수 있습니다. 설정하지 않으면 us-central1이 기본값입니다.
zone String 파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 영역(zone)입니다. region 매개변수를 지정한 경우 zone 매개변수가 해당 리전의 영역(zone)으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
flexRSGoal String 자동 확장 배치 작업에 가변형 리소스 예약(FlexRS)을 지정합니다. numWorkers, autoscalingAlgorithm, zone, region, workerMachineType 매개변수에 영향을 미칩니다. 자세한 내용은 FlexRS 파이프라인 옵션 섹션을 참조하세요. 지정하지 않으면 SPEED_OPTIMIZED이 기본값이며, 이 플래그를 생략하는 것과 같습니다. FlexRS를 사용 설정하려면 Cloud Dataflow 서비스가 사용 가능한 할인된 리소스를 선택할 수 있도록 COST_OPTIMIZED 값을 지정해야 합니다.
filesToStage List<String> 각 작업자가 사용할 수 있는 로컬 파일, 파일 디렉터리 또는 보관 파일(.jar, .zip) 목록입니다. 이 옵션을 설정하면 사용자가 지정하는 해당 파일만 업로드됩니다(자바 클래스 경로는 무시됨). 모든 리소스를 올바른 클래스 경로 순서로 지정해야 합니다. 리소스는 코드로 제한되지 않으며 모든 작업자가 사용할 수 있는 구성 파일과 다른 리소스도 포함할 수 있습니다. 코드는 자바의 표준 리소스 조회 메소드를 사용하여 나열된 리소스에 액세스할 수 있습니다. 주의: Cloud Dataflow는 업로드 전에 파일을 압축하여 시작 시간이 길어지므로, 디렉토리 경로를 지정하는 것은 최적이 아닙니다. 또한 이 옵션을 사용하여 파이프라인이 처리할 대상 작업자에게 데이터를 전송하지 마세요. 그렇지 않으면 기본 Cloud Storage/BigQuery API를 적절한 Cloud Dataflow 데이터 소스와 결합하여 사용할 때보다 속도가 크게 느려집니다. filesToStage가 비어 있으면 Cloud Dataflow가 자바 클래스 경로를 기반으로 준비할 파일을 유추합니다. 왼쪽 열에 언급된 고려 사항과 주의 사항은 여기에도 적용됩니다(나열할 파일 유형 및 코드에서 여기에 액세스할 방법).
network String 파이프라인을 실행할 Compute Engine 인스턴스를 시작하기 위한 Compute Engine 네트워크입니다. 네트워크 지정 방법을 참조하세요. 설정하지 않은 경우, GCP는 default라는 네트워크를 사용한다고 가정합니다.
subnetwork String 파이프라인을 실행하기 위해 Compute Engine 인스턴스를 시작하는 Compute Engine 서브네트워크입니다. 서브네트워크 지정 방법을 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
usePublicIps boolean Cloud Dataflow 작업자가 공개 IP 주소를 사용할 지 여부를 지정합니다. 값을 false로 설정하면 Cloud Dataflow 작업자는 모든 통신에 비공개 IP 주소를 사용합니다. 이 경우, subnetwork 옵션을 지정하면 network 옵션이 무시됩니다. 지정된 network 또는 subnetwork비공개 Google 액세스가 사용 설정되어 있는지 확인합니다. 설정하지 않은 경우, 기본값은 true이고 Cloud Dataflow 작업자는 공개 IP 주소를 사용합니다.
diskSizeGb int 각 원격 Compute Engine 작업자 인스턴스에서 사용할 디스크 크기(GB)입니다. 최소 디스크 크기는 30GB이며, 이는 작업자 부팅 이미지와 로컬 로그를 처리하기 위함입니다. 파이프라인이 데이터를 무작위로 섞는 경우 최솟값보다 큰 값을 할당해야 합니다.

Cloud Platform 프로젝트에서 정의된 기본 크기를 사용하려면 0으로 설정합니다.

경고: 디스크 크기를 줄이면 사용 가능한 셔플 I/O가 감소합니다. 무작위 섞기 결합 작업으로 인해 런타임과 작업 비용이 상승할 수 있습니다.

serviceAccount String my-service-account-name@<project-id>.iam.gserviceaccount.com 형식을 사용하여 사용자 관리형 컨트롤러 서비스 계정을 지정합니다. 자세한 내용은 Cloud Dataflow 보안 및 권한 페이지의 컨트롤러 서비스 계정 섹션을 참조하세요. 설정하지 않으면 작업자가 프로젝트의 Compute Engine 서비스 계정을 컨트롤러 서비스 계정으로 사용합니다.
workerDiskType String 사용할 영구 디스크 유형입니다(디스크 유형 리소스의 전체 URL로 지정). 예를 들어 SSD 영구 디스크를 지정하려면 compute.googleapis.com/projects//zones//diskTypes/pd-ssd를 사용합니다. 자세한 내용은 Compute Engine API 참조 페이지의 diskTypes를 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
workerMachineType String 작업자 VM이 회전할 때 Cloud Dataflow가 사용할 Compute Engine 머신 유형입니다. Cloud Dataflow는 n1 시리즈와 커스텀 머신 유형을 지원합니다. f1g1 시리즈 작업자와 같은 공유 코어 머신 유형은 Cloud Dataflow의 서비스수준계약에서 지원되지 않습니다. 이 옵션을 설정하지 않으면 Cloud Dataflow 서비스는 작업을 기반으로 머신 유형을 선택합니다.
enableStreamingEngine boolean Cloud Dataflow 스트리밍 엔진을 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. 스트리밍 엔진을 사용 설정하면 Cloud Dataflow 서비스 백엔드에서 스트리밍 파이프라인의 단계를 실행할 수 있으므로 CPU, 메모리, Persistent Disk 스토리지 리소스가 보존됩니다. 기본값은 false입니다. 즉, 스트리밍 파이프라인의 단계가 작업자 VM에서 완전히 실행됩니다.

파이프라인 구성 옵션 전체 목록은 PipelineOptions 인터페이스(및 하위 인터페이스)에 대한 자바용 API 참조 문서를 참조하세요.

Python

필드 유형 설명 기본값
runner str 사용할 PipelineRunner입니다. 이 필드는 DirectRunner 또는 DataflowRunner일 수 있습니다. DirectRunner(로컬 모드)
streaming bool 스트리밍 모드를 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. false
project str GCP 프로젝트의 프로젝트 ID입니다. Cloud Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려는 경우에 필요합니다. 설정하지 않으면 오류가 발생합니다.
temp_location str 임시 파일의 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 staging_location에 대한 값이 기본값입니다. Google 클라우드에서 파이프라인을 실행할 temp_location 또는 staging_location을 최소 한 개 이상 지정해야 합니다.
staging_location str 로컬 파일을 스테이징할 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 temp_location 내의 준비 디렉토리가 기본값입니다. Google 클라우드에서 파이프라인을 실행할 temp_location 또는 staging_location을 최소 한 개 이상 지정해야 합니다.
autoscaling_algorithm str Cloud Dataflow 작업에 사용할 자동 확장 모드입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다. 자동 확장 기능을 참조하여 Cloud Dataflow 관리형 서비스에서 자동 확장의 작동 방식을 알아보세요. 모든 일괄 Cloud Dataflow 작업에 대해 기본값은 THROUGHPUT_BASED입니다.
num_workers int 파이프라인 실행 시 사용할 Compute Engine 인스턴스 수입니다. 지정하지 않은 경우, Cloud Dataflow 서비스가 적절한 작업자 수를 결정합니다.
max_num_workers int 실행 중에 파이프라인을 사용할 수 있는 Compute Engine 인스턴스의 최대 수입니다. 이 수는 초기 작업자 수보다 높을 수 있습니다(작업이 자동으로, 또는 다른 식으로 확장되도록 num_workers에서 지정됨). 지정하지 않은 경우, Cloud Dataflow 서비스가 적절한 작업자 수를 결정합니다.
region str 영역 엔드포인트를 지정하면 Cloud Dataflow 작업을 배포하기 위한 영역을 정의할 수 있습니다. 설정하지 않으면 us-central1이 기본값입니다.
zone str 파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 영역(zone)입니다. region 매개변수를 지정한 경우 zone 매개변수가 해당 리전의 영역(zone)으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
flexrs_goal str 자동 확장 배치 작업에 가변형 리소스 예약(FlexRS)을 지정합니다. num_workers, autoscaling_algorithm, zone, region, machine_type 매개변수에 영향을 미칩니다. 자세한 내용은 FlexRS 파이프라인 옵션 섹션을 참조하세요. 지정하지 않으면 SPEED_OPTIMIZED이 기본값이며, 이 플래그를 생략하는 것과 같습니다. FlexRS를 사용 설정하려면 Cloud Dataflow 서비스가 사용 가능한 할인된 리소스를 선택할 수 있도록 COST_OPTIMIZED 값을 지정해야 합니다.
network str 파이프라인을 실행할 Compute Engine 인스턴스를 시작하기 위한 Compute Engine 네트워크입니다. 네트워크 지정 방법을 참조하세요. 설정하지 않은 경우, GCP는 default라는 네트워크를 사용한다고 가정합니다.
subnetwork str 파이프라인을 실행하기 위해 Compute Engine 인스턴스를 시작하는 Compute Engine 서브네트워크입니다. 서브네트워크 지정 방법을 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
no_use_public_ips bool Cloud Dataflow 작업자가 공개 IP 주소를 사용하지 않도록 지정합니다. 따라서 Cloud Dataflow 작업자는 모든 통신에 비공개 IP 주소를 사용합니다. subnetwork 옵션을 지정하면 network 옵션이 무시됩니다. 지정된 network 또는 subnetwork비공개 Google 액세스가 사용 설정되어 있는지 확인합니다. 공개 IP 매개변수를 사용하려면 Python용 Beam SDK가 필요합니다. Python용 Cloud Dataflow SDK는 이 매개변수를 지원하지 않습니다. 설정하지 않으면 Cloud Dataflow 작업자가 공개 IP 주소를 사용합니다.
disk_size_gb int 각 원격 Compute Engine 작업자 인스턴스에서 사용할 디스크 크기(GB)입니다. 최소 디스크 크기는 30GB이며, 이는 작업자 부팅 이미지와 로컬 로그를 처리하기 위함입니다. 파이프라인이 데이터를 무작위로 섞는 경우 최솟값보다 큰 값을 할당해야 합니다. GCP 프로젝트에서 정의된 기본 크기를 사용하려면 0으로 설정합니다.
service_account_email str my-service-account-name@<project-id>.iam.gserviceaccount.com 형식을 사용하여 사용자 관리형 컨트롤러 서비스 계정을 지정합니다. 자세한 내용은 Cloud Dataflow 보안 및 권한 페이지의 컨트롤러 서비스 계정 섹션을 참조하세요. 설정하지 않으면 작업자가 프로젝트의 Compute Engine 서비스 계정을 컨트롤러 서비스 계정으로 사용합니다.
worker_disk_type str 사용할 영구 디스크 유형입니다(디스크 유형 리소스의 전체 URL로 지정). 예를 들어 SSD 영구 디스크를 지정하려면 compute.googleapis.com/projects//zones//diskTypes/pd-ssd를 사용합니다. 자세한 내용은 Compute Engine API 참조 페이지의 diskTypes를 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
machine_type str 작업자 VM이 회전할 때 Cloud Dataflow가 사용할 Compute Engine 머신 유형입니다. worker_machine_type는 별칭 플래그입니다. 이 옵션을 설정하지 않으면 Cloud Dataflow 서비스는 작업을 기반으로 머신 유형을 선택합니다.

자바: SDK 1.x

필드 유형 설명 기본값
runner Class (NameOfRunner) 사용할 PipelineRunner입니다. 이 필드를 사용하면 런타임에 PipelineRunner를 결정할 수 있습니다. DirectPipelineRunner(로컬 모드)
streaming boolean 스트리밍 모드(현재 베타 버전)를 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. false
project String GCP 프로젝트의 프로젝트 ID입니다. Cloud Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려는 경우에 필요합니다. 설정하지 않으면 Cloud SDK에서 현재 구성된 프로젝트가 기본값입니다.
tempLocation String 임시 파일의 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 stagingLocation에 대한 값이 기본값입니다. Google 클라우드에서 파이프라인을 실행할 tempLocation 또는 stagingLocation을 최소 한 개 이상 지정해야 합니다.
stagingLocation String 로컬 파일을 스테이징할 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 tempLocation 내의 준비 디렉토리가 기본값입니다. Google 클라우드에서 파이프라인을 실행할 tempLocation 또는 stagingLocation을 최소 한 개 이상 지정해야 합니다.
autoscalingAlgorithm String Cloud Dataflow 작업에 사용할 자동 확장 모드입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다. 자동 확장 기능을 참조하여 Cloud Dataflow 관리형 서비스에서 자동 확장의 작동 방식을 알아보세요. 자바용 Cloud Dataflow SDK 버전 1.6.0 이상을 사용하는 모든 배치 Cloud Dataflow 작업에 대해서는 THROUGHPUT_BASED를 기본값으로 사용합니다. 이전 버전의 자바용 Cloud Dataflow SDK를 사용하는 스트리밍 작업 또는 일괄 작업에 대해서는 NONE을 기본값으로 사용합니다.
numWorkers int 파이프라인 실행 시 사용할 초기 Compute Engine 인스턴스 수입니다. 이 옵션은 작업 시작 시 Cloud Dataflow 서비스가 시작하는 작업자 수를 결정합니다. 지정하지 않은 경우, Cloud Dataflow 서비스가 적절한 작업자 수를 결정합니다.
maxNumWorkers int 실행 중에 파이프라인을 사용할 수 있는 Compute Engine 인스턴스의 최대 수입니다. 이 수는 초기 작업자 수보다 높을 수 있습니다(작업이 자동으로, 또는 다른 식으로 확장되도록 numWorkers에서 지정됨). 지정하지 않은 경우, Cloud Dataflow 서비스가 적절한 작업자 수를 결정합니다.
zone String 파이프라인을 실행할 작업자 인스턴스를 시작하기 위한 Compute Engine 영역(zone)입니다. 설정하지 않으면 us-central1-f가 기본값입니다. 예를 들어 작업자가 EU에서 시작되도록 다른 영역(zone)을 지정할 수 있습니다.
filesToStage List<String> 각 작업자가 사용할 수 있는 로컬 파일, 파일 디렉토리 또는 보관 파일(.jar, .zip) 목록입니다. 이 옵션을 설정하면 사용자가 지정하는 해당 파일만 업로드됩니다(자바 클래스 경로는 무시됨). 모든 리소스를 올바른 클래스 경로 순서로 지정해야 합니다. 리소스는 코드로 제한되지 않으며 모든 작업자가 사용할 수 있는 구성 파일과 다른 리소스도 포함할 수 있습니다. 코드는 자바의 표준 리소스 조회 메소드를 사용하여 나열된 리소스에 액세스할 수 있습니다. 주의: Cloud Dataflow는 업로드 전에 파일을 압축하여 시작 시간이 길어지므로, 디렉토리 경로를 지정하는 것은 최적이 아닙니다. 또한 이 옵션을 사용하여 파이프라인이 처리할 대상 작업자에게 데이터를 전송하지 마세요. 그렇지 않으면 기본 Cloud Storage/BigQuery API를 적절한 Cloud Dataflow 데이터 소스와 결합하여 사용할 때보다 속도가 크게 느려집니다. filesToStage가 비어 있으면 Cloud Dataflow가 자바 클래스 경로를 기반으로 준비할 파일을 유추합니다. 왼쪽 열에 언급된 고려 사항과 주의 사항은 여기에도 적용됩니다(나열할 파일 유형 및 코드에서 여기에 액세스할 방법).
network String 파이프라인을 실행할 Compute Engine 인스턴스를 시작하기 위한 Compute Engine 네트워크입니다. 네트워크 지정 방법을 참조하세요. 설정하지 않은 경우, GCP는 default라는 네트워크를 사용한다고 가정합니다.
subnetwork String 파이프라인을 실행하기 위해 Compute Engine 인스턴스를 시작하는 Compute Engine 서브네트워크입니다. 서브네트워크 지정 방법을 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
diskSizeGb int 각 원격 Compute Engine 작업자 인스턴스에서 사용할 디스크 크기(GB)입니다. 최소 디스크 크기는 30GB이며, 이는 작업자 부팅 이미지와 로컬 로그를 처리하기 위함입니다. 파이프라인이 데이터를 무작위로 섞는 경우 최솟값보다 큰 값을 할당해야 합니다.

GCP 프로젝트에서 정의된 기본 크기를 사용하려면 0으로 설정합니다.

경고: 디스크 크기를 줄이면 사용 가능한 셔플 I/O가 감소합니다. 무작위 섞기 결합 작업으로 인해 런타임과 작업 비용이 상승할 수 있습니다.

workerDiskType String 사용할 영구 디스크 유형입니다(디스크 유형 리소스의 전체 URL로 지정). 예를 들어 SSD 영구 디스크를 지정하려면 compute.googleapis.com/projects//zones//diskTypes/pd-ssd를 사용합니다. 자세한 내용은 Compute Engine API 참조 페이지의 diskTypes를 참조하세요. Cloud Dataflow 서비스가 기본값을 결정합니다.
workerMachineType String 작업자 VM이 회전할 때 Cloud Dataflow가 사용할 Compute Engine 머신 유형입니다. Cloud Dataflow는 n1 시리즈와 커스텀 머신 유형을 지원합니다. f1g1 시리즈 작업자와 같은 공유 코어 머신 유형은 Cloud Dataflow의 서비스수준계약에서 지원되지 않습니다. 이 옵션을 설정하지 않으면 Cloud Dataflow 서비스는 작업을 기반으로 머신 유형을 선택합니다.

파이프라인 구성 옵션 전체 목록은 PipelineOptions 인터페이스(및 하위 인터페이스)에 대한 자바용 API 참조 문서를 참조하세요.

로컬 실행을 위한 PipelineOptions 구성

관리형 클라우드 리소스에서 파이프라인을 실행하는 대신 파이프라인을 로컬에서 실행할 수 있습니다. 로컬 실행은 작은 데이터 세트에 대해 파이프라인을 테스트, 디버깅 또는 실행하는 데 몇 가지 장점을 제공합니다. 예를 들어 로컬 실행은 원격 Cloud Dataflow 서비스와 관련된 GCP 프로젝트의 종속 항목을 삭제합니다.

로컬 실행을 사용할 때는 로컬 메모리에 저장할 수 있을 만큼 작은 데이터세트로 파이프라인을 실행하는 것이 좋습니다. Create 변환을 사용하여 작은 메모리에 데이터 세트를 만들거나 Read 변환을 사용하여 작은 로컬 또는 원격 파일로 작업할 수 있습니다. 로컬 실행은 외부 종속성을 줄이면서 테스트와 디버깅을 빠르고 간편하게 수행할 수 있는 방법이지만 로컬 환경에서 사용할 수 있는 메모리로 제한됩니다.

다음 예제 코드는 로컬 환경에서 실행되는 파이프라인을 구성하는 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

참고: 로컬 모드의 경우 DirectRunner가 이미 기본값이므로 실행기를 설정할 필요가 없습니다. 그러나, DirectRunner를 종속성으로 명시적으로 포함하거나 이를 클래스 경로에 추가해야 합니다.

Python

# Create and set your Pipeline Options.
options = PipelineOptions()
p = Pipeline(options=options)

참고: 로컬 모드의 경우 DirectRunner가 이미 기본값이므로 실행기를 설정할 필요가 없습니다.

자바: SDK 1.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

참고: 로컬 모드의 경우 DirectPipelineRunner가 이미 기본값이므로 실행기를 설정할 필요가 없습니다.

파이프라인을 구성한 후에는 실행합니다.

다른 로컬 파이프라인 옵션 설정

파이프라인을 로컬에서 실행하는 경우, 일반적으로 PipelineOptions 속성의 기본값이면 충분합니다.

자바: SDK 2.x

자바 API 참조에서 자바 PipelineOptions의 기본값을 확인할 수 있습니다. 자세한 내용은 PipelineOptions 클래스 목록을 참조하세요.

파이프라인이 BigQuery 또는 IO용 Cloud Storage와 같은 GCP를 사용하는 경우, 특정 GCP 프로젝트와 사용자 인증 정보 옵션을 설정해야 할 수 있습니다. 이러한 경우, GcpOptions.setProject를 사용하여 Google Cloud 프로젝트 ID를 설정해야 합니다. 사용자 인증 정보를 명시적으로 설정해야 할 수도 있습니다. 자세한 내용은 GcpOptions 클래스를 참조하세요.

Python

Python API 참조에서 Python PipelineOptions의 기본값을 확인할 수 있습니다. 자세한 내용은 PipelineOptions 모듈 목록을 참조하세요.

파이프라인이 BigQuery 또는 IO용 Cloud Storage와 같은 GCP 서비스를 사용하는 경우에는 특정 GCP 프로젝트와 사용자 인증 정보 옵션을 설정해야 할 수도 있습니다. 이러한 경우, options.view_as(GoogleCloudOptions).project를 사용하여 Google Cloud 프로젝트 ID를 설정해야 합니다. 사용자 인증 정보를 명시적으로 설정해야 할 수도 있습니다. 자세한 내용은 GoogleCloudOptions 클래스를 참조하세요.

자바: SDK 1.x

자바 API 참조에서 자바 PipelineOptions의 기본값을 확인할 수 있습니다. 자세한 내용은 PipelineOptions 클래스 목록을 참조하세요.

파이프라인이 BigQuery 또는 IO용 Cloud Storage와 같은 GCP 서비스를 사용하는 경우에는 특정 GCP 프로젝트와 사용자 인증 정보 옵션을 설정해야 할 수도 있습니다. 이러한 경우, GcpOptions.setProject를 사용하여 Google Cloud 프로젝트 ID를 설정해야 합니다. 사용자 인증 정보를 명시적으로 설정해야 할 수도 있습니다. 자세한 내용은 GcpOptions 클래스를 참조하세요.

이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.