Dataproc Flink 구성요소

선택적 구성요소 기능을 사용하여 Dataproc 클러스터를 만들 때 추가 구성요소를 설치할 수 있습니다. 이 페이지에서는 Flink 구성요소에 대해 설명합니다.

Dataproc Flink 구성요소는 Dataproc 클러스터에 Apache Flink를 설치합니다.

구성요소 설치

Dataproc 클러스터를 만들 때 구성요소를 설치합니다. Dataproc Flink 구성요소는 Dataproc 이미지 버전 1.5 이상으로 생성된 클러스터에 설치할 수 있습니다.

클러스터 이미지 버전에 따라 클러스터에 설치된 Flink 구성요소의 버전이 결정됩니다. 예를 들어 최신 및 이전 2.0.x 이미지 출시 버전 4개에 대하여 나열된 Apache Flink 구성요소 버전을 참조하세요. 각 Dataproc 이미지 출시에 포함된 구성요소 버전은 지원되는 Dataproc 버전을 참조하세요.

gcloud 명령어

Flink 구성요소를 포함하는 Dataproc 클러스터를 만들려면 --optional-components 플래그와 함께 gcloud dataproc clusters create cluster-name 명령어를 사용합니다.

gcloud dataproc clusters create cluster-name \
    --optional-components=FLINK \
    --region=region \
    --enable-component-gateway \
    --image-version=DATAPROC_IMAGE_VERSION \
    ... other flags

참고: Flink YARN 세션에 상당한 YARN 리소스가 소비되기 때문에 기본적으로 Dataproc는 Dataproc 클러스터가 시작될 때 Flink 세션을 시작하지 않습니다. --metadata flink-start-yarn-session=true 플래그를 gcloud dataproc clusters create 명령어에 추가하여 Flink 클러스터를 시작할 때 세션을 시작할 수 있습니다.

REST API

Flink 구성요소는 SoftwareConfig.Componentclusters.create 요청에 사용하여 Dataproc API를 통해 지정할 수 있습니다. SoftwareConfig.imageVersion 필드는 클러스터 이미지 버전을 설정하는 데 사용됩니다.

Console

  1. 구성요소 및 구성요소 게이트웨이를 사용 설정합니다.
    • Cloud Console에서 Dataproc 클러스터 만들기 페이지를 엽니다. 클러스터 설정 패널이 선택되었습니다.
    • 버전 관리 섹션에서 이미지 유형 및 버전을 확인하거나 변경합니다.
    • 구성요소 섹션에서 다음을 수행합니다.
      • 구성요소 게이트웨이 아래에서 구성요소 게이트웨이 사용 설정을 선택합니다(구성요소 게이트웨이 URL 보기 및 액세스 참조).
      • 선택적 구성요소 아래에서 클러스터에 설치할 Flink 및 기타 선택적인 구성요소를 선택합니다.

Flink를 사용하는 Dataproc 클러스터가 시작되면 Dataproc 클러스터의 마스터 노드로 SSH 연결한 후 Flink 작업을 실행합니다.

예를 들면 다음과 같습니다.

단일 Flink 작업을 실행합니다. 작업을 수락한 후 Flink가 YARN에 있는 작업에 대해 작업 관리자 및 슬롯을 시작합니다. Flink 작업이 완료될 때까지 YARN 클러스터에서 실행됩니다. 작업이 완료된 후 작업 관리자가 종료됩니다. 작업 로그는 YARN 로그에 제공됩니다.

flink run -m yarn-cluster /usr/lib/flink/examples/batch/WordCount.jar

예를 들면 다음과 같습니다.

장기 실행 Flink YARN 세션을 시작한 후 작업을 실행합니다.

Dataproc 클러스터의 마스터 노드에서 세션을 시작합니다. 또는 gcloud dataproc clusters create --metadata flink-start-yarn-session=true 플래그를 사용하여 Flink 클러스터를 만들 때 Flink YARN 세션을 시작할 수 있습니다.

. /usr/bin/flink-yarn-daemon

세션이 성공적으로 시작된 후 FLINK_MASTER_URL의 호스트와 포트를 기록합니다. 다음 명령어에서 JOB_MANAGER_HOSTNAMEREST_API_PORT를 해당 항목으로 바꿉니다. 작업을 실행합니다.

HADOOP_CLASSPATH=`hadoop classpath`

flink run -m JOB_MANAGER_HOSTNAME:REST_API_PORT /usr/lib/flink/examples/batch/WordCount.jar

