커스텀 컨테이너에서 Dataflow 작업 실행

이 문서에서는 커스텀 컨테이너를 사용하여 Dataflow 파이프라인을 실행하는 방법을 설명합니다.

컨테이너 이미지 만들기에 대한 자세한 내용은 Dataflow를 위한 커스텀 컨테이너 이미지 빌드를 참조하세요.

파이프라인을 실행할 때는 커스텀 컨테이너 이미지의 SDK와 동일한 버전 및 언어 버전의 Apache Beam SDK를 사용합니다. 이 단계를 진행하면 호환되지 않는 종속 항목 또는 SDK로 인해 예상치 못한 오류가 발생하지 않습니다.

로컬에서 테스트

Dataflow에서 파이프라인을 실행하려면 먼저 더 빠른 테스트 및 디버깅을 위해 컨테이너 이미지를 로컬로 테스트하는 것이 좋습니다.

Apache Beam 관련 사용 방법에 대한 자세한 내용은 커스텀 컨테이너 이미지로 파이프라인 실행에 대한 Apache Beam 가이드를 참조하세요.

PortableRunner를 사용한 기본 테스트

원격 컨테이너 이미지를 가져올 수 있고 간단한 파이프라인을 실행할 수 있는지 확인하려면 Apache Beam PortableRunner를 사용합니다. PortableRunner를 사용하면 작업 제출이 로컬 환경에서 발생하고 DoFn 실행은 Docker 환경에서 이루어집니다.

GPU를 사용하는 경우 Docker 컨테이너에 GPU 액세스 권한이 없을 수 있습니다. GPU를 사용하여 컨테이너를 테스트하려면 Direct Runner를 사용하고 GPU 사용 페이지의 독립형 VM으로 디버그 섹션에 나온 GPU를 사용해 독립형 VM에서 컨테이너 이미지를 테스트하는 단계를 수행합니다.

다음은 예시 파이프라인을 실행합니다.

Java

mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain \
    -Dexec.args="--runner=PortableRunner \
    --jobEndpoint=REGION \
    --defaultEnvironmentType=DOCKER \
    --defaultEnvironmentConfig=IMAGE_URI \
    --inputFile=INPUT_FILE \
    --output=OUTPUT_FILE"

Python

python path/to/my/pipeline.py \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

Go

go path/to/my/pipeline.go \
  --runner=PortableRunner \
  --job_endpoint=REGION \
  --environment_type=DOCKER \
  --environment_config=IMAGE_URI \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE

다음을 바꿉니다.

  • REGION: 사용할 작업 서비스 리전으로, 주소 및 포트 형식입니다. 예를 들면 localhost:3000입니다. embed를 사용하여 프로세스 내 작업 서비스를 실행합니다.
  • IMAGE_URI: 커스텀 컨테이너 이미지 URI입니다.
  • INPUT_FILE: 텍스트 파일로 읽을 수 있는 입력 파일입니다. 컨테이너 이미지 또는 원격 파일에 미리 로드된
    SDK 하네스 컨테이너 이미지가 이 파일에 액세스할 수 있어야 합니다.
  • OUTPUT_FILE: 출력을 기록할 경로입니다. 이 경로는 원격 경로 또는 컨테이너의 로컬 경로입니다.

파이프라인이 성공적으로 완료되면 콘솔 로그를 검토하여 파이프라인이 성공적으로 완료되었으며 IMAGE_URI로 지정된 원격 이미지가 사용되었는지 확인합니다.

파이프라인을 실행한 후에는 컨테이너에 저장된 파일이 로컬 파일 시스템에 존재하지 않으며 컨테이너가 중지됩니다. docker cp를 사용하여 중지된 컨테이너 파일 시스템에서 파일을 복사할 수 있습니다.

다른 방법은 다음과 같습니다.

  • Cloud Storage와 같은 원격 파일 시스템에 출력을 제공합니다. 사용자 인증 정보 파일 또는 애플리케이션 기본 사용자 인증 정보를 포함하여 테스트 목적으로 액세스를 수동으로 구성해야 할 수 있습니다.
  • 빠른 디버깅을 위해 임시 로깅을 추가합니다.

Direct Runner 사용

컨테이너 이미지와 파이프라인에 대해 심층적인 로컬 테스트를 진행하려면 Apache Beam Direct Runner를 사용합니다.

컨테이너 이미지와 일치하는 로컬 환경에서 테스트하거나 실행 중인 컨테이너에서 파이프라인을 실행하면 컨테이너와 독립적으로 파이프라인을 확인할 수 있습니다.

Java

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  mvn compile exec:java -Dexec.mainClass=com.example.package.MyClassWithMain ...

Python

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  python path/to/my/pipeline.py ...

Go

docker run -it --entrypoint "/bin/bash" IMAGE_URI
...
# On docker container:
root@4f041a451ef3:/#  go path/to/my/pipeline.go ...

IMAGE_URI를 커스텀 컨테이너 이미지 URI로 바꿉니다.

이 예시에서는 파이프라인 자체를 포함한 모든 파이프라인 파일이 커스텀 컨테이너에 있거나, 로컬 파일 시스템에서 마운트되었거나, Apache Beam 및 컨테이너에서 원격으로 액세스할 수 있다고 가정합니다. 예를 들어 Maven(mvn)을 사용하여 위의 Java 예시를 실행하려면 Maven 및 종속 항목을 컨테이너에 스테이징해야 합니다. 자세한 내용은 Docker 문서에서 스토리지docker run을 참조하세요.

