Dataflow 파이프라인 옵션 설정

이 페이지에서는 Dataflow 작업의 파이프라인 옵션을 설정하는 방법을 설명합니다. 이러한 파이프라인 옵션은 파이프라인이 실행되는 방식과 위치, 사용되는 리소스를 구성합니다.

파이프라인 실행은 Apache Beam 프로그램 실행과는 별개입니다. 작성한 Apache Beam 프로그램은 지연된 실행을 위한 파이프라인을 구성합니다. 즉, 프로그램은 지원되는 모든 Apache Beam 실행기가 실행할 수 있는 일련의 단계를 생성합니다. 호환되는 실행기에는 Google Cloud의 Dataflow 실행기와 로컬 환경에서 직접 파이프라인을 실행하는 직접 실행기가 있습니다.

런타임에 Dataflow 작업에 매개변수를 전달할 수 있습니다. 런타임 시 파이프라인 옵션 설정에 대한 자세한 내용은 파이프라인 옵션 구성을 참조하세요.

Apache Beam SDK에 파이프라인 옵션 사용

다음 SDK를 사용하여 Dataflow 작업에 대해 파이프라인 옵션을 설정할 수 있습니다.

  • Python용 Apache Beam SDK
  • Java용 Apache Beam SDK
  • Go용 Apache Beam SDK

SDK를 사용하려면 Apache Beam SDK 클래스 PipelineOptions를 사용하여 파이프라인 실행기와 기타 실행 매개변수를 설정합니다.

파이프라인 옵션을 지정하는 방법에는 두 가지가 있습니다.

  • 파이프라인 옵션 목록을 제공하여 파이프라인 옵션을 프로그래매틱 방식으로 설정합니다.
  • 파이프라인 코드를 실행할 때 명령줄에서 파이프라인 옵션을 직접 설정합니다.

프로그래매틱 방식으로 파이프라인 옵션 설정

PipelineOptions 객체를 만들고 수정하여 프로그래매틱 방식으로 파이프라인 옵션을 설정할 수 있습니다.

자바

PipelineOptionsFactory.fromArgs 메서드를 사용하여 PipelineOptions 객체를 구성합니다.

예시를 보려면 이 페이지의 Dataflow 샘플에서 실행 섹션을 참조하세요.

Python

PipelineOptions 객체를 만듭니다.

예시를 보려면 이 페이지의 Dataflow 샘플에서 실행 섹션을 참조하세요.

Go

Go용 Apache Beam SDK에서는 PipelineOptions를 사용하여 프로그래매틱 방식으로 파이프라인 옵션을 설정할 수 없습니다. Go 명령줄 인수를 사용합니다.

예시를 보려면 이 페이지의 Dataflow 샘플에서 실행 섹션을 참조하세요.

명령줄에서 파이프라인 옵션 설정

명령줄 인수를 사용하여 파이프라인 옵션을 설정할 수 있습니다.

자바

다음 예시 구문은 Java 빠른 시작WordCount 파이프라인에서 가져온 것입니다.

mvn -Pdataflow-runner compile exec:java \
  -Dexec.mainClass=org.apache.beam.examples.WordCount \
  -Dexec.args="--project=PROJECT_ID \
  --gcpTempLocation=gs://BUCKET_NAME/temp/ \
  --output=gs://BUCKET_NAME/output \
  --runner=DataflowRunner \
  --region=REGION"

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름입니다.
  • REGION: Dataflow 리전us-central1입니다.

Python

다음 예시 구문은 Python 빠른 시작WordCount 파이프라인에서 가져온 것입니다.

python -m apache_beam.examples.wordcount \
  --region DATAFLOW_REGION \
  --input gs://dataflow-samples/shakespeare/kinglear.txt \
  --output gs://STORAGE_BUCKET/results/outputs \
  --runner DataflowRunner \
  --project PROJECT_ID \
  --temp_location gs://STORAGE_BUCKET/tmp/

다음을 바꿉니다.

  • DATAFLOW_REGION: Dataflow 작업을 배포할 리전입니다(예: europe-west1).

    --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

  • STORAGE_BUCKET: Cloud Storage 버킷 이름입니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.

Go

다음 예시 구문은 Go 빠른 시작WordCount 파이프라인에서 가져온 것입니다.

go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
   --output gs://BUCKET_NAME/results/outputs \
   --runner dataflow \
   --project PROJECT_ID \
   --region DATAFLOW_REGION \
   --staging_location gs://BUCKET_NAME/binaries/

다음을 바꿉니다.

  • BUCKET_NAME: Cloud Storage 버킷 이름입니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.

  • DATAFLOW_REGION: Dataflow 작업을 배포할 리전입니다. 예를 들면 europe-west1입니다. --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

