Dataflow 파이프라인 배포

이 문서에서는 파이프라인 배포를 간략하게 설명하고 배포된 파이프라인에서 수행할 수 있는 몇 가지 작업을 설명합니다.

파이프라인 실행

Apache Beam 파이프라인을 만들고 테스트한 후 파이프라인을 실행합니다. 파이프라인을 로컬에서 실행하여 Apache Beam 파이프라인을 테스트하고 디버깅하거나 Apache Beam 파이프라인 실행에 사용 가능한 데이터 처리 시스템인 Dataflow에서 실행할 수 있습니다.

로컬에서 실행

로컬에서 파이프라인을 실행합니다.

Java

빠른 시작에서 가져온 다음 예시 코드는 로컬에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 로컬에서 Java 파이프라인을 실행하는 방법을 참조하세요.

터미널에서 다음 명령어를 실행합니다.

  mvn compile exec:java \
      -Dexec.mainClass=org.apache.beam.examples.WordCount \
      -Dexec.args="--output=counts"
  

Python

빠른 시작에서 가져온 다음 예시 코드는 로컬에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 로컬에서 Python 파이프라인을 실행하는 방법을 참조하세요.

터미널에서 다음 명령어를 실행합니다.

python -m apache_beam.examples.wordcount \ --output outputs

Go

빠른 시작에서 가져온 다음 예시 코드는 로컬에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 Go 파이프라인을 로컬에서 실행하는 방법을 참조하세요.

터미널에서 다음 명령어를 실행합니다.

    go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \ --output outputs
  

Direct Runner를 사용하여 머신에서 로컬로 파이프라인을 실행하는 방법을 알아보세요.

Dataflow에서 실행

Dataflow에서 파이프라인을 실행합니다.

Java

빠른 시작에서 가져온 다음 예시 코드는 Dataflow에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 Dataflow에서 Java 파이프라인을 실행하는 방법을 참조하세요.

터미널 word-count-beam 디렉터리에서 다음 명령어를 실행합니다.

  mvn -Pdataflow-runner compile exec:java \
    -Dexec.mainClass=org.apache.beam.examples.WordCount \
    -Dexec.args="--project=PROJECT_ID \
    --gcpTempLocation=gs://BUCKET_NAME/temp/ \
    --output=gs://BUCKET_NAME/output \
    --runner=DataflowRunner \
    --region=REGION"
    

다음을 바꿉니다.

  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.
  • BUCKET_NAME: Cloud Storage 버킷 이름
  • REGION: Dataflow 리전(예: us-central1)

Python

빠른 시작에서 가져온 다음 예시 코드는 Dataflow에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 Dataflow에서 Python 파이프라인을 실행하는 방법을 참조하세요.

터미널에서 다음 명령어를 실행합니다.

python -m apache_beam.examples.wordcount \
    --region DATAFLOW_REGION \
    --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner DataflowRunner \
    --project PROJECT_ID \
    --temp_location gs://STORAGE_BUCKET/tmp/

다음을 바꿉니다.

  • DATAFLOW_REGION: Dataflow 작업을 배포할 리전입니다(예: europe-west1).

    --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

  • STORAGE_BUCKET: 이전에 복사한 Cloud Storage 이름입니다.
  • PROJECT_ID: 이전에 복사한 Google Cloud 프로젝트 ID입니다.

Go

빠른 시작에서 가져온 다음 예시 코드는 Dataflow에서 WordCount 파이프라인을 실행하는 방법을 보여줍니다. 자세한 내용은 Dataflow에서 Go 파이프라인 실행 방법을 참조하세요.

터미널에서 다음 명령어를 실행합니다.

  posix-terminal go run wordcount.go --input gs://dataflow-samples/shakespeare/kinglear.txt \
    --output gs://STORAGE_BUCKET/results/outputs \
    --runner dataflow \
    --project PROJECT_ID \
    --region DATAFLOW_REGION \
    --staging_location gs://STORAGE_BUCKET/binaries/
  

다음을 바꿉니다.

  • STORAGE_BUCKET: Cloud Storage 버킷 이름입니다.
  • PROJECT_ID: Google Cloud 프로젝트 ID입니다.
  • DATAFLOW_REGION: Dataflow 작업을 배포할 리전입니다. 예를 들면 europe-west1입니다. 사용 가능한 위치 목록은 Dataflow 위치를 참조하세요. --region 플래그는 메타데이터 서버, 로컬 클라이언트 또는 환경 변수에 설정된 기본 리전을 재정의합니다.

Dataflow 실행기를 사용하여 Dataflow 서비스에서 파이프라인을 실행하는 방법 알아보기

Dataflow에서 파이프라인을 실행하면 Dataflow가 Apache Beam 파이프라인 코드를 Dataflow 작업으로 변환합니다. Dataflow는 Dataflow 작업을 실행할 수 있도록 Compute EngineCloud Storage와 같은 Google Cloud 서비스를 완벽하게 관리하며 필요한 리소스를 자동으로 가동하고 해체합니다. 파이프라인 수명 주기에서 Dataflow가 Apache Beam 코드를 Dataflow 작업으로 변환하는 방법에 대해 자세히 알아보세요.

파이프라인 유효성 검사

Dataflow에서 파이프라인을 실행하면 작업이 시작되기 전에 Dataflow가 파이프라인에서 유효성 검사 테스트를 수행합니다. 검증 테스트에서 파이프라인 문제가 발견되면 Dataflow는 작업 제출을 조기에 실패합니다. 작업 로그에서 Dataflow에는 다음 텍스트가 있는 메시지가 포함됩니다. 각 메시지에는 유효성 검사 발견 항목에 대한 세부정보와 문제 해결 방법 안내도 포함되어 있습니다.

