Flex 템플릿 사용

이 가이드에서는 gcloud 명령줄 도구를 사용하여 커스텀 Docker 이미지로 Dataflow Flex 템플릿 작업을 만들고 실행하는 방법을 보여줍니다. 이 가이드는 Pub/Sub에서 JSON 인코딩된 메시지를 읽고, Beam SQL로 메시지 데이터를 변환하고, 결과를 BigQuery 테이블에 쓰는 스트리밍 파이프라인 예시를 설명합니다.

목표

  • Docker 컨테이너 이미지를 빌드합니다.
  • Dataflow Flex 템플릿을 만들고 실행합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다.

시작하기 전에

  1. Google 계정으로 로그인합니다.

    아직 계정이 없으면 새 계정을 등록하세요.

  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기 페이지로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.

    API 사용 설정

  5. 인증 설정:
    1. Cloud Console에서 서비스 계정 키 만들기 페이지로 이동합니다.

      서비스 계정 키 만들기 페이지로 이동
    2. 서비스 계정 목록에서 새 서비스 계정을 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다.
    4. 역할 목록에서 프로젝트 > 소유자.

    5. 만들기를 클릭합니다. 키가 포함된 JSON 파일이 컴퓨터에 다운로드됩니다.
  6. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

소스 및 싱크 예시 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • Pub/Sub를 사용하여 데이터의 스트리밍 소스
  • BigQuery에 데이터를 로드하는 데이터 세트

Cloud Storage 버킷 생성

gsutil mb 명령어를 사용합니다.

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Pub/Sub 주제 및 해당 주제에 대한 구독 만들기

gcloud 명령줄 도구를 사용합니다.

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Cloud Scheduler 작업 만들기

이 단계에서는 gcloud 명령줄 도구를 사용하여 '긍정적인 평점'과 '부정적인 평점'을 게시하는 Cloud Scheduler 작업을 만들고 실행합니다.

  1. 이 Google Cloud 프로젝트의 Cloud Scheduler 작업을 만듭니다.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. 게시자가 분당 1개의 메시지를 게시하는 '긍정적인 평점'을 만들고 실행합니다.
  3. Cloud Scheduler 작업을 시작합니다.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. 2분마다 1개의 메시지를 게시하는 '부정적인 평점'에 대해 다른 유사한 게시자를 작성하고 실행합니다.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

BigQuery 데이터 세트 만들기

bq mk 명령어를 사용합니다.

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

코드 샘플 다운로드

  1. 코드 샘플을 다운로드합니다.

    자바

    java-docs-samples 저장소를 클론하고 이 가이드의 코드 샘플로 이동합니다.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    python-docs-samples repository를 클론하고 이 가이드의 코드 샘플로 이동합니다.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. 이 가이드의 TEMPLATE_IMAGE를 내보냅니다.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

개발 환경 설정