실험용 파이프라인 옵션 설정

Java, Python, Go SDK에서 experiments 파이프라인 옵션은 실험용 또는 GA 이전 Dataflow 기능을 사용 설정합니다.

프로그래매틱 방식으로 설정

프로그래매틱 방식으로 experiments 옵션을 설정하려면 다음 구문을 사용하세요.

자바

PipelineOptions 객체에 다음 구문을 사용하여 experiments 옵션을 포함하세요. 이 예시에서는 실험 플래그를 사용해서 부팅 디스크 크기를 80GB로 설정합니다.

options.setExperiments("streaming_boot_disk_size_gb=80")

PipelineOptions 객체를 만드는 방법을 보여주는 예시는 이 페이지의 Dataflow 샘플에서 실행 섹션을 참조하세요.

Python

PipelineOptions 객체에 다음 구문을 사용하여 experiments 옵션을 포함하세요. 이 예시에서는 실험 플래그를 사용해서 부팅 디스크 크기를 80GB로 설정합니다.

beam_options = PipelineOptions(
  beam_args,
  experiments=['streaming_boot_disk_size_gb=80'])

PipelineOptions 객체를 만드는 방법을 보여주는 예시는 이 페이지의 Dataflow 샘플에서 실행 섹션을 참조하세요.

Go

Go용 Apache Beam SDK에서는 PipelineOptions를 사용하여 프로그래매틱 방식으로 파이프라인 옵션을 설정할 수 없습니다. Go 명령줄 인수를 사용합니다.

명령줄에서 설정

명령줄에서 experiments 옵션을 설정하려면 다음 구문을 사용합니다.

자바

이 예시에서는 실험 플래그를 사용해서 부팅 디스크 크기를 80GB로 설정합니다.

--experiments=streaming_boot_disk_size_gb=80

Python

이 예시에서는 실험 플래그를 사용해서 부팅 디스크 크기를 80GB로 설정합니다.

--experiments=streaming_boot_disk_size_gb=80

Go

이 예시에서는 실험 플래그를 사용해서 부팅 디스크 크기를 80GB로 설정합니다.

--experiments=streaming_boot_disk_size_gb=80

템플릿에서 설정

Dataflow 템플릿을 실행할 때 실험용 기능을 사용 설정하려면 --additional-experiments 플래그를 사용합니다.

기본 템플릿

gcloud dataflow jobs run JOB_NAME --additional-experiments=EXPERIMENT[,...]

Flex 템플릿

gcloud dataflow flex-template run JOB_NAME --additional-experiments=EXPERIMENT[,...]

파이프라인 옵션 객체 액세스

Apache Beam 프로그램에서 Pipeline 객체를 만들 때 PipelineOptions를 전달합니다. Dataflow 서비스가 파이프라인을 실행하면 각 작업자에게 PipelineOptions 복사본을 전송합니다.

자바

ProcessContext.getPipelineOptions 메서드를 사용하여 ParDo 변환의 DoFn 인스턴스에 있는 PipelineOptions에 액세스합니다.

Python

이 기능은 Python용 Apache Beam SDK에서 지원되지 않습니다.

Go

beam.PipelineOptions를 사용하여 파이프라인 옵션에 액세스합니다.

Dataflow에서 실행

Dataflow 실행기 서비스를 사용하여 관리형 Google Cloud 리소스에서 작업을 실행하세요. Dataflow로 파이프라인을 실행하면 Google Cloud 프로젝트에서 Compute Engine 및 Cloud Storage 리소스를 사용하는 Dataflow 작업이 생성됩니다. Dataflow 권한에 대한 자세한 내용은 Dataflow 보안 및 권한을 참조하세요.

Dataflow 작업은 Cloud Storage를 사용하여 파이프라인 실행 중에 임시 파일을 저장합니다. 불필요한 스토리지 비용이 청구되지 않도록 하려면 Dataflow 작업에서 임시 스토리지에 사용하는 버킷에서 소프트 삭제 기능을 사용 중지합니다. 자세한 내용은 버킷에서 소프트 삭제 정책 삭제를 참조하세요.

필수 옵션 설정

Dataflow를 사용하여 파이프라인을 실행하려면 다음 파이프라인 옵션을 설정합니다.