세션을 중지하려면APPLICATION_IDFlink 작업 관리자 UI 또는 yarn application -list의 출력에서 찾은 Flink YARN 세션과 연결된 애플리케이션 ID로 바꾸고 다음을 실행합니다.

yarn application -kill APPLICATION_ID

Apache Beam 작업 실행

Dataproc에서 FlinkRunner를 사용하여 Apache Beam을 실행할 수 있습니다.

다음과 같은 방법으로 Flink에서 Beam 작업을 실행할 수 있습니다.

  1. 자바 Beam 작업
  2. 포터블 Beam 작업

자바 Beam 작업

Beam 작업을 JAR 파일로 패키징합니다. 작업을 실행하는 데 필요한 종속 항목이 포함된 번들 JAR 파일을 제공하세요.

다음 예시에서는 Dataproc 클러스터의 마스터 노드에서 자바 Beam 작업을 실행합니다.

  1. Flink 구성요소를 사용 설정하여 Dataproc 클러스터를 만듭니다.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink.
    • --image-version: 클러스터에 설치된 Flink 버전을 결정하는 클러스터의 이미지 버전. 예를 들어 최신 및 이전 2.0.x 이미지 출시 버전 4개에 대하여 나열된 Apache Flink 구성요소 버전을 참조하세요.
    • --region: 지원되는 Dataproc 리전
    • --enable-component-gateway: Flink Job Manager UI에 대한 액세스 사용 설정
    • --scopes: 동일한 프로젝트에서 GCP 서비스에 API 액세스 사용 설정
  2. Dataproc 클러스터의 마스터 노드에 SSH로 연결합니다.

  3. Dataproc 클러스터의 마스터 노드에서 Flink YARN 세션을 시작합니다.

    . /usr/bin/flink-yarn-daemon
    

    Dataproc 클러스터의 Flink 버전을 기록합니다.

    flink --version
    
  4. 로컬 머신에서 자바의 표준 Beam 단어 수 예시를 생성합니다.

    Dataproc 클러스터의 Flink 버전과 호환되는 Beam 버전을 선택합니다. Beam-Flink 버전 호환성이 나열된 Flink 버전 호환성 표를 참조하세요.

    생성된 POM 파일을 엽니다. <flink.artifact.name> 태그로 지정된 Beam Flink 실행기 버전을 확인합니다. Flink 아티팩트 이름의 Beam Flink 실행기 버전이 클러스터의 Flink 버전과 일치하지 않으면 일치하도록 버전 번호를 업데이트합니다.

    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=BEAM_VERSION \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    
  5. 단어 수 예시를 패키징합니다.

    mvn package -Pflink-runner
    
  6. 패키징된 uber JAR 파일 word-count-beam-bundled-0.1.jar(135MB 이하)를 Dataproc 클러스터의 마스터 노드에 업로드합니다. gsutil cp를 사용하면 Cloud Storage에서 Dataproc 클러스터로 파일을 더 빠르게 전송할 수 있습니다.

    1. 로컬 터미널에서 Cloud Storage 버킷을 만들고 uber JAR을 업로드합니다.

      gsutil mb BUCKET_NAME
      
      gsutil cp target/word-count-beam-bundled-0.1.jar gs://BUCKET_NAME/
      
    2. Dataproc의 마스터 노드에서 uber JAR을 다운로드합니다.

      gsutil cp gs://BUCKET_NAME/word-count-beam-bundled-0.1.jar .
      
  7. Dataproc 클러스터의 마스터 노드에서 자바 Beam 작업을 실행합니다.

    flink run -c org.apache.beam.examples.WordCount word-count-beam-bundled-0.1.jar \
        --runner=FlinkRunner \
        --output=gs://BUCKET_NAME/java-wordcount-out
    
  8. 결과가 Cloud Storage 버킷에 기록되었는지 확인합니다.

    gsutil cat gs://BUCKET_NAME/java-wordcount-out-SHARD_ID
    
  9. Flink YARN 세션을 중지합니다.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

포터블 Beam 작업

Python, Go, 기타 지원되는 언어로 Beam 작업을 실행하려면 Beam의 Flink Runner에 설명된 대로 FlinkRunnerPortableRunner를 사용하면 됩니다. 이동성 프레임워크 로드맵도 참조하세요.

