파이프라인 실행 매개변수 지정

Apache Beam 프로그램이 파이프라인을 구성하면 사용자는 파이프라인을 실행해야 합니다. 파이프라인 실행은 Apache Beam 프로그램 실행과는 별개입니다. Apache Beam 프로그램에서 파이프라인이 생성되며, 작성한 코드를 통해 파이프라인 실행기에서 실행할 일련의 단계가 생성됩니다. 파이프라인 실행기는 Google Cloud, 타사 실행기 서비스 또는 로컬 환경에서 단계를 직접 실행하는 로컬 파이프라인 실행기의 Dataflow 관리형 서비스일 수 있습니다.

Apache Beam SDK 클래스 PipelineOptions를 사용하여 파이프라인 실행기와 기타 실행 옵션을 지정할 수 있습니다. PipelineOptions를 사용하여 파이프라인이 실행되는 방식과 위치, 사용되는 리소스를 구성합니다.

대부분의 경우 Dataflow 실행기 서비스를 사용하여 파이프라인을 관리형 Google Cloud 리소스에서 실행해야 합니다. Dataflow 서비스로 파이프라인을 실행하면 Google Cloud 프로젝트에서 Compute Engine 및 Cloud Storage 리소스를 사용하는 Dataflow 작업이 생성됩니다.

또한 파이프라인을 로컬로 실행할 수 있습니다. 파이프라인을 로컬로 실행하면 파이프라인 변환이 Dataflow 프로그램이 실행되는 동일한 머신에서 실행됩니다. 특히 파이프라인이 적은 양의 메모리 내 데이터세트를 사용하는 경우, 로컬 실행은 테스트와 디버깅용으로 유용합니다.

PipelineOptions 설정

Dataflow 프로그램에서 Pipeline 객체 생성 시 PipelineOptions를 전달합니다. Dataflow 서비스가 파이프라인을 실행하면 각 작업자 인스턴스로 PipelineOptions 복사본을 전송합니다.

자바: SDK 2.x

참고: ProcessContext.getPipelineOptions 메서드를 사용하여 ParDoDoFn 인스턴스에 있는 PipelineOptions에 액세스할 수 있습니다.

Python

이 기능은 아직 Python용 Apache Beam SDK에서 지원되지 않습니다.

자바: SDK 1.x

명령줄 인수에서 PipelineOptions 설정

PipelineOptions 객체를 만들고 필드를 직접 설정하여 파이프라인을 구성할 수 있지만 Apache Beam SDK에는 명령줄 인수를 사용하여 PipelineOptions 필드를 설정하는 데 사용할 수 있는 명령줄 파서가 포함되어 있습니다.

자바: SDK 2.x

명령줄에서 옵션을 읽으려면 다음 예시 코드처럼 PipelineOptionsFactory.fromArgs 메서드를 사용하여 PipelineOptions 객체를 구성합니다.

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

참고: .withValidation 메서드를 추가하면 Dataflow가 필수 명령줄 인수를 확인하고 인수 값 유효성을 검사합니다.

PipelineOptionsFactory.fromArgs를 사용하면 다음 형식을 따르는 명령줄 인수가 해석됩니다.

--<option>=<value>

이러한 방식으로 PipelineOptions를 빌드하면 org.apache.beam.sdk.options.PipelineOptions의 모든 하위 인터페이스에 있는 옵션을 명령줄 인수로 지정할 수 있습니다.

Python

명령줄에서 옵션을 읽으려면 다음 예시 코드처럼 PipelineOptions 객체를 구성합니다.

from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(flags=argv)

인수 (flags=argv)부터 PipelineOptions까지는 다음 형식을 따르는 명령줄 인수를 해석합니다.

--<option>=<value>

이러한 방식으로 PipelineOptions를 빌드하면 PipelineOptions에서 서브클래스화하여 옵션을 지정할 수 있습니다.

자바: SDK 1.x

커스텀 옵션 만들기

표준 PipelineOptions 외에도 자체 커스텀 옵션을 추가할 수 있습니다. Dataflow의 명령줄 파서도 동일한 형식으로 지정된 명령줄 인수를 사용하여 커스텀 옵션을 설정할 수 있습니다.

자바: SDK 2.x