자바

  • project: Google Cloud 프로젝트의 ID입니다.
  • runner: 파이프라인을 실행하는 파이프라인 실행기입니다. Google Cloud 실행의 경우에는 DataflowRunner여야 합니다.
  • gcpTempLocation: 대부분의 임시 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다. 버킷을 지정하려면 미리 버킷을 만들어야 합니다. gcpTempLocation을 설정하지 않은 경우 파이프라인 옵션 tempLocation을 설정할 수 있습니다. 그러면 gcpTempLocationtempLocation 값으로 설정됩니다. 둘 다 지정하지 않으면 기본값인 gcpTempLocation이 생성됩니다.
  • stagingLocation: Dataflow가 바이너리 파일을 스테이징할 Cloud Storage 경로입니다. Apache Beam SDK 2.28 이상을 사용하는 경우에는 이 옵션을 설정하지 마세요. Apache Beam SDK 2.28 이하에서 이 옵션을 설정하지 않으면 tempLocation에 지정한 값이 스테이징 위치에 사용됩니다.

    gcpTempLocation이나 tempLocation을 지정하지 않으면 기본값인 gcpTempLocation이 생성됩니다. tempLocation이 지정되고 gcpTempLocation이 지정되지 않으면 tempLocation은 Cloud Storage 경로여야 하고 gcpTempLocation 기본값이 됩니다. tempLocation이 지정되지 않고 gcpTempLocation이 지정되면 tempLocation은 채워지지 않습니다.

Python

  • project: Google Cloud 프로젝트 ID입니다.
  • region: Dataflow 작업의 리전입니다.
  • runner: 파이프라인을 실행하는 파이프라인 실행기입니다. Google Cloud 실행의 경우에는 DataflowRunner여야 합니다.
  • temp_location: 파이프라인 실행 중에 생성된 임시 작업 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다.

Go

  • project: Google Cloud 프로젝트 ID입니다.
  • region: Dataflow 작업의 리전입니다.
  • runner: 파이프라인을 실행하는 파이프라인 실행기입니다. Google Cloud 실행의 경우에는 dataflow여야 합니다.
  • staging_location: 파이프라인 실행 중에 생성된 임시 작업 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다.

프로그래매틱 방식으로 파이프라인 옵션 설정

다음 예시 코드는 Dataflow를 사용하여 파이프라인을 실행하는 데 필요한 실행기와 기타 필수 옵션을 프로그래매틱 방식으로 설정하여 파이프라인을 구성하는 방법을 보여줍니다.

자바

// Create and set your PipelineOptions.
DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);

// For cloud execution, set the Google Cloud project, staging location,
// and set DataflowRunner.
options.setProject("my-project-id");
options.setStagingLocation("gs://my-bucket/binaries");
options.setRunner(DataflowRunner.class);

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg', help='description')
args, beam_args = parser.parse_known_args()

# Create and set your PipelineOptions.
# For Cloud execution, specify DataflowRunner and set the Cloud Platform
# project, job name, temporary files location, and region.
# For more information about regions, check:
# https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
beam_options = PipelineOptions(
    beam_args,
    runner='DataflowRunner',
    project='my-project-id',
    job_name='unique-job-name',
    temp_location='gs://my-bucket/temp',
    region='us-central1')
# Note: Repeatable options like dataflow_service_options or experiments must
# be specified as a list of string(s).
# e.g. dataflow_service_options=['enable_prime']

# Create the Pipeline with the specified options.
with beam.Pipeline(options=beam_options) as pipeline:
  pass  # build your pipeline here.

Go

Go용 Apache Beam SDK는 Go 명령줄 인수를 사용합니다. flag.Set()을 사용하여 플래그 값을 설정합니다.

// Use the Go flag package to parse custom options.
flag.Parse()

// Set the required options programmatically.
// For Cloud execution, specify the Dataflow runner, Google Cloud
// project ID, region, and staging location.
// For more information about regions, see
// https://cloud.google.com/dataflow/docs/concepts/regional-endpoints
flag.Set("runner", "dataflow")
flag.Set("project", "my-project-id")
flag.Set("region", "us-central1")
flag.Set("staging_location", "gs://my-bucket/binaries")

beam.Init()

// Create the Pipeline.
p := beam.NewPipeline()
s := p.Root()

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

명령줄에서 파이프라인 옵션 사용

다음 예시에서는 명령줄에서 지정된 파이프라인 옵션을 사용하는 방법을 보여줍니다. 이 예시에서는 파이프라인 옵션을 프로그래매틱 방식으로 설정하지 않습니다.

자바

// Set your PipelineOptions to the specified command-line options
MyOptions options = PipelineOptionsFactory.fromArgs(args).withValidation();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

Python argparse 모듈을 사용하여 명령줄 옵션을 파싱합니다.

