Flex 템플릿 사용

이 가이드에서는 gcloud 명령줄 도구를 사용하여 커스텀 Docker 이미지로 Dataflow Flex 템플릿 작업을 만들고 실행하는 방법을 보여줍니다. 이 튜토리얼은 Pub/Sub에서 JSON 인코딩된 메시지를 읽고, Beam SQL로 메시지 데이터를 변환하고, 결과를 BigQuery 테이블에 쓰는 스트리밍 파이프라인 예시를 설명합니다.

Flex 템플릿에 대해 자세히 알아보려면 Dataflow 템플릿을 참조하세요.

목표

  • Docker 컨테이너 이미지를 빌드합니다.
  • Dataflow Flex 템플릿을 만들고 실행합니다.

비용

이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Container Registry
  • Cloud Build
  • BigQuery

가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다.

시작하기 전에

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  3. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  4. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.

    API 사용 설정

  5. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  6. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  7. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

  8. Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.

    프로젝트 선택기로 이동

  9. Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.

  10. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.

    API 사용 설정

  11. 서비스 계정을 만듭니다.

    1. Cloud Console에서 서비스 계정 만들기 페이지로 이동합니다.

      서비스 계정 만들기로 이동
    2. 프로젝트를 선택합니다.
    3. 서비스 계정 이름 필드에 이름을 입력합니다. Cloud Console은 이 이름을 기반으로 서비스 계정 ID 필드를 채웁니다.

      서비스 계정 설명 필드에 설명을 입력합니다. 예를 들면 Service account for quickstart입니다.

    4. 만들고 계속하기를 클릭합니다.
    5. 역할 선택 필드를 클릭합니다.

      빠른 액세스에서 기본을 클릭한 후 소유자를 클릭합니다.

    6. 계속을 클릭합니다.
    7. 완료를 클릭하여 서비스 계정 만들기를 마칩니다.

      브라우저 창을 닫지 마세요. 다음 단계에서 사용합니다.

  12. 서비스 계정 키 만들기

    1. Cloud Console에서 만든 서비스 계정의 이메일 주소를 클릭합니다.
    2. 를 클릭합니다.
    3. 키 추가를 클릭한 후 새 키 만들기를 클릭합니다.
    4. 만들기를 클릭합니다. JSON 키 파일이 컴퓨터에 다운로드됩니다.
    5. 닫기를 클릭합니다.
  13. GOOGLE_APPLICATION_CREDENTIALS 환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.

이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

소스 및 싱크 예시 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • Pub/Sub를 사용하여 데이터의 스트리밍 소스
  • BigQuery에 데이터를 로드하는 데이터세트

Cloud Storage 버킷 생성

gsutil mb 명령어를 사용합니다.

export BUCKET="my-storage-bucket"
gsutil mb gs://$BUCKET

Pub/Sub 주제 및 해당 주제에 대한 구독 만들기

gcloud 명령줄 도구를 사용합니다.

export TOPIC="messages"
export SUBSCRIPTION="ratings"

gcloud pubsub topics create $TOPIC
gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION

Cloud Scheduler 작업 만들기

이 단계에서는 gcloud 명령줄 도구를 사용하여 '긍정적인 평점'과 '부정적인 평점'을 게시하는 Cloud Scheduler 작업을 만들고 실행합니다.

  1. 이 Google Cloud 프로젝트의 Cloud Scheduler 작업을 만듭니다.
    gcloud scheduler jobs create pubsub positive-ratings-publisher \
      --schedule="* * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
    
  2. 게시자가 분당 1개의 메시지를 게시하는 '긍정적인 평점'을 만들고 실행합니다.
  3. Cloud Scheduler 작업을 시작합니다.
    gcloud scheduler jobs run positive-ratings-publisher
    
  4. 2분마다 1개의 메시지를 게시하는 '부정적인 평점'에 대해 다른 유사한 게시자를 작성하고 실행합니다.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --topic="$TOPIC" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run negative-ratings-publisher
    

BigQuery 데이터 세트 만들기

bq mk 명령어를 사용합니다.

export PROJECT="$(gcloud config get-value project)"
export DATASET="beam_samples"
export TABLE="streaming_beam_sql"

bq mk --dataset "$PROJECT:$DATASET"

코드 샘플 다운로드

  1. 코드 샘플을 다운로드합니다.

    자바

    java-docs-samples 저장소를 클론하고 이 튜토리얼의 코드 샘플로 이동합니다.

      git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git
      cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

    Python

    python-docs-samples repository를 클론하고 이 튜토리얼의 코드 샘플로 이동합니다.

      git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
      cd python-docs-samples/dataflow/flex-templates/streaming_beam
  2. 이 튜토리얼의 TEMPLATE_IMAGE를 내보냅니다.
    export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
    