다음 예시에서는 Dataproc 클러스터의 마스터 노드에서 이동 가능한 Beam 작업을 Python으로 실행합니다.

  1. FlinkDocker 구성요소가 모두 사용 설정된 Dataproc 클러스터를 만듭니다.

    gcloud dataproc clusters create CLUSTER_NAME \
        --optional-components=FLINK,DOCKER \
        --image-version=DATAPROC_IMAGE_VERSION \
        --region=REGION \
        --enable-component-gateway \
        --scopes=https://www.googleapis.com/auth/cloud-platform
    
    • --optional-components: Flink 및 Docker
    • --image-version: 클러스터에 설치된 Flink 버전을 결정하는 클러스터의 이미지 버전. 예를 들어 최신 및 이전 2.0.x 이미지 출시 버전 4개에 대하여 나열된 Apache Flink 구성요소 버전을 참조하세요.
    • --region: 지원되는 Dataproc 리전
    • --enable-component-gateway: Flink Job Manager UI에 대한 액세스 사용 설정
    • --scopes: 동일한 프로젝트에서 GCP 서비스에 API 액세스 사용 설정
  2. Dataproc 클러스터의 마스터 노드에 SSH로 연결합니다.

  3. Cloud Storage 버킷을 만듭니다.

    gsutil mb BUCKET_NAME
    
  4. Dataproc 클러스터의 마스터 노드에서 Flink YARN 세션을 시작하고 세션이 시작되면 Flink 마스터 URL을 저장합니다.

    . /usr/bin/flink-yarn-daemon
    

    Dataproc 클러스터의 Flink 버전을 기록합니다.

    flink --version
    
  5. Dataproc 클러스터의 마스터 노드에 작업에 필요한 Python 라이브러리를 설치합니다.

    Dataproc 클러스터의 Flink 버전과 호환되는 Beam 버전을 선택합니다. Beam-Flink 버전 호환성이 나열된 Flink 버전 호환성 표를 참조하세요.

    python -m pip install apache-beam[gcp]==BEAM_VERSION
    
  6. Dataproc 클러스터의 마스터 노드에서 단어 수 예시를 실행합니다.

    python -m apache_beam.examples.wordcount \
        --runner=FlinkRunner \
        --flink_version=FLINK_VERSION \
        --flink_master=FLINK_MASTER_URL
        --flink_submit_uber_jar \
        --output=gs://BUCKET_NAME/python-wordcount-out
    
    • --runner(필수 항목): FlinkRunner
    • --flink_version(필수 항목): Flink 버전
    • --flink_master(필수 항목): 작업이 실행될 Flink 마스터의 주소
    • --flink_submit_uber_jar(필수 항목): uber JAR을 사용하여 Beam 작업 실행
    • --output (필수 항목): 출력을 작성할 위치
  7. 결과가 버킷에 기록되었는지 확인합니다.

    gsutil cat gs://BUCKET_NAME/python-wordcount-out-SHARD_ID
    
  8. Flink YARN 세션을 중지합니다.

    yarn application -list
    
    yarn application -kill APPLICATION_ID
    

Dataproc Flink 구성요소는 Kerberized 클러스터를 지원합니다. Flink 작업을 제출하고 유지하거나 Flink 클러스터를 시작하려면 유효한 Kerberos 티켓이 필요합니다. 기본적으로 티켓은 7일 동안 유효합니다.

Flink 작업 또는 Flink 세션 클러스터를 실행하는 동안 Flink 작업 관리자 웹 인터페이스를 사용할 수 있습니다. YARN의 Flink 애플리케이션의 애플리케이션 마스터에서 Flink Job Manager UI를 열 수 있습니다.

UI 액세스를 사용 설정하고 사용하려면 다음 안내를 따르세요.

  1. 구성요소 게이트웨이를 사용 설정하여 Dataproc 클러스터를 만듭니다.
  2. 클러스터를 만든 후 Google Cloud Console의 클러스터 세부정보 페이지에 있는 웹 인터페이스 탭에서 구성요소 게이트웨이 YARN ResourceManager 링크를 클릭합니다.
  3. YARN Resource Manager UI에서 Flink 클러스터 애플리케이션 항목을 식별합니다. 작업의 완료 상태에 따라 ApplicationMaster 또는 기록 링크가 나열됩니다.
  4. 장기 실행 스트리밍 작업의 경우 ApplicationManager 링크를 클릭하여 Flink 대시보드를 엽니다. 완료된 작업의 경우 기록 링크를 클릭하여 작업 세부정보를 확인합니다.