# Use Python argparse module to parse custom arguments
import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# For more details on how to use argparse, take a look at:
#   https://docs.python.org/3/library/argparse.html
parser = argparse.ArgumentParser()
parser.add_argument(
    '--input-file',
    default='gs://dataflow-samples/shakespeare/kinglear.txt',
    help='The file path for the input text to process.')
parser.add_argument(
    '--output-path', required=True, help='The path prefix for output files.')
args, beam_args = parser.parse_known_args()

# Create the Pipeline with remaining arguments.
beam_options = PipelineOptions(beam_args)
with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | 'Read files' >> beam.io.ReadFromText(args.input_file)
      | 'Write files' >> beam.io.WriteToText(args.output_path))

Go

Go flag 패키지를 사용하여 명령줄 옵션을 파싱합니다. beam.Init()를 호출하기 전에 옵션을 파싱해야 합니다. 이 예시에서 output은 명령줄 옵션입니다.

// Define configuration options
var (
  output = flag.String("output", "", "Output file (required).")
)

// Parse options before beam.Init()
flag.Parse()

beam.Init()

// Input validation must be done after beam.Init()
if *output == "" {
  log.Fatal("No output provided!")
}

p := beam.NewPipeline()

파이프라인을 구성한 후에는 모든 파이프라인 읽기, 변환, 쓰기를 지정하고 파이프라인을 실행합니다.

실행 모드 제어

Apache Beam 프로그램이 Dataflow와 같은 서비스에서 파이프라인을 실행하면 프로그램에서 파이프라인을 비동기적으로 실행하거나 파이프라인이 완료될 때까지 차단할 수 있습니다. 다음 안내를 수행하여 이 동작을 변경할 수 있습니다.

자바

Apache Beam Java 프로그램이 Dataflow와 같은 서비스에서 파이프라인을 실행하면 일반적으로 비동기식으로 실행됩니다. 파이프라인을 실행하고 작업이 완료될 때까지 기다리려면 DataflowRunner를 파이프라인 실행기로 설정하고 pipeline.run().waitUntilFinish()를 명시적으로 호출합니다.

DataflowRunner를 사용하고 pipeline.run()에서 반환한 PipelineResult 객체에서 waitUntilFinish()를 호출하면 파이프라인은 Google Cloud에서 실행되지만 로컬 코드가 클라우드 작업이 끝날 때까지 기다렸다가 최종 DataflowPipelineJob 객체를 반환합니다. 작업이 실행되는 동안 Dataflow 서비스는 대기 중에 작업 상태 업데이트와 콘솔 메시지를 출력합니다.

Python

Apache Beam Python 프로그램이 Dataflow와 같은 서비스에서 파이프라인을 실행하면 일반적으로 비동기식으로 실행됩니다. 파이프라인이 완료될 때까지 차단하려면 실행기의 run() 메서드에서 반환된 PipelineResult 객체의 wait_until_finish() 메서드를 사용합니다.

Go

Apache Beam Go 프로그램이 Dataflow에서 파이프라인을 실행하면 기본적으로 동기식이며 파이프라인이 완료될 때까지 차단합니다. 차단하지 않으려는 경우 두 가지 옵션이 있습니다.

  1. Go 루틴에서 작업을 시작합니다.

    go func() {
      pr, err := beamx.Run(ctx, p)
      if err != nil {
        // Handle the error.
      }
      // Send beam.PipelineResult into a channel.
      results <- pr
    }()
    // Do other operations while the pipeline runs.
    
  2. jobopts 패키지에 있는 --async 명령줄 플래그를 사용합니다.

실행 세부정보를 보고 진행 상황을 모니터링하고 작업 완료 상태를 확인하려면 Dataflow 모니터링 인터페이스 또는 Dataflow 명령줄 인터페이스를 사용합니다.

스트리밍 소스 사용

자바

파이프라인이 제한되지 않은 데이터 소스(예: Pub/Sub)에서 읽는 경우 파이프라인이 자동으로 스트리밍 모드에서 실행됩니다.

Python

파이프라인이 제한되지 않은 데이터 소스(예: Pub/Sub)를 사용하는 경우 streaming 옵션을 true로 설정해야 합니다.

Go

파이프라인이 제한되지 않은 데이터 소스(예: Pub/Sub)에서 읽는 경우 파이프라인이 자동으로 스트리밍 모드에서 실행됩니다.

스트리밍 작업은 기본적으로 n1-standard-2 이상의 Compute Engine 머신 유형을 사용합니다.

로컬에서 실행

관리형 클라우드 리소스에서 파이프라인을 실행하는 대신 파이프라인을 로컬에서 실행할 수 있습니다. 로컬 실행은 작은 데이터 세트에 대해 파이프라인을 테스트, 디버깅 또는 실행하는 데 몇 가지 장점을 제공합니다. 예를 들어 로컬 실행은 원격 Dataflow 서비스와 관련된 Google Cloud 프로젝트의 종속 항목을 삭제합니다.