자바

  1. Java Development Kit(JDK) 버전 11을 다운로드하고 설치합니다. JAVA_HOME 환경 변수가 설정되어 있고 JDK 설치를 가리키는지 확인합니다.
  2. 사용 중인 특정 운영체제의 Maven 설치 가이드를 따라 Apache Maven을 다운로드하고 설치합니다.
  3. (선택사항) 개발을 위해 Apache Beam 파이프라인을 로컬에서 실행합니다.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSQL \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. 자바 프로젝트를 Uber JAR 파일에 빌드합니다.
      mvn clean package
  5. (선택사항) Uber JAR 파일의 크기를 원본 파일과 비교합니다.
      ls -lh target/*.jar
    이 Uber JAR 파일에는 모든 종속 항목이 포함되어 있습니다. 다른 라이브러리에 외부 종속 항목이 없는 독립형 애플리케이션으로 이 파일을 실행할 수 있습니다.

Python

Python용 Apache Beam SDK(pip 포함) 및 Python 버전 2.7, 3.5, 3.6 또는 3.7을 사용합니다. 다음 명령어를 실행하여 Python이 작동 중이고 pip가 설치되었는지 확인합니다.

    python --version
    python -m pip --version

Python이 없으면 Python 설치 페이지에서 사용 중인 운영체제에 맞는 설치 단계를 확인합니다.

Python만 해당: 컨테이너 이미지 만들기 및 빌드

이 섹션에는 Python 사용자를 위한 단계가 포함되어 있습니다. 자바를 사용하는 경우 다음 단계를 건너뜁니다.

  1. (선택사항) 기본적으로 Kaniko 캐시 사용을 사용 설정합니다.
    gcloud config set builds/use_kaniko True
    
    Kaniko는 컨테이너 빌드 아티팩트를 캐시하므로 이 옵션을 사용하면 후속 빌드 속도가 빨라집니다.
  2. (선택사항) Dockerfile을 만듭니다. 이 가이드에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      # Due to a change in the Apache Beam base image in version 2.24, you must to install
      # libffi-dev manually as a dependency. For more information:
      #   https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891
      RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/*
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      RUN pip install -U -r ./requirements.txt

    이 Dockerfile에는 FROM, ENV, COPY 명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.

    gcr.io/PROJECT/로 시작하는 이미지는 다른 Google Cloud 제품에서 액세스할 수 있는 프로젝트의 Container Registry에 저장됩니다.
  3. Cloud Build에서 Dockerfile 사용하여 Docker 이미지를 빌드합니다.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

Flex 템플릿 만들기

템플릿을 실행하려면 SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 Cloud Storage에 만들어야 합니다.

이 예시의 metadata.json 파일에는 name, description, 입력 parameters 필드와 같은 템플릿에 대한 추가 정보가 포함되어 있습니다.

  1. SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 만듭니다.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Flex 템플릿을 빌드합니다.

    자바

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

지정한 Cloud Storage 위치의 템플릿 파일을 통해 이제 템플릿을 사용할 수 있습니다.

Flex 템플릿 파이프라인 실행

이제 템플릿 파일을 참조하고 파이프라인에 필요한 템플릿 매개변수를 전달하여 Dataflow에서 Apache Beam 파이프라인을 실행할 수 있습니다.

  1. 템플릿을 실행합니다.

    자바

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

      export REGION="us-central1"
    
      gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    또는 REST API 요청을 사용하여 템플릿을 실행합니다.
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
    
  2. 명령어를 실행하여 Flex 템플릿을 실행하면 Dataflow는 작업 상태가 대기 중인 작업 ID를 반환합니다. 작업 상태가 실행 중이 되기까지 몇 분 정도 걸릴 수 있으며 작업 그래프에 액세스할 수 있습니다.
  3. BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다.
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    이 파이프라인이 실행되는 동안 매분 BigQuery 테이블에 새 행이 추가됩니다.

삭제

이 가이드를 마쳤으면 나중에 요금이 청구되지 않도록 Google Cloud에서 만든 리소스를 삭제합니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

Flex 템플릿 리소스 정리

  1. Dataflow 파이프라인을 중지합니다.
    gcloud dataflow jobs list \
      --filter 'NAME:streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Cloud Storage에서 템플릿 사양 파일을 삭제합니다.
    gsutil rm $TEMPLATE_PATH
    
  3. Container Registry에서 Flex 템플릿 컨테이너 이미지를 삭제합니다.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Google Cloud 프로젝트 리소스 정리

  1. Cloud Scheduler 작업 삭제합니다.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Pub/Sub 구독 및 주제를 삭제합니다.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. BigQuery 테이블을 삭제합니다.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. BigQuery 데이터 세트를 삭제하면 요금이 부과되지 않습니다.

    다음 명령어는 데이터 세트의 모든 테이블도 삭제합니다. 테이블과 데이터를 복구할 수 없습니다.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Cloud Storage 버킷을 삭제하면 요금만 청구되지 않습니다.

    다음 명령어는 버킷의 모든 객체도 삭제합니다. 이러한 객체는 복구할 수 없습니다.

    gsutil rm -r gs://$BUCKET
    

제한사항

Flex 템플릿 작업에 다음과 같은 제한사항이 적용됩니다.

  • Docker를 사용하여 컨테이너를 패키징하려면 Google에서 제공하는 기본 이미지를 사용해야 합니다.
  • 스트리밍 작업 업데이트가 지원되지 않습니다.
  • FlexRS 사용은 지원되지 않습니다.
  • waitUntilFinish(자바) 및 wait_until_finish(Python)은 지원되지 않습니다.

다음 단계