The preflight pipeline validation failed for job JOB_ID.

실행되는 검증 테스트는 Dataflow 작업에서 사용하는 리소스와 서비스에 따라 달라집니다.

  • 프로젝트에 Service Usage API가 사용 설정되었다면 파이프라인 유효성 검사는 Dataflow 작업을 실행하는 데 필요한 서비스가 사용 설정되어 있는지 확인합니다.
  • 프로젝트에 Cloud Resource Manager API가 사용 설정된 경우 파이프라인 유효성 검사에서 Dataflow 작업을 실행하는 데 필요한 프로젝트 수준 구성이 있는지 확인합니다.

서비스 사용 설정에 대한 자세한 내용은 서비스 사용 설정 및 중지를 참조하세요.

파이프라인 유효성 검사 중에 포착된 권한 문제를 해결하는 방법에 대한 자세한 내용은 파이프라인 유효성 검사 실패를 참조하세요.

파이프라인 유효성 검사를 재정의하고 유효성 검사 오류가 있는 작업을 시작하려면 다음 파이프라인 서비스 옵션을 사용합니다.

Java

--dataflowServiceOptions=enable_preflight_validation=false

Python

--dataflow_service_options=enable_preflight_validation=false

Go

--dataflow_service_options=enable_preflight_validation=false

파이프라인 옵션 설정

Apache Beam 파이프라인 코드에서 파이프라인 옵션을 설정하여 Dataflow가 작업을 실행하는 방식을 제어할 수 있습니다. 예를 들어 파이프라인 옵션을 사용하여 파이프라인을 작업자 가상 머신, Dataflow 서비스 백엔드 또는 로컬 중 어떤 곳에서 실행할지 설정할 수 있습니다.

파이프라인 종속 항목 관리

많은 Apache Beam 파이프라인은 기본 Dataflow 런타임 환경을 사용하여 실행할 수 있습니다. 그러나 일부 데이터 처리 사용 사례에서는 추가 라이브러리나 클래스를 사용하는 이점을 얻을 수 있습니다. 이러한 경우 파이프라인 종속 항목을 관리해야 할 수 있습니다. 종속 항목 관리에 대한 자세한 내용은 Dataflow에서 파이프라인 종속 항목 관리를 참조하세요.

작업 모니터링

Dataflow는 Dataflow 모니터링 인터페이스Dataflow 명령줄 인터페이스와 같은 도구를 통해 작업에 대한 가시성을 제공합니다.

작업자 VM 액세스

Google Cloud Console을 사용하여 특정 파이프라인의 VM 인스턴스를 볼 수 있습니다. 그런 다음 SSH를 사용하여 각 인스턴스에 액세스할 수 있습니다. 단, 작업이 완료되거나 실패하면 Dataflow 서비스가 자동으로 VM 인스턴스를 종료하고 삭제합니다.

작업 최적화

Dataflow는 Google Cloud 리소스를 관리하는 것 외에도 분산 병렬 처리의 여러 측면을 자동으로 수행하고 최적화합니다.

동시 처리 및 분산

Dataflow는 자동으로 데이터를 분할하고 작업자 코드를 Compute Engine 인스턴스에 배포하여 동시 처리합니다. 자세한 내용은 동시 처리 및 분산을 참조하세요.

융합 및 결합 최적화

Dataflow는 파이프라인 코드를 사용하여 파이프라인의 PCollection 및 변환을 나타내는 실행 그래프를 만들고, 가장 효율적인 성능과 리소스 사용에 맞게 그래프를 최적화합니다. Dataflow는 또한 데이터 집계와 같이 많은 비용이 들 수 있는 작업을 자동으로 최적화합니다. 자세한 내용은 융합 최적화결합 최적화를 참조하세요.

자동 조정 기능

Dataflow 서비스에는 리소스 할당 및 데이터 파티셔닝 즉시 조정을 제공하는 여러 기능이 포함되어 있습니다. 이러한 기능은 Dataflow가 최대한 신속하고 효율적으로 작업을 실행하는 데 도움이 됩니다. 이러한 기능에는 다음과 같은 사항이 포함됩니다.

Streaming Engine

기본적으로 Dataflow 파이프라인 실행자는 스트리밍 파이프라인의 단계를 작업자 가상 머신에서 실행하여 작업자 CPU, 메모리, 영구 디스크 스토리지를 사용합니다. Dataflow의 Streaming Engine은 파이프라인 실행을 작업자 VM에서 Dataflow 서비스 백엔드로 이동합니다. 자세한 내용은 Streaming Engine을 참조하세요.

Dataflow 가변형 리소스 예약

Dataflow FlexRS는 고급 예약 기술, Dataflow Shuffle 서비스, 선점형 가상 머신(VM) 인스턴스와 일반 VM의 조합을 사용하여 일괄 처리 비용을 줄입니다. Dataflow는 선점형 VM과 일반 VM을 동시에 실행하여 시스템 이벤트 발생 시 Compute Engine이 선점형 VM 인스턴스를 중지하는 경우 사용자 환경을 개선시킵니다. FlexRS를 사용하면 파이프라인이 계속해서 진행되고 Compute Engine에서 선점형 VM을 선점할 때 이전 작업이 손실되지 않습니다. FlexRS에 대한 자세한 내용은 Dataflow에서 가변형 리소스 예약 사용을 참조하세요.

Dataflow 보안 VM

2022년 6월 1일부터 Dataflow 서비스는 모든 작업자에 보안 VM을 사용합니다. 보안 VM 기능에 대한 자세한 내용은 보안 VM을 참조하세요.