Dataproc 선택적 Flink 구성요소

컬렉션을 사용해 정리하기 내 환경설정을 기준으로 콘텐츠를 저장하고 분류하세요.

선택적 구성요소 기능을 사용하여 Dataproc 클러스터를 만들 때 Flink와 같은 추가 구성요소를 설치할 수 있습니다. 이 페이지에서는 Flink 구성요소에 대해 설명합니다.

Dataproc Flink 구성요소는 Dataproc 클러스터에 Apache Flink를 설치합니다.

구성요소 설치

Dataproc 클러스터를 만들 때 구성요소를 설치합니다. Dataproc Flink 구성요소는 Dataproc 이미지 버전 1.5 이상으로 생성된 클러스터에 설치할 수 있습니다.

클러스터 이미지 버전에 따라 클러스터에 설치된 Flink 구성요소의 버전이 결정됩니다. 각 Dataproc 이미지 출시에 포함된 구성요소 버전 목록을 보려면 지원되는 Dataproc 버전을 참조하세요.

권장사항: Flink 구성요소가 포함된 마스터 1개 표준 VM 클러스터를 사용하세요. Dataproc 고가용성 모드 클러스터(마스터 3개 VM)는 Flink 고가용성 모드를 지원하지 않습니다.

gcloud 명령어

Flink 구성요소를 포함하는 Dataproc 클러스터를 만들려면 --optional-components 플래그와 함께 gcloud dataproc clusters create cluster-name 명령어를 사용합니다.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    --scopes=https://www.googleapis.com/auth/cloud-platform  \
    ... other flags
참고:
  • --enable-component-gateway 플래그를 사용하면 Flink 작업 관리자 UI에 액세스할 수 있습니다.
  • --scopes=https://www.googleapis.com/auth/cloud-platform 플래그는 프로젝트의 Cloud Platform 서비스에 대한 API 액세스를 사용 설정합니다.
  • 선택사항인 --metadata flink-start-yarn-session=true를 추가하여 클러스터의 백그라운드에서 Flink YARN 데몬(/usr/bin/flink-yarn-daemon)을 실행하면 Flink YARN 세션을 시작할 수 있습니다(Flink 세션 모드 참조).
    gcloud dataproc clusters create cluster-name \
        --optional-components=FLINK \
        --region=region \
        --enable-component-gateway \
        --image-version=DATAPROC_IMAGE_VERSION \
        --scopes=https://www.googleapis.com/auth/cloud-platform  \
        --metadata flink-start-yarn-session=true \
        ... other flags
    

REST API

Flink 구성요소는 SoftwareConfig.Componentclusters.create 요청에 사용하여 Dataproc API를 통해 지정할 수 있습니다.

EndpointConfig.enableHttpPortAccesstrue로 설정하여 Flink 작업 관리자 UI에 연결을 사용 설정합니다.

콘솔

  1. 구성요소 및 구성요소 게이트웨이를 사용 설정합니다.
    • Google Cloud 콘솔에서 Dataproc Compute Engine에서 Dataproc 클러스터 만들기 페이지를 엽니다. 클러스터 설정 패널이 선택되었습니다.
    • 버전 관리 섹션에서 이미지 유형 및 버전을 확인하거나 변경합니다.
    • 구성요소 섹션에서 다음을 수행합니다.
      • 구성요소 게이트웨이 아래에서 구성요소 게이트웨이 사용 설정을 선택합니다.
      • 선택적 구성요소 아래에서 클러스터에 설치할 Flink 및 기타 선택적인 구성요소를 선택합니다.

Flink 속성을 설정하려면 다음 단계를 따르세요.

  1. 초기화 작업을 사용하여 /etc/flink/conf/flink-conf.yaml의 기본 Flink 속성을 업데이트합니다.

  2. Flink 작업을 제출하거나 Flink 세션을 시작할 때 명령줄 플래그를 사용하여 Flink 속성을 지정합니다.

클래스 경로 설정

  1. Flink 클러스터 마스터 VM의 SSH 터미널 창에서 Hadoop 클래스 경로를 초기화합니다.

    export HADOOP_CLASSPATH=$(hadoop classpath)
    