자체 옵션을 추가하려면 다음 예처럼 각 옵션에 대해 getter와 setter 메소드를 사용하여 인터페이스를 정의합니다.

  public interface MyOptions extends PipelineOptions {
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

Python

자체 옵션을 추가하려면 다음 예시와 같이 Python의 표준 argparse 모듈과 완전히 동일하게 작동하는 add_argument() 메서드를 사용합니다.

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

자바: SDK 1.x

사용자가 명령줄 인수로 --help를 전달하면 나타나는 설명과 기본값을 지정할 수도 있습니다.

자바: SDK 2.x

다음과 같이 주석을 사용하여 설명과 기본값을 설정합니다.

  public interface MyOptions extends PipelineOptions {
    @Description("My custom command line argument.")
    @Default.String("DEFAULT")
    String getMyCustomOption();
    void setMyCustomOption(String myCustomOption);
  }

PipelineOptionsFactory로 인터페이스를 등록한 후 PipelineOptions 객체를 만들 때 인터페이스를 전달하는 것이 좋습니다. PipelineOptionsFactory로 인터페이스를 등록하면 --help는 커스텀 옵션 인터페이스를 찾고 --help 명령어 출력에 추가할 수 있습니다. 또한 PipelineOptionsFactory는 커스텀 옵션이 등록된 모든 옵션과 호환되는지 확인합니다.

다음 예시에서는 PipelineOptionsFactory로 커스텀 옵션 인터페이스를 등록하는 방법을 보여줍니다.

  PipelineOptionsFactory.register(MyOptions.class);
  MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                            .withValidation()
                                            .as(MyOptions.class);

이제 파이프라인이 --myCustomOption=value를 명령줄 인수로 허용할 수 있습니다.

Python

다음과 같이 설명과 기본값을 설정합니다.

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        help='Input for the pipeline',
        default='gs://my-bucket/input')
    parser.add_argument(
        '--output',
        help='Output for the pipeline',
        default='gs://my-bucket/output')

자바: SDK 1.x

Cloud Dataflow 서비스에서 실행되도록 PipelineOptions 구성

Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려면 PipelineOptions에서 다음 입력란을 설정해야 합니다.

자바: SDK 2.x

  • project - Google Cloud 프로젝트의 ID입니다.
  • runner - 프로그램을 파싱하고 파이프라인을 구성하는 파이프라인 실행기입니다. 클라우드 실행의 경우에는 DataflowRunner여야 합니다.
  • gcpTempLocation - 임시 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다. 파이프라인을 실행하기 전에 먼저 이 버킷을 만들어야 합니다. gcpTempLocation을 지정하지 않은 경우 파이프라인 옵션 tempLocation을 지정해도 됩니다. 그러면 gcpTempLocationtempLocation 값으로 설정됩니다. 둘 다 지정하지 않으면 기본 gcpTempLocation이 생성됩니다.
  • stagingLocation - 바이너리 파일을 스테이징하기 위한 Dataflow의 Cloud Storage 버킷입니다. 이 옵션을 설정하지 않을 경우 tempLocation에 지정한 값이 스테이징 위치에서도 사용됩니다.
  • tempLocation도 지정하지 않은 경우 gcpTempLocation이 생성됩니다. tempLocation은 지정되고 gcpTempLocation이 지정되지 않으면 tempLocation은 Cloud Storage 경로여야 하고 gcpTempLocation의 기본값이 됩니다. tempLocation이 지정되지 않고 gcpTempLocation이 지정되면 tempLocation은 채워지지 않습니다.

참고: 자바 2.15.0 이상의 Apache Beam SDK를 사용하는 경우 region을 지정해야 합니다.

Python

  • project - Google Cloud 프로젝트의 ID입니다.
  • runner - 프로그램을 파싱하고 파이프라인을 구성하는 파이프라인 실행기입니다. 클라우드 실행의 경우에는 DataflowRunner여야 합니다.
  • staging_location - 작업을 실행하는 작업자에게 필요한 코드 패키지를 스테이징할 Dataflow의 Cloud Storage 경로입니다.
  • temp_location - 파이프라인 실행 중에 생성된 임시 작업 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다.

참고: Python 2.15.0 이상의 Apache Beam SDK를 사용하는 경우 region을 지정해야 합니다.

자바: SDK 1.x