로컬 실행을 사용할 때는 로컬 메모리에 저장할 수 있을 만큼 작은 데이터 세트로 파이프라인을 실행해야 합니다. Create 변환을 사용하여 작은 인메모리 데이터 세트를 만들거나 Read 변환을 사용하여 작은 로컬 또는 원격 파일로 작업할 수 있습니다. 로컬 실행은 일반적으로 외부 종속 항목을 줄이면서 테스트와 디버깅을 더 빠르고 간편하게 수행하는 방법을 제공하지만 로컬 환경에서 사용할 수 있는 메모리로 제한됩니다.

다음 예시 코드는 로컬 환경에서 실행되는 파이프라인을 구성하는 방법을 보여줍니다.

자바

// Create and set our Pipeline Options.
PipelineOptions options = PipelineOptionsFactory.create();

// Create the Pipeline with the specified options.
Pipeline p = Pipeline.create(options);

Python

import argparse

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

parser = argparse.ArgumentParser()
# parser.add_argument('--my-arg')
args, beam_args = parser.parse_known_args()

# Create and set your Pipeline Options.
beam_options = PipelineOptions(beam_args)
args = beam_options.view_as(MyOptions)

with beam.Pipeline(options=beam_options) as pipeline:
  lines = (
      pipeline
      | beam.io.ReadFromText(args.input)
      | beam.io.WriteToText(args.output))

Go

// Parse options before beam.Init()
flag.Parse()

beam.Init()

p := beam.NewPipeline()

파이프라인을 구성한 후에는 실행합니다.

커스텀 파이프라인 옵션 만들기

표준 PipelineOptions 외에도 자체 커스텀 옵션을 추가할 수 있습니다. Apache Beam의 명령줄은 동일한 형식으로 지정된 명령줄 인수를 사용하여 커스텀 옵션을 파싱할 수도 있습니다.

자바

자체 옵션을 추가하려면 다음 예시처럼 옵션마다 getter와 setter 메서드를 사용하여 인터페이스를 정의합니다.

public interface MyOptions extends PipelineOptions {
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

Python

자체 옵션을 추가하려면 다음 예시처럼 Python의 표준 argparse 모듈과 완전히 동일하게 작동하는 add_argument() 메서드를 사용합니다.

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument('--input')
    parser.add_argument('--output')

Go

자체 옵션을 추가하려면 다음 예시와 같이 Go 플래그 패키지를 사용합니다.

var (
  input  = flag.String("input", "", "")
  output = flag.String("output", "", "")
)

사용자가 --help를 명령줄 인수로 전달하면 나타나는 설명과 기본값을 지정할 수도 있습니다.

자바

다음과 같이 주석을 사용하여 설명과 기본값을 설정합니다.

public interface MyOptions extends PipelineOptions {
  @Description("My custom command line argument.")
  @Default.String("DEFAULT")
  String getMyCustomOption();
  void setMyCustomOption(String myCustomOption);
}

PipelineOptionsFactory로 인터페이스를 등록한 후 PipelineOptions 객체를 만들 때 인터페이스를 전달하는 것이 좋습니다. PipelineOptionsFactory로 인터페이스를 등록하면 --help는 커스텀 옵션 인터페이스를 찾고 --help 명령어 출력에 추가할 수 있습니다. PipelineOptionsFactory는 커스텀 옵션이 다른 등록된 모든 옵션과 호환되는지 검증합니다.

다음 예시 코드에서는 PipelineOptionsFactory로 커스텀 옵션 인터페이스를 등록하는 방법을 보여줍니다.

PipelineOptionsFactory.register(MyOptions.class);
MyOptions options = PipelineOptionsFactory.fromArgs(args)
                                          .withValidation()
                                          .as(MyOptions.class);

이제 파이프라인이 --myCustomOption=value를 명령줄 인수로 허용할 수 있습니다.

Python

다음과 같이 설명과 기본값을 설정합니다.

from apache_beam.options.pipeline_options import PipelineOptions

class MyOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    parser.add_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='The file path for the input text to process.')
    parser.add_argument(
        '--output', required=True, help='The path prefix for output files.')

Go

다음과 같이 설명과 기본값을 설정합니다.

var (
  input  = flag.String("input", "gs://MY_STORAGE_BUCKET/input", "Input for the pipeline")
  output = flag.String("output", "gs://MY_STORAGE_BUCKET/output", "Output for the pipeline")
)