Flink를 다른 YARN의 배포 모드(애플리케이션, 작업당, 세션 모드)로 실행할 수 있습니다.

애플리케이션 모드

Flink 애플리케이션 모드는 Dataproc 이미지 버전 2.0 이상에서 지원됩니다. 이 모드는 YARN 작업 관리자에서 작업의 main() 메서드를 실행합니다. 작업이 완료되면 클러스터가 종료됩니다.

작업 제출 예시:

flink run-application \
    -t yarn-application \
   -Djobmanager.memory.process.size=1024m \
   -Dtaskmanager.memory.process.size=2048m \
   -Djobmanager.heap.mb=820 \
   -Dtaskmanager.heap.mb=1640 \
   -Dtaskmanager.numberOfTaskSlots=2 \
   -Dparallelism.default=4 \
   /usr/lib/flink/examples/batch/WordCount.jar

실행 중인 작업 나열:

./bin/flink list -t yarn-application -Dyarn.application.id=application_XXXX_YY

실행 중인 작업 취소:

./bin/flink cancel -t yarn-application -Dyarn.application.id=application_XXXX_YY <jobId>

작업별 모드

이 Flink 모드는 클라이언트 측에서 작업의 main() 메서드를 실행합니다.

작업 제출 예시:

flink run \
    -m yarn-cluster \
    -p 4 \
    -ys 2 \
    -yjm 1024m \
    -ytm 2048m \
    /usr/lib/flink/examples/batch/WordCount.jar

세션 모드

장기 실행 Flink YARN 세션을 시작한 후 하나 이상의 작업을 세션에 제출합니다.

다음 방법 중 하나로 Flink 세션을 시작할 수 있습니다.

  1. Flink 클러스터를 만든 후 커스텀 설정을 사용하여 클러스터 마스터 VM에 사전 설치된 Flink yarn-session.sh 스크립트를 실행합니다.

    커스텀 설정의 예시:

    /usr/lib/flink/bin/yarn-session.sh \
      -s 1 \
      -jm 1024m \
      -tm 2048m \
      -nm flink-dataproc \
      --detached
     ```
    
  2. Flink 클러스터를 만든 후 기본 설정으로 /usr/bin/flink-yarn-daemon 래퍼 스크립트를 실행합니다.

    . /usr/bin/flink-yarn-daemon
    
  3. Flink 클러스터를 만들어 gcloud dataproc clusters create 명령어에 --metadata flink-start-yarn-session=true 플래그를 추가합니다. 이 플래그를 사용 설정하면 클러스터가 생성된 후 Dataproc이 /usr/bin/flink-yarn-daemon을 실행하여 클러스터에서 Flink 세션을 시작합니다.

세션의 YARN 애플리케이션 ID가 /tmp/.yarn-properties-${USER}에 저장됩니다. yarn application -list 명령어를 사용하여 ID를 나열할 수 있습니다.

세션에 작업 제출

Flink 세션을 시작하면 명령어 결과에 작업이 실행되는 Flink 마스터 VM의 URL(호스트 및 포트 포함)이 나열됩니다.

Flink 마스터 URL을 보는 또 다른 방법: 다음 명령어 결과는 Tracking-URL 필드에 Flink 마스터 URL을 나열합니다.

yarn application -list -appId=<yarn-app-id> | sed 's#http://##'`

다음 명령어를 실행하여 Flink 작업을 세션에 제출합니다. http:// prefix를 삭제한 후 FLINK_MASTER_URL을 Flink 마스터 URL로 바꿉니다.

flink run -m FLINK_MASTER_URL /usr/lib/flink/examples/batch/WordCount.jar

세션에서 작업 나열

세션에서 Flink 작업을 나열하려면 다음 중 하나를 수행합니다.

  • 인수 없이 flink list를 실행합니다. 이 명령어는 /tmp/.yarn-properties-${USER}에서 세션의 YARN 애플리케이션 ID를 찾습니다.

  • flink list -yid YARN_APPLICATION_ID을 실행합니다. /tmp/.yarn-properties-${USER} 또는 yarn application -list를 실행하여 YARN 애플리케이션 ID를 가져옵니다.

  • flink list -m FLINK_MASTER_URL을 실행합니다.