이러한 옵션을 프로그래매틱 방식으로 설정하거나 명령줄을 사용하여 지정할 수 있습니다. 다음 예제 코드는 Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하기 위해 실행기 및 기타 필요한 옵션을 프로그래매틱 방식으로 설정하여 파이프라인을 구성하는 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set your PipelineOptions.
  DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

  // For Cloud execution, set the Cloud Platform project, staging location,
  // and specify DataflowRunner.
  options.setProject("my-project-id");
  options.setStagingLocation("gs://my-bucket/binaries");
  options.setRunner(DataflowRunner.class);

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
options = PipelineOptions(
    flags=argv,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')

# Create the Pipeline with the specified options.
# with beam.Pipeline(options=options) as pipeline:
#   pass  # build your pipeline here.

자바: SDK 1.x

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

다음 예제 코드에서는 명령줄을 사용하여 Dataflow 서비스를 실행하는 데 필요한 옵션을 설정하는 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set your PipelineOptions.
  MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

Python

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam

parser = argparse.ArgumentParser()
parser.add_argument('--input')
parser.add_argument('--output')
args, beam_args = parser.parse_known_args(argv)

# Create the Pipeline with remaining arguments.
with beam.Pipeline(argv=beam_args) as pipeline:
  lines = pipeline | 'Read files' >> beam.io.ReadFromText(args.input)
  lines | 'Write files' >> beam.io.WriteToText(args.output)

자바: SDK 1.x

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

자바: SDK 2.x

명령줄에 필수 옵션을 전달할 때 --project, --runner, --gcpTempLocation, --stagingLocation(선택사항) 옵션을 사용합니다.

Python

명령줄에 필수 옵션을 전달할 때 --project, --runner, --staging_location 옵션을 사용합니다.

자바: SDK 1.x

비동기 실행

자바: SDK 2.x

DataflowRunner를 사용하면 파이프라인이 Google 클라우드에서 비동기적으로 실행됩니다. 파이프라인이 실행되는 동안 Dataflow 모니터링 인터페이스 또는 Dataflow 명령줄 인터페이스를 사용하여 작업 진행 상태를 모니터링하고 실행 세부정보를 확인하고 파이프라인 결과 업데이트를 수신할 수 있습니다.

Python

DataflowRunner를 사용하면 파이프라인이 Google 클라우드에서 비동기적으로 실행됩니다. 파이프라인이 실행되는 동안 Dataflow 모니터링 인터페이스 또는 Dataflow 명령줄 인터페이스를 사용하여 작업 진행 상태를 모니터링하고 실행 세부정보를 확인하고 파이프라인 결과 업데이트를 수신할 수 있습니다.

자바: SDK 1.x

동기 실행

자바: SDK 2.x

파이프라인 실행기로 DataflowRunner를 지정하고 pipeline.run().waitUntilFinish()를 명시적으로 호출합니다.

DataflowRunner를 사용하고 pipeline.run()에서 반환한 PipelineResult 객체에서 waitUntilFinish()를 호출하면 파이프라인은 클라우드에서 실행되지만 로컬 코드가 클라우드 작업이 끝날 때까지 기다렸다가 최종 DataflowPipelineJob 객체를 반환합니다. 작업이 실행되는 동안 Dataflow 서비스는 대기 중에 작업 상태 업데이트와 콘솔 메시지를 인쇄합니다.

자바 SDK 1.x 사용자가 명령줄에서 --runner BlockingDataflowPipelineRunner를 사용하여 파이프라인이 종료될 때까지 기본 프로그램을 차단하도록 대화식으로 유도한 경우 기본 프로그램이 자바 2.x로 waitUntilFinish()를 명시적으로 호출해야 합니다.

Python

파이프라인이 완료될 때까지 차단하려면 실행기의 run() 메서드에서 반환된 PipelineResult 객체의 wait_until_finish() 메서드를 사용합니다.

자바: SDK 1.x

참고: 명령줄에서 Ctrl+C를 입력해도 작업이 취소되지 않습니다. Dataflow 서비스가 계속 Google Cloud에서 작업을 실행합니다. 작업을 취소하려면 Dataflow 모니터링 인터페이스 또는 Dataflow 명령줄 인터페이스를 사용해야 합니다.

스트리밍 실행

자바: SDK 2.x

파이프라인이 제한되지 않은 데이터 소스(예: Pub/Sub)에서 읽는 경우 파이프라인이 자동으로 스트리밍 모드에서 실행됩니다.

파이프라인이 제한되지 않은 데이터 소스와 싱크를 사용하는 경우 GroupByKey와 같은 집계를 사용하기 전에 unbounded PCollection에 기간 설정 전략을 선택해야 합니다.

Python

파이프라인이 제한되지 않은 데이터 소스나 싱크(예: Pub/Sub)를 사용하는 경우 streaming 옵션을 true로 설정해야 합니다.

스트리밍 작업은 기본적으로 n1-standard-2 이상의 Compute Engine 머신 유형을 사용합니다. n1-standard-2는 스트리밍 작업을 실행하는 데 필요한 최소 시스템 유형이므로 이를 재정의해서는 안 됩니다.

파이프라인이 제한되지 않은 데이터 소스와 싱크를 사용하는 경우 GroupByKey와 같은 집계를 사용하기 전에 unbounded PCollection에 기간 설정 전략을 선택해야 합니다.

자바: SDK 1.x

다른 Cloud Dataflow 파이프라인 옵션 설정

PipelineOptions 객체에서 다음 필드를 프로그래매틱 방식으로 설정하여 클라우드에서 파이프라인을 실행할 수 있습니다.

자바: SDK 2.x

필드 유형 설명 기본값
runner Class (NameOfRunner) 사용할 PipelineRunner입니다. 이 필드를 사용하면 런타임에 PipelineRunner를 결정할 수 있습니다. DirectRunner(로컬 모드)
streaming boolean 스트리밍 모드를 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. 파이프라인이 제한되지 않은 소스에서 읽는 경우 기본값은 true입니다. 그 외의 경우는 false입니다.
project String Google Cloud 프로젝트의 프로젝트 ID입니다. Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려는 경우 필요합니다. 설정하지 않으면 Cloud SDK에서 현재 구성된 프로젝트가 기본값입니다.
jobName String Dataflow의 작업 목록과 작업 세부정보에 표시되는 실행 중인 Dataflow 작업의 이름입니다. 기존 파이프라인을 업데이트할 때도 사용됩니다. Dataflow는 고유한 이름을 자동으로 생성합니다.
gcpTempLocation String 임시 파일의 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다.
stagingLocation String 로컬 파일을 스테이징할 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않으면 tempLocation에 지정한 값이 기본값입니다.
autoscalingAlgorithm String Dataflow 작업에 사용할 자동 확장 모드입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다. Dataflow 관리형 서비스에서 자동 확장의 작동 방식에 대해서는 자동 확장 기능을 참조하세요. 모든 일괄 Dataflow 작업과 Streaming Engine을 사용하는 스트리밍 작업의 기본값은 THROUGHPUT_BASED입니다. Streaming Engine을 사용하지 않는 스트리밍 작업의 기본값은 NONE입니다.
numWorkers int 파이프라인 실행 시 사용할 초기 Google Compute Engine 인스턴스의 수입니다. 이 옵션은 작업 시작 시 Dataflow 서비스가 시작하는 작업자 수를 결정합니다. 지정하지 않으면 Dataflow 서비스가 적절한 작업자 수를 결정합니다.
maxNumWorkers int 실행 중에 파이프라인을 사용할 수 있는 Compute Engine 인스턴스의 최대 수입니다. 이 수는 초기 작업자 수보다 높을 수 있습니다(작업이 자동으로 또는 다른 식으로 확장되도록 numWorkers에서 지정됨). 지정하지 않으면 Dataflow 서비스가 적절한 작업자 수를 결정합니다.
numberOfWorkerHarnessThreads int 작업자 하네스당 스레드 수입니다. 지정하지 않으면 Dataflow 서비스가 적절한 작업자당 스레드 수를 결정합니다.
region String Dataflow 작업 배포를 위한 리전 엔드포인트를 지정합니다. 설정하지 않으면 기본값은 us-central1입니다.
workerRegion String

파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 리전을 지정합니다. 이 옵션은 작업을 배포, 관리, 모니터링하는 데 사용되는 region과 다른 위치에서 작업자를 실행하는 데 사용됩니다. workerRegion의 영역은 자동으로 할당됩니다.

참고: 이 옵션은 workerZone 또는 zone과 함께 사용할 수 없습니다.

설정하지 않을 경우 기본값은 region에 설정된 값입니다.
workerZone String

파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 영역을 지정합니다. 이 옵션은 작업을 배포, 관리, 모니터링하는 데 사용되는 region과 다른 위치에서 작업자를 실행하는 데 사용됩니다.

참고: 이 옵션은 workerRegion 또는 zone과 함께 사용할 수 없습니다.

region 또는 workerRegion을 지정하는 경우 workerZone은 해당 리전의 영역으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
zone String (지원 중단됨) Apache Beam SDK 2.17.0 이하에서는 파이프라인을 실행할 작업자 인스턴스를 시작하기 위한 Compute Engine 영역을 지정합니다. region을 지정하는 경우 zone은 해당 리전의 영역으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
dataflowKmsKey String 저장 데이터 암호화에 사용한 고객 관리형 암호화 키(CMEK)를 지정합니다. Cloud KMS를 통해 암호화 키를 제어할 수 있습니다. 이 기능을 사용하려면 gcpTempLocation도 지정해야 합니다. 지정하지 않으면 Dataflow가 CMEK 대신 기본 Google Cloud 암호화를 사용합니다.
flexRSGoal String 자동 확장 일괄 작업에 가변형 리소스 예약(FlexRS)을 지정합니다. numWorkers, autoscalingAlgorithm, zone, region, workerMachineType 매개변수에 영향을 미칩니다. 자세한 내용은 FlexRS 파이프라인 옵션 섹션을 참조하세요. 지정하지 않으면 SPEED_OPTIMIZED가 기본값이며, 이 플래그를 생략하는 것과 같습니다. FlexRS를 사용 설정하려면 Dataflow 서비스가 사용 가능한 할인된 리소스를 선택할 수 있도록 COST_OPTIMIZED 값을 지정해야 합니다.
filesToStage List<String> 각 작업자가 사용할 수 있는 로컬 파일, 파일 디렉터리 또는 보관 파일 (예: JAR 또는 zip 파일)의 비어 있지 않은 목록입니다. 이 옵션을 설정하면 사용자가 지정하는 해당 파일만 업로드됩니다(자바 클래스 경로는 무시됨). 모든 리소스를 올바른 클래스 경로 순서로 지정해야 합니다. 리소스는 코드로 제한되지 않으며 모든 작업자가 사용할 수 있는 구성 파일과 다른 리소스도 포함할 수 있습니다. 코드는 자바의 표준 리소스 조회 메소드를 사용하여 나열된 리소스에 액세스할 수 있습니다. 주의: Dataflow는 업로드 전에 파일을 압축하여 시작 시간이 길어지므로, 디렉터리 경로를 지정하는 것은 최적이 아닙니다. 파이프라인 처리 대상 작업자로 데이터를 전송하는 옵션을 사용하지 마세요. 파이프라인 처리는 적절한 Dataflow 데이터 소스와 결합된 기본 Cloud Storage/BigQuery API를 사용하는 것보다 훨씬 느립니다. filesToStage가 생략되면 Dataflow는 자바 클래스 경로를 기반으로 스테이징할 파일을 유추합니다. 왼쪽 열에 언급된 고려 사항과 주의 사항은 여기에도 적용됩니다(나열할 파일 유형 및 코드에서 여기에 액세스할 방법).
network String 파이프라인을 실행할 Compute Engine 인스턴스를 시작하기 위한 Compute Engine 네트워크입니다. 네트워크 지정 방법을 참조하세요. 설정하지 않을 경우 Google Cloud는 default라는 네트워크를 사용한다고 가정합니다.
subnetwork String 파이프라인을 실행할 Compute Engine을 실행하기 위한 Compute Engine 하위 네트워크 서브네트워크 지정 방법을 참조하세요. Dataflow 서비스가 기본값을 결정합니다.
usePublicIps boolean Dataflow 작업자가 공개 IP 주소를 사용할지 여부를 지정합니다. 값을 false로 설정하면 Dataflow 작업자는 모든 통신에 비공개 IP 주소를 사용합니다. 이 경우, subnetwork 옵션을 지정하면 network 옵션이 무시됩니다. 지정된 network 또는 subnetwork비공개 Google 액세스가 사용 설정되어 있는지 확인합니다. 설정하지 않을 경우 기본값은 true이고 Dataflow 작업자는 공개 IP 주소를 사용합니다.
enableStreamingEngine boolean Dataflow Streaming Engine을 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. Streaming Engine을 사용 설정하면 Dataflow 서비스 백엔드에서 스트리밍 파이프라인의 단계를 실행할 수 있으므로 CPU, 메모리, Persistent Disk 스토리지 리소스가 보존됩니다. 기본값은 false입니다. 즉, 스트리밍 파이프라인의 단계가 작업자 VM에서 완전히 실행됩니다.
createFromSnapshot String 스트리밍 작업을 만들 때 사용할 스냅샷 ID를 지정합니다. 스냅샷은 스트리밍 파이프라인의 상태를 저장하고 해당 상태에서 새 버전의 작업을 시작할 수 있게 합니다. 스냅샷에 대한 자세한 내용은 스냅샷 사용을 참조하세요. 설정하지 않을 경우 작업을 만드는 데 스냅샷이 사용되지 않습니다.
hotKeyLoggingEnabled boolean 파이프라인에서 핫 키가 감지되면 사용자의 Cloud Logging 프로젝트에 키가 출력되도록 지정합니다. 설정하지 않으면 핫 키의 존재만 로깅됩니다.
diskSizeGb int

각 원격 Compute Engine 작업자 인스턴스에서 사용할 디스크 크기(GB)입니다. 설정하는 경우 작업자 부팅 이미지와 로컬 로그를 고려하여 30GB 이상을 지정합니다.

Dataflow Shuffle을 사용하는 일괄 작업의 경우 이 옵션은 작업자 VM의 부팅 디스크 크기를 설정합니다. Dataflow Shuffle을 사용하지 않는 일괄 작업의 경우 이 옵션은 셔플된 데이터를 저장하는 데 사용되는 디스크의 크기를 설정합니다. 부팅 디스크 크기는 영향을 받지 않습니다.

Streaming Engine을 사용하는 스트리밍 작업의 경우 이 옵션은 부팅 디스크의 크기를 설정합니다. Streaming Engine을 사용하지 않는 스트리밍 작업의 경우 이 옵션은 Dataflow 서비스에서 만든 각 추가 영구 디스크의 크기를 설정합니다. 부팅 디스크는 영향을 받지 않습니다. 스트리밍 작업이 Streaming Engine을 사용하지 않는 경우 실험 플래그 streaming_boot_disk_size_gb로 부팅 디스크 크기를 설정할 수 있습니다. 예를 들어 80GB의 부팅 디스크를 만들려면 --experiments=streaming_boot_disk_size_gb=80을 지정합니다.

Cloud Platform 프로젝트에서 정의된 기본 크기를 사용하려면 0으로 설정합니다.

일괄 작업에서 Dataflow Shuffle을 사용하는 경우 기본값은 25GB입니다. 그렇지 않은 경우 기본값은 250GB입니다.

스트리밍 작업이 Streaming Engine을 사용하는 경우 기본값은 30GB입니다. 그렇지 않은 경우 기본값은 400GB입니다.

경고: 디스크 크기를 줄이면 사용 가능한 셔플 I/O가 줄어듭니다. Dataflow Shuffle 또는 Streaming Engine을 사용하지 않는 셔플 바인딩 작업은 런타임 및 작업 비용을 증가시킬 수 있습니다.

serviceAccount String my-service-account-name@<project-id>.iam.gserviceaccount.com 형식을 사용하여 사용자 관리형 컨트롤러 서비스 계정을 지정합니다. 자세한 내용은 Cloud Dataflow 보안 및 권한 페이지의 컨트롤러 서비스 계정 섹션을 참조하세요. 설정하지 않으면 작업자가 프로젝트의 Compute Engine 서비스 계정을 컨트롤러 서비스 계정으로 사용합니다.
workerDiskType String 사용할 영구 디스크 유형입니다(디스크 유형 리소스의 전체 URL로 지정). 예를 들어 SSD 영구 디스크를 지정하려면 compute.googleapis.com/projects//zones//diskTypes/pd-ssd를 사용합니다. 자세한 내용은 Compute Engine API 참조 페이지의 diskTypes를 참조하세요. Dataflow 서비스가 기본값을 결정합니다.
workerMachineType String

Dataflow가 작업자 VM을 시작할 때 사용하는 Compute Engine 머신 유형입니다. 사용 가능한 Compute Engine 머신 유형 계열과 커스텀 머신 유형을 사용할 수 있습니다.

최상의 결과를 얻으려면 n1 머신 유형을 사용하세요. f1g1 시리즈 작업자와 같은 공유 코어 머신 유형은 Dataflow의 서비스수준계약에서 지원되지 않습니다.

Dataflow 요금은 vCPU 수와 작업자 메모리의 GB를 기준으로 청구됩니다. 요금 청구는 기계 유형과 무관합니다.

이 옵션을 설정하지 않을 경우 Dataflow 서비스는 작업을 기반으로 머신 유형을 선택합니다.

파이프라인 구성 옵션 전체 목록은 PipelineOptions 인터페이스(및 하위 인터페이스)에 대한 자바용 API 참조 문서를 참조하세요.

Python

필드 유형 설명 기본값
runner str 사용할 PipelineRunner입니다. 이 필드는 DirectRunner 또는 DataflowRunner 중 하나로 설정할 수 있습니다. DirectRunner(로컬 모드)
streaming bool 스트리밍 모드를 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. false
project str Google Cloud 프로젝트의 프로젝트 ID입니다. Dataflow 관리형 서비스를 사용하여 파이프라인을 실행하려는 경우 필요합니다. 설정하지 않을 경우 오류가 발생합니다.
job_name String Dataflow의 작업 목록과 작업 세부정보에 표시되는 실행 중인 Dataflow 작업의 이름입니다. Dataflow는 고유한 이름을 자동으로 생성합니다.
temp_location str 임시 파일의 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않을 경우 staging_location의 값이 기본값입니다. Google 클라우드에서 파이프라인을 실행할 temp_location 또는 staging_location을 1개 이상 지정해야 합니다.
staging_location str 로컬 파일을 스테이징할 Cloud Storage 경로입니다. gs://로 시작되는 유효한 Cloud Storage URL이어야 합니다. 설정하지 않을 경우 temp_location 내의 스테이징 디렉터리가 기본값입니다. Google 클라우드에서 파이프라인을 실행할 temp_location 또는 staging_location을 1개 이상 지정해야 합니다.
autoscaling_algorithm str Dataflow 작업에 사용할 자동 확장 모드입니다. 가능한 값은 자동 확장을 사용 설정하는 THROUGHPUT_BASED 또는 중지하는 NONE입니다. Dataflow 관리형 서비스에서 자동 확장의 작동 방식에 대해서는 자동 확장 기능을 참조하세요. 모든 일괄 Dataflow 작업과 Streaming Engine을 사용하는 스트리밍 작업의 기본값은 THROUGHPUT_BASED입니다. Streaming Engine을 사용하지 않는 스트리밍 작업의 기본값은 NONE입니다.
num_workers int 파이프라인 실행 시 사용할 Compute Engine 인스턴스 수입니다. 지정하지 않으면 Dataflow 서비스가 적절한 작업자 수를 결정합니다.
max_num_workers int 실행 중에 파이프라인을 사용할 수 있는 Compute Engine 인스턴스의 최대 수입니다. 이 수는 초기 작업자 수보다 높을 수 있습니다(작업이 자동으로 또는 다른 식으로 확장되도록 num_workers에서 지정됨). 지정하지 않으면 Dataflow 서비스가 적절한 작업자 수를 결정합니다.
number_of_worker_harness_threads int 작업자 하네스당 스레드 수입니다. 지정하지 않으면 Dataflow 서비스가 적절한 작업자당 스레드 수를 결정합니다. 이 매개변수를 사용하려면 플래그 --experiments=use_runner_v2도 사용해야 합니다.
region str Dataflow 작업 배포를 위한 리전 엔드포인트를 지정합니다. 설정하지 않으면 기본값은 us-central1입니다.
worker_region String

파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 리전을 지정합니다. 이 옵션은 작업을 배포, 관리, 모니터링하는 데 사용되는 region과 다른 위치에서 작업자를 실행하는 데 사용됩니다. worker_region의 영역은 자동으로 할당됩니다.

참고: 이 옵션은 worker_zone 또는 zone과 함께 사용할 수 없습니다.

설정하지 않을 경우 기본값은 region에 설정된 값입니다.
worker_zone String

파이프라인을 실행할 작업자 인스턴스를 실행하기 위한 Compute Engine 영역을 지정합니다. 이 옵션은 작업을 배포, 관리, 모니터링하는 데 사용되는 region과 다른 위치에서 작업자를 실행하는 데 사용됩니다.

참고: 이 옵션은 worker_region 또는 zone과 함께 사용할 수 없습니다.

region 또는 worker_region을 지정하는 경우 worker_zone은 해당 리전의 영역으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
zone str (지원 중단됨) Apache Beam SDK 2.17.0 이하에서는 파이프라인을 실행할 작업자 인스턴스를 시작하기 위한 Compute Engine 영역을 지정합니다. region을 지정하는 경우 zone은 해당 리전의 영역으로 기본 설정됩니다. 다른 영역을 지정하여 이 동작을 재정의할 수 있습니다.
dataflow_kms_key str 저장 데이터 암호화에 사용한 고객 관리형 암호화 키(CMEK)를 지정합니다. Cloud KMS를 통해 암호화 키를 제어할 수 있습니다. 이 기능을 사용하려면 temp_location도 지정해야 합니다. 지정하지 않으면 Dataflow가 CMEK 대신 기본 Google Cloud 암호화를 사용합니다.
flexrs_goal str 자동 확장 일괄 작업에 가변형 리소스 예약(FlexRS)을 지정합니다. num_workers, autoscaling_algorithm, zone, region, machine_type 매개변수에 영향을 미칩니다. 자세한 내용은 FlexRS 파이프라인 옵션 섹션을 참조하세요. 지정하지 않으면 SPEED_OPTIMIZED가 기본값이며, 이 플래그를 생략하는 것과 같습니다. FlexRS를 사용 설정하려면 Dataflow 서비스가 사용 가능한 할인된 리소스를 선택할 수 있도록 COST_OPTIMIZED 값을 지정해야 합니다.
network str 파이프라인을 실행할 Compute Engine 인스턴스를 실행하기 위한 Compute Engine 네트워크 네트워크 지정 방법을 참조하세요. 설정하지 않을 경우 Google Cloud는 default라는 네트워크를 사용한다고 가정합니다.
subnetwork str 파이프라인을 실행할 Compute Engine을 실행하기 위한 Compute Engine 하위 네트워크 서브네트워크 지정 방법을 참조하세요. Dataflow 서비스가 기본값을 결정합니다.
use_public_ips bool Dataflow 작업자가 공개 IP 주소를 사용하도록 지정합니다. 값을 false로 설정하면 Dataflow 작업자는 모든 통신에 비공개 IP 주소를 사용합니다. 이 경우, subnetwork 옵션을 지정하면 network 옵션이 무시됩니다. 지정된 network 또는 subnetwork비공개 Google 액세스가 사용 설정되어 있는지 확인합니다. 이 옵션을 사용하려면 Python용 Beam SDK가 필요합니다. 지원 중단된 Python용 Dataflow SDK는 이 옵션을 지원하지 않습니다. 설정하지 않으면 Dataflow 작업자가 공개 IP 주소를 사용합니다.
enable_streaming_engine bool Dataflow Streaming Engine을 사용 설정 또는 중지할지 여부를 지정합니다. 사용 설정하면 true입니다. Streaming Engine을 사용 설정하면 Dataflow 서비스 백엔드에서 스트리밍 파이프라인의 단계를 실행할 수 있으므로 CPU, 메모리, Persistent Disk 스토리지 리소스가 보존됩니다. 기본값은 false입니다. 즉, 스트리밍 파이프라인의 단계가 작업자 VM에서 완전히 실행됩니다.
disk_size_gb int

각 원격 Compute Engine 작업자 인스턴스에서 사용할 디스크 크기(GB)입니다. 설정하는 경우 작업자 부팅 이미지와 로컬 로그를 고려하여 30GB 이상을 지정합니다.

Dataflow Shuffle을 사용하는 일괄 작업의 경우 이 옵션은 작업자 VM의 부팅 디스크 크기를 설정합니다. Dataflow Shuffle을 사용하지 않는 일괄 작업의 경우 이 옵션은 셔플된 데이터를 저장하는 데 사용되는 디스크의 크기를 설정합니다. 부팅 디스크 크기는 영향을 받지 않습니다.

Streaming Engine을 사용하는 스트리밍 작업의 경우 이 옵션은 부팅 디스크의 크기를 설정합니다. Streaming Engine을 사용하지 않는 스트리밍 작업의 경우 이 옵션은 Dataflow 서비스에서 만든 각 추가 영구 디스크의 크기를 설정합니다. 부팅 디스크는 영향을 받지 않습니다. 스트리밍 작업이 Streaming Engine을 사용하지 않는 경우 실험 플래그 streaming_boot_disk_size_gb로 부팅 디스크 크기를 설정할 수 있습니다. 예를 들어 80GB의 부팅 디스크를 만들려면 --experiments=streaming_boot_disk_size_gb=80을 지정합니다.

Cloud Platform 프로젝트에서 정의된 기본 크기를 사용하려면 0으로 설정합니다.

일괄 작업에서 Dataflow Shuffle을 사용하는 경우 기본값은 25GB입니다. 그렇지 않은 경우 기본값은 250GB입니다.

스트리밍 작업이 Streaming Engine을 사용하는 경우 기본값은 30GB입니다. 그렇지 않은 경우 기본값은 400GB입니다.

경고: 디스크 크기를 줄이면 사용 가능한 셔플 I/O가 줄어듭니다. Dataflow Shuffle 또는 Streaming Engine을 사용하지 않는 셔플 바인딩 작업은 런타임 및 작업 비용을 증가시킬 수 있습니다.

service_account_email str my-service-account-name@<project-id>.iam.gserviceaccount.com 형식을 사용하여 사용자 관리형 컨트롤러 서비스 계정을 지정합니다. 자세한 내용은 Cloud Dataflow 보안 및 권한 페이지의 컨트롤러 서비스 계정 섹션을 참조하세요. 설정하지 않으면 작업자가 프로젝트의 Compute Engine 서비스 계정을 컨트롤러 서비스 계정으로 사용합니다.
worker_disk_type str 사용할 영구 디스크 유형입니다(디스크 유형 리소스의 전체 URL로 지정). 예를 들어 SSD 영구 디스크를 지정하려면 compute.googleapis.com/projects//zones//diskTypes/pd-ssd를 사용합니다. 자세한 내용은 Compute Engine API 참조 페이지의 diskTypes를 참조하세요. Dataflow 서비스가 기본값을 결정합니다.
machine_type str

Dataflow가 작업자 VM을 시작할 때 사용하는 Compute Engine 머신 유형입니다. 사용 가능한 Compute Engine 머신 유형 계열과 커스텀 머신 유형을 사용할 수 있습니다.

최상의 결과를 얻으려면 n1 머신 유형을 사용하세요. f1g1 시리즈 작업자와 같은 공유 코어 머신 유형은 Dataflow의 서비스수준계약에서 지원되지 않습니다.

Dataflow 요금은 vCPU 수와 작업자 메모리의 GB를 기준으로 청구됩니다. 요금 청구는 기계 유형과 무관합니다.

이 옵션을 설정하지 않을 경우 Dataflow 서비스는 작업을 기반으로 머신 유형을 선택합니다.

자바: SDK 1.x

로컬 실행을 위한 PipelineOptions 구성

관리형 클라우드 리소스에서 파이프라인을 실행하는 대신 파이프라인을 로컬에서 실행할 수 있습니다. 로컬 실행은 작은 데이터 세트에 대해 파이프라인을 테스트, 디버깅 또는 실행하는 데 몇 가지 장점을 제공합니다. 예를 들어 로컬 실행은 원격 Dataflow 서비스와 관련된 Google Cloud 프로젝트의 종속 항목을 삭제합니다.

로컬 실행을 사용할 때는 로컬 메모리에 저장할 수 있을 만큼 작은 데이터세트로 파이프라인을 실행하는 것이 좋습니다. Create 변환을 사용하여 작은 인메모리 데이터 세트를 만들거나 Read 변환을 사용하여 작은 로컬 또는 원격 파일로 작업할 수 있습니다. 로컬 실행은 외부 종속 항목을 줄이면서 테스트와 디버깅을 빠르고 간편하게 수행할 수 있는 방법이지만 로컬 환경에서 사용할 수 있는 메모리로 제한됩니다.

다음 예제 코드는 로컬 환경에서 실행되는 파이프라인을 구성하는 방법을 보여줍니다.

자바: SDK 2.x

  // Create and set our Pipeline Options.
  PipelineOptions options = PipelineOptionsFactory.create();

  // Create the Pipeline with the specified options.
  Pipeline p = Pipeline.create(options);

참고: 로컬 모드의 경우 DirectRunner가 이미 기본값이므로 실행기를 설정할 필요가 없습니다. 그러나 DirectRunner를 종속 항목으로 명시적으로 포함하거나 이를 클래스 경로에 추가해야 합니다.

Python

# Create and set your Pipeline Options.
options = PipelineOptions(flags=argv)
my_options = options.view_as(MyOptions)

with Pipeline(options=options) as pipeline:
  pass  # build your pipeline here.

참고: 로컬 모드의 경우 DirectRunner가 이미 기본값이므로 실행기를 설정할 필요가 없습니다.

자바: SDK 1.x

파이프라인을 구성한 후에는 실행합니다.

다른 로컬 파이프라인 옵션 설정

파이프라인을 로컬에서 실행하는 경우 일반적으로 PipelineOptions 속성의 기본값이면 충분합니다.

자바: SDK 2.x

자바 API 참조에서 자바 PipelineOptions의 기본값을 확인할 수 있습니다. 자세한 내용은 PipelineOptions 클래스 목록을 참조하세요.

파이프라인이 BigQuery 또는 IO용 Cloud Storage와 같은 Google Cloud 서비스를 사용하는 경우 특정 Google Cloud 프로젝트와 사용자 인증 정보 옵션을 설정해야 할 수 있습니다. 이러한 경우 GcpOptions.setProject를 사용하여 Google Cloud 프로젝트 ID를 설정해야 합니다. 사용자 인증 정보를 명시적으로 설정해야 할 수도 있습니다. 자세한 내용은 GcpOptions 클래스를 참조하세요.

Python

Python API 참조에서 Python PipelineOptions의 기본값을 확인할 수 있습니다. 자세한 내용은 PipelineOptions 모듈 목록을 참조하세요.

파이프라인이 BigQuery 또는 IO용 Cloud Storage와 같은 Google Cloud 서비스를 사용하는 경우 특정 Google Cloud 프로젝트와 사용자 인증 정보 옵션을 설정해야 할 수 있습니다. 이러한 경우 options.view_as(GoogleCloudOptions).project를 사용하여 Google Cloud 프로젝트 ID를 설정해야 합니다. 사용자 인증 정보를 명시적으로 설정해야 할 수도 있습니다. 자세한 내용은 GoogleCloudOptions 클래스를 참조하세요.

자바: SDK 1.x