Direct Runner에서 테스트를 수행하는 목적은 기본 ENTRYPOINT를 사용해서 컨테이너 실행을 테스트하는 것이 아니라 커스텀 컨테이너 환경에서 파이프라인을 테스트하기 위한 것입니다. ENTRYPOINT(예: docker run --entrypoint ...)를 수정하여 파이프라인을 직접 실행하거나 컨테이너에서 명령어를 수동으로 실행하도록 허용합니다.

Compute Engine에서 컨테이너 실행을 기반으로 하는 특정 구성을 사용하는 경우 Compute Engine VM에서 직접 컨테이너를 실행할 수 있습니다. 자세한 내용은 Compute Engine의 컨테이너를 참조하세요.

Dataflow 작업 실행

Dataflow에서 Apache Beam 파이프라인을 시작할 때 컨테이너 이미지의 경로를 지정합니다. 커스텀 이미지에 :latest 태그를 사용하지 마세요. 날짜 또는 고유 식별자로 빌드에 태그를 지정합니다. 문제가 발생하면 이 유형의 태그를 사용하여 파이프라인 실행을 이전에 알려진 작동 구성으로 되돌리고 변경사항을 검사할 수 있습니다.

Java

--sdkContainerImage를 사용하여 Java 런타임용 SDK 컨테이너 이미지를 지정합니다.

--experiments=use_runner_v2를 사용하여 Runner v2를 사용 설정합니다.

Python

SDK 버전 2.30.0 이상을 사용하는 경우 파이프라인 옵션 --sdk_container_image를 사용하여 SDK 컨테이너 이미지를 지정합니다.

이전 버전의 SDK에는 파이프라인 옵션 --worker_harness_container_image를 사용하여 작업자 하네스에 사용할 컨테이너 이미지의 위치를 지정합니다.

커스텀 컨테이너는 Dataflow Runner v2에서만 지원됩니다. 일괄 Python 파이프라인을 실행하는 경우 --experiments=use_runner_v2 플래그를 설정합니다. 스트리밍 Python 파이프라인을 시작하는 경우 스트리밍 Python 파이프라인에 기본적으로 Runner v2가 사용되기 때문에 실험을 지정할 필요가 없습니다.

Go

SDK 버전 2.40.0 이상을 사용하는 경우 파이프라인 옵션 --sdk_container_image를 사용하여 SDK 컨테이너 이미지를 지정합니다.

이전 버전의 SDK에는 파이프라인 옵션 --worker_harness_container_image를 사용하여 작업자 하네스에 사용할 컨테이너 이미지의 위치를 지정합니다.

커스텀 컨테이너는 기본적으로 Dataflow Runner v2를 사용하므로 모든 버전의 Go SDK에서 지원됩니다.

다음 예시에서는 커스텀 컨테이너로 일괄 WordCount 예시를 실행하는 방법을 보여줍니다.

Java

mvn compile exec:java -Dexec.mainClass=org.apache.beam.examples.WordCount \
   -Dexec.args="--runner=DataflowRunner \
                --inputFile=INPUT_FILE \
                --output=OUTPUT_FILE \
                --project=PROJECT_ID \
                --region=REGION \
                --gcpTempLocation=TEMP_LOCATION \
                --diskSizeGb=DISK_SIZE_GB \
                --experiments=use_runner_v2 \
                --sdkContainerImage=IMAGE_URI"

Python

Python용 Apache Beam SDK 버전 2.30.0 이상을 사용하는 경우:

python -m apache_beam.examples.wordcount \
  --input=INPUT_FILE \
  --output=OUTPUT_FILE \
  --project=PROJECT_ID \
  --region=REGION \
  --temp_location=TEMP_LOCATION \
  --runner=DataflowRunner \
  --disk_size_gb=DISK_SIZE_GB \
  --experiments=use_runner_v2 \
  --sdk_container_image=IMAGE_URI

Go

wordcount --input gs://dataflow-samples/shakespeare/kinglear.txt \
          --output gs://<your-gcs-bucket>/counts \
          --runner dataflow \
          --project your-gcp-project \
          --region your-gcp-region \
          --temp_location gs://<your-gcs-bucket>/tmp/ \
          --staging_location gs://<your-gcs-bucket>/binaries/ \
          --sdk_container_image=IMAGE_URI

다음을 바꿉니다.

  • INPUT_FILE: 예시를 실행할 때 Dataflow가 읽은 Cloud Storage 입력 경로입니다.
  • OUTPUT_FILE: 예시 파이프라인에서 기록하는 Cloud Storage 출력 경로입니다. 이 파일에는 단어 수가 포함되어 있습니다.
  • PROJECT_ID: Google Cloud 프로젝트의 ID입니다.
  • REGION: Dataflow 작업을 배포할 리전입니다.
  • TEMP_LOCATION: 파이프라인 실행 중에 생성된 임시 작업 파일을 스테이징할 Dataflow의 Cloud Storage 경로입니다.
  • DISK_SIZE_GB: (선택사항) 컨테이너가 큰 경우 디스크 공간 부족을 방지하기 위해 기본 부팅 디스크 크기를 늘리는 것이 좋습니다.
  • IMAGE_URI: SDK 커스텀 컨테이너 이미지 URI입니다. 항상 버전이 지정된 컨테이너 SHA 또는 태그를 사용합니다. :latest 태그 또는 변경 가능한 태그를 사용하지 마세요.