세션 중지

세션을 중지하려면 /tmp/.yarn-properties-${USER} 또는 yarn application -list의 출력에서 세션의 YARN 애플리케이션 ID를 가져온 후 다음 명령어 중 하나를 실행합니다.

echo "stop" | /usr/lib/flink/bin/yarn-session.sh -id YARN_APPLICATION_ID
yarn application -kill YARN_APPLICATION_ID

Apache Beam 작업 실행

Dataproc에서 FlinkRunner를 사용하여 Apache Beam을 실행할 수 있습니다.

다음과 같은 방법으로 Flink에서 Beam 작업을 실행할 수 있습니다.

  1. 자바 Beam 작업
  2. 포터블 Beam 작업

자바 Beam 작업

Beam 작업을 JAR 파일로 패키징합니다. 작업을 실행하는 데 필요한 종속 항목이 포함된 번들 JAR 파일을 제공하세요.

다음 예시에서는 Dataproc 클러스터의 마스터 노드에서 자바 Beam 작업을 실행합니다.

  1. Flink 구성요소를 사용 설정하여 Dataproc 클러스터를 만듭니다.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: 클러스터에 설치된 Flink 버전을 결정하는 클러스터의 이미지 버전. 예를 들어 최신 및 이전 2.0.x 이미지 출시 버전 4개에 대하여 나열된 Apache Flink 구성요소 버전을 참조하세요.
    • --region: 지원되는 Dataproc 리전
    • --enable-component-gateway: Flink Job Manager UI에 대한 액세스 사용 설정
    • --scopes: 프로젝트의 Cloud Platform 서비스에 대한 API 액세스 사용 설정
  2. SSH 유틸리티를 사용하여 FLink 클러스터 마스터 노드에서 터미널 창을 엽니다.

  3. Dataproc 클러스터 마스터 노드에서 Flink YARN 세션을 시작합니다.

    . /usr/bin/flink-yarn-daemon
    

    Dataproc 클러스터의 Flink 버전을 기록합니다.

    flink --version
    
  4. 로컬 머신에서 자바의 표준 Beam 단어 수 예시를 생성합니다.

    Dataproc 클러스터의 Flink 버전과 호환되는 Beam 버전을 선택합니다. Beam-Flink 버전 호환성이 나열된 Flink 버전 호환성 표를 참조하세요.

    생성된 POM 파일을 엽니다. <flink.artifact.name> 태그로 지정된 Beam Flink 실행기 버전을 확인합니다. Flink 아티팩트 이름의 Beam Flink 실행기 버전이 클러스터의 Flink 버전과 일치하지 않으면 일치하도록 버전 번호를 업데이트합니다.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. 단어 수 예시를 패키징합니다.

    mvn package -Pflink-runner
    
  6. 패키징된 uber JAR 파일 word-count-beam-bundled-0.1.jar(135MB 이하)를 Dataproc 클러스터의 마스터 노드에 업로드합니다. gsutil cp를 사용하면 Cloud Storage에서 Dataproc 클러스터로 파일을 더 빠르게 전송할 수 있습니다.

    1. 로컬 터미널에서 Cloud Storage 버킷을 만들고 uber JAR을 업로드합니다.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc의 마스터 노드에서 uber JAR을 다운로드합니다.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc 클러스터의 마스터 노드에서 자바 Beam 작업을 실행합니다.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 결과가 Cloud Storage 버킷에 기록되었는지 확인합니다.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN 세션을 중지합니다.

    yarn application -list
    
    yarn application -kill YARN_APPLICATION_ID
    

포터블 Beam 작업

Python, Go, 기타 지원되는 언어로 Beam 작업을 실행하려면 Beam의 Flink Runner에 설명된 대로 FlinkRunnerPortableRunner를 사용하면 됩니다. 이동성 프레임워크 로드맵도 참조하세요.