개발 환경 설정

자바

  1. Java Development Kit(JDK) 버전 11을 다운로드하고 설치합니다. JAVA_HOME 환경 변수가 설정되어 있고 JDK 설치를 가리키는지 확인합니다.
  2. 사용 중인 특정 운영체제의 Maven 설치 가이드를 따라 Apache Maven을 다운로드하고 설치합니다.
  3. (선택사항) 개발을 위해 Apache Beam 파이프라인을 로컬에서 실행합니다.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=$PROJECT \
          --inputSubscription=$SUBSCRIPTION \
          --outputTable=$PROJECT:$DATASET.$TABLE \
          --tempLocation=gs://$BUCKET/samples/dataflow/temp"
  4. 자바 프로젝트를 Uber JAR 파일에 빌드합니다.
      mvn clean package
  5. (선택사항) Uber JAR 파일의 크기를 원본 파일과 비교합니다.
      ls -lh target/*.jar
    이 Uber JAR 파일에는 모든 종속 항목이 포함되어 있습니다. 다른 라이브러리에 외부 종속 항목이 없는 독립형 애플리케이션으로 이 파일을 실행할 수 있습니다.

Python

Python용 Apache Beam SDK를 사용합니다.

Python만 해당: 컨테이너 이미지 만들기 및 빌드

이 섹션에는 Python 사용자를 위한 단계가 포함되어 있습니다. 자바를 사용하는 경우 다음 단계를 건너뜁니다.

작업 실행이 실패하고 A Timeout in polling error message 오류 메시지가 표시되면 문제 해결 단계를 참조하세요.

  1. (선택사항) 기본적으로 Kaniko 캐시 사용을 사용 설정합니다.
    gcloud config set builds/use_kaniko True
    
    Kaniko는 컨테이너 빌드 아티팩트를 캐시하므로 이 옵션을 사용하면 후속 빌드 속도가 빨라집니다.
  2. (선택사항) Dockerfile을 만듭니다. 이 가이드에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.

    Python

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    이 Dockerfile에는 FROM, ENV, COPY 명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.

    gcr.io/PROJECT/로 시작하는 이미지는 다른 Google Cloud 제품에서 액세스할 수 있는 프로젝트의 Container Registry에 저장됩니다.
  3. Cloud Build에서 Dockerfile 사용하여 Docker 이미지를 빌드합니다.
    gcloud builds submit --tag $TEMPLATE_IMAGE .
    

메타데이터

템플릿을 실행할 때 커스텀 매개변수를 검증할 수 있도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음 단계를 수행합니다.

  1. 메타데이터 매개변수의 매개변수를 사용하여 metadata.json 파일을 만듭니다.

    예시를 보려면 메타데이터 파일 예시를 참조하세요.

  2. 메타데이터 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.

메타데이터 매개변수

매개변수 키 필수 값 설명
name 템플릿 이름입니다.
description 아니요 템플릿을 설명하는 짧은 텍스트 단락입니다.
parameters 아니요 템플릿에 사용되는 추가 매개변수 배열입니다. 기본적으로 빈 배열이 사용됩니다.
name 템플릿에 사용되는 매개변수 이름입니다.
label Cloud Console에서 매개변수 레이블을 지정하기 위해 사용되는 사람이 읽을 수 있는 문자열입니다.
helpText 매개변수를 설명하는 짧은 텍스트 단락입니다.
isOptional 아니요 매개변수가 필수이면 false이고, 매개변수가 선택사항이면 true입니다. 특정 값으로 설정하지 않으면 isOptional이 기본적으로 false로 설정됩니다. 메타데이터에 이 매개변수 키를 포함하지 않으면 메타데이터가 필수 매개변수가 됩니다.
regexes 아니요 매개변수 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어 ["^[a-zA-Z][a-zA-Z0-9]+"]는 값이 문자로 시작하고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다. 기본적으로 빈 배열이 사용됩니다.

메타데이터 파일 예

자바

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Dataflow 템플릿 디렉터리에서 Google 제공 템플릿의 메타데이터 파일을 다운로드할 수 있습니다.

Flex 템플릿 만들기

템플릿을 실행하려면 SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 Cloud Storage에 만들어야 합니다.

이 튜토리얼에서는 name, description, parameters 필드와 같은 템플릿에 대한 추가 정보가 포함된 예시 메타데이터 파일을 사용합니다.

  1. SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 만듭니다.
    export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
    
  2. Flex 템플릿을 빌드합니다.

    자바

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image-gcr-path "$TEMPLATE_IMAGE" \
          --sdk-language "JAVA" \
          --flex-template-base-image JAVA11 \
          --metadata-file "metadata.json" \
          --jar "target/streaming-beam-sql-1.0.jar" \
          --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

    Python

        gcloud dataflow flex-template build $TEMPLATE_PATH \
          --image "$TEMPLATE_IMAGE" \
          --sdk-language "PYTHON" \
          --metadata-file "metadata.json"

지정한 Cloud Storage 위치의 템플릿 파일을 통해 이제 템플릿을 사용할 수 있습니다.

Flex 템플릿 파이프라인 실행

이제 템플릿 파일을 참조하고 파이프라인에 필요한 템플릿 매개변수를 전달하여 Dataflow에서 Apache Beam 파이프라인을 실행할 수 있습니다.

  1. 셸 또는 터미널에서 템플릿을 실행합니다.

    자바

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters inputSubscription="$SUBSCRIPTION" \
        --parameters outputTable="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"

    Python

    export REGION="us-central1"
    
    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "$TEMPLATE_PATH" \
        --parameters input_subscription="projects/$PROJECT/subscriptions/$SUBSCRIPTION" \
        --parameters output_table="$PROJECT:$DATASET.$TABLE" \
        --region "$REGION"
    또는 REST API 요청으로 템플릿을 실행할 수 있습니다.

    자바

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'$SUBSCRIPTION'",
            "outputTable": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/$REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'$PROJECT'/subscriptions/'$SUBSCRIPTION'",
            "output_table": "'$PROJECT:$DATASET.$TABLE'"
          },
          "containerSpecGcsPath": "'$TEMPLATE_PATH'"
        }
      }'
  2. 명령어를 실행하여 Flex 템플릿을 실행하면 Dataflow는 작업 상태가 대기 중인 작업 ID를 반환합니다. 작업 상태가 실행 중이 되기까지 몇 분 정도 걸릴 수 있으며 작업 그래프에 액세스할 수 있습니다.
  3. BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다.
    bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
    
    이 파이프라인이 실행되는 동안 매분 BigQuery 테이블에 새 행이 추가됩니다.

삭제

이 가이드를 마쳤으면 나중에 요금이 청구되지 않도록 Google Cloud에서 만든 리소스를 삭제합니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

Flex 템플릿 리소스 정리

  1. Dataflow 파이프라인을 중지합니다.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "$REGION" \
      | xargs gcloud dataflow jobs cancel --region "$REGION"
    
  2. Cloud Storage에서 템플릿 사양 파일을 삭제합니다.
    gsutil rm $TEMPLATE_PATH
    
  3. Container Registry에서 Flex 템플릿 컨테이너 이미지를 삭제합니다.
    gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
    

Google Cloud 프로젝트 리소스 정리

  1. Cloud Scheduler 작업 삭제합니다.
    gcloud scheduler jobs delete negative-ratings-publisher
    gcloud scheduler jobs delete positive-ratings-publisher
    
  2. Pub/Sub 구독 및 주제를 삭제합니다.
    gcloud pubsub subscriptions delete $SUBSCRIPTION
    gcloud pubsub topics delete $TOPIC
    
  3. BigQuery 테이블을 삭제합니다.
    bq rm -f -t $PROJECT:$DATASET.$TABLE
    
  4. BigQuery 데이터세트를 삭제하면 요금이 부과되지 않습니다.

    다음 명령어는 데이터 세트의 모든 테이블도 삭제합니다. 테이블과 데이터를 복구할 수 없습니다.

    bq rm -r -f -d $PROJECT:$DATASET
    
  5. Cloud Storage 버킷을 삭제하면 요금만 청구되지 않습니다.

    다음 명령어는 버킷의 모든 객체도 삭제합니다. 이러한 객체는 복구할 수 없습니다.

    gsutil rm -r gs://$BUCKET
    

제한사항

Flex 템플릿 작업에 다음과 같은 제한사항이 적용됩니다.

  • Docker를 사용하여 컨테이너를 패키징하려면 Google에서 제공하는 기본 이미지를 사용해야 합니다. 적용 가능한 이미지 목록은 Flex 템플릿 기본 이미지를 참조하세요.
  • 파이프라인을 시작하려면 run이 호출된 후 파이프라인을 구성하는 프로그램이 종료되어야 합니다.
  • waitUntilFinish(자바) 및 wait_until_finish(Python)은 지원되지 않습니다.

다음 단계