다음 예시에서는 Dataproc 클러스터의 마스터 노드에서 이동 가능한 Beam 작업을 Python으로 실행합니다.

  1. FlinkDocker 구성요소가 모두 사용 설정된 Dataproc 클러스터를 만듭니다.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    

    참고:

    • --optional-components: Flink 및 Docker
    • --image-version: 클러스터에 설치된 Flink 버전을 결정하는 클러스터의 이미지 버전. 예를 들어 최신 및 이전 2.0.x 이미지 출시 버전 4개에 대하여 나열된 Apache Flink 구성요소 버전을 참조하세요.
    • --region: 사용 가능한 Dataproc 리전
    • --enable-component-gateway: Flink Job Manager UI에 대한 액세스 사용 설정
    • --scopes: 프로젝트의 Cloud Platform 서비스에 대한 API 액세스 사용 설정
  2. 로컬 또는 Cloud Shell에서 gsutil을 사용하여 Cloud Storage 버킷을 만듭니다. 샘플 WordCount 프로그램을 실행할 때 BUCKET_NAME을 지정합니다.

    gsutil mb BUCKET_NAME
    
  3. 클러스터 VM의 터미널 창에서 Flink YARN 세션을 시작합니다. 작업이 실행되는 Flink 마스터의 주소인 Flink 마스터 URL을 확인합니다. 샘플 WordCount 프로그램을 실행할 때 FLINK_MASTER_URL을 지정합니다.

    . /usr/bin/flink-yarn-daemon
    

    Dataproc 클러스터를 실행하는 Flink 버전을 표시합니다. 샘플 WordCount 프로그램을 실행할 때 FLINK_VERSION을 지정합니다.

    flink --version
    
  4. 작업에 필요한 Python 라이브러리를 클러스터 마스터 노드에 설치합니다.

  5. 클러스터의 Flink 버전과 호환되는 Beam 버전을 설치합니다.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. 클러스터 마스터 노드에서 워드 수 예시를 실행합니다.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    

    참고:

    • --runner: FlinkRunner.
    • --flink_version: FLINK_VERSION, 앞서 언급됨
    • --flink_master: FLINK_MASTER_URL, 앞서 언급됨
    • --flink_submit_uber_jar: uber JAR을 사용하여 Beam 작업 실행
    • --output: BUCKET_NAME, 앞서 생성됨
  7. 버킷에 결과가 기록되었는지 확인합니다.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN 세션을 중지합니다.

    1. 애플리케이션 ID를 가져옵니다.
    yarn application -list
    
    1. Insert the <var>YARN_APPLICATION_ID</var>, then stop the session.
    
    yarn application -kill 
    

Dataproc Flink 구성요소는 Kerberized 클러스터를 지원합니다. Flink 작업을 제출하고 유지하거나 Flink 클러스터를 시작하려면 유효한 Kerberos 티켓이 필요합니다. 기본적으로 Kerberos 티켓은 7일 동안 유효합니다.

Flink 작업 또는 Flink 세션 클러스터를 실행하는 동안 Flink 작업 관리자 웹 인터페이스를 사용할 수 있습니다. 웹 인터페이스를 사용하려면 다음 단계를 따르세요.

  1. 구성요소 게이트웨이를 사용 설정하여 Dataproc 클러스터를 만듭니다.
  2. 클러스터를 만든 후 Google Cloud Console의 클러스터 세부정보 페이지에 있는 웹 인터페이스 탭에서 구성요소 게이트웨이 YARN ResourceManager 링크를 클릭합니다.
  3. YARN Resource Manager UI에서 Flink 클러스터 애플리케이션 항목을 식별합니다. 작업의 완료 상태에 따라 ApplicationMaster 또는 기록 링크가 나열됩니다.
  4. 장기 실행 스트리밍 작업의 경우 ApplicationManager 링크를 클릭하여 Flink 대시보드를 엽니다. 완료된 작업의 경우 기록 링크를 클릭하여 작업 세부정보를 확인합니다.