Flex 템플릿 사용

이 튜토리얼에서는 Google Cloud CLI를 사용하여 커스텀 Docker 이미지로 Dataflow Flex 템플릿 작업을 만들고 실행하는 방법을 보여줍니다. 이 가이드는 Pub/Sub에서 JSON 인코딩된 메시지를 읽고, Beam SQL로 메시지 데이터를 변환하고, 결과를 BigQuery 테이블에 쓰는 스트리밍 파이프라인 예시를 설명합니다. 이 튜토리얼에서는 Artifact Registry에 컨테이너 이미지를 저장합니다. Flex 템플릿에서 비공개 레지스트리에 저장된 사전 빌드된 이미지를 사용할 수도 있습니다.

Flex 템플릿에 대해 자세히 알아보려면 Dataflow 템플릿을 참조하세요.

목표

  • Docker 컨테이너 이미지를 빌드합니다.
  • Dataflow Flex 템플릿을 만들고 실행합니다.

비용

이 튜토리얼에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.

  • Dataflow
  • Pub/Sub
  • Cloud Storage
  • Cloud Scheduler
  • App Engine
  • Artifact Registry
  • Cloud Build
  • BigQuery

가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다.

시작하기 전에

이 섹션에서는 API를 사용 설정하고, 서비스 계정을 만들고, 서비스 계정에 소유자 역할을 부여하는 방법을 설명합니다. 프로덕션 환경에서는 소유자 역할을 부여하지 마세요. 대신 적절한 Dataflow 특정 권한 및 역할을 사용합니다. 자세한 내용은 Flex 템플릿 권한 이해를 참조하세요

  1. Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
  2. Google Cloud CLI를 설치합니다.
  3. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  4. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  5. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  6. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Artifact Registry, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub cloudresourcemanager.googleapis.com appengine.googleapis.com artifactregistry.googleapis.com cloudscheduler.googleapis.com cloudbuild.googleapis.com
  7. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  8. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  9. Google Cloud CLI를 설치합니다.
  10. gcloud CLI를 초기화하려면 다음 명령어를 실행합니다.

    gcloud init
  11. Google Cloud 프로젝트를 만들거나 선택합니다.

    • Google Cloud 프로젝트를 만듭니다.

      gcloud projects create PROJECT_ID

      PROJECT_ID를 만들려는 Google Cloud 프로젝트의 이름으로 바꿉니다.

    • 만든 Google Cloud 프로젝트를 선택합니다.

      gcloud config set project PROJECT_ID

      PROJECT_ID를 Google Cloud 프로젝트 이름으로 바꿉니다.

  12. Google Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다.

  13. Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Artifact Registry, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub cloudresourcemanager.googleapis.com appengine.googleapis.com artifactregistry.googleapis.com cloudscheduler.googleapis.com cloudbuild.googleapis.com
  14. Google 계정의 로컬 인증 사용자 인증 정보를 만듭니다.

    gcloud auth application-default login
  15. Google 계정에 역할을 부여합니다. 다음 각 IAM 역할에 대해 다음 명령어를 한 번씩 실행합니다. roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • EMAIL_ADDRESS를 이메일 주소로 바꿉니다.
    • ROLE을 각 개별 역할로 바꿉니다.
  16. Compute Engine 기본 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다. roles/dataflow.admin, roles/dataflow.worker, roles/bigquery.dataEditor, roles/pubsub.editor, roles/storage.objectAdmin, roles/artifactregistry.reader.

    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID를 프로젝트 ID로 바꿉니다.
    • 여기에서 PROJECT_NUMBER를 프로젝트 번호로 바꿉니다. 프로젝트 번호를 찾으려면 프로젝트 식별을 참조하세요.
    • SERVICE_ACCOUNT_ROLE을 각 개별 역할로 바꿉니다.

이 튜토리얼을 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.

예시 소스 및 싱크 만들기

이 섹션에서는 다음을 만드는 방법을 설명합니다.

  • Pub/Sub를 사용하여 데이터의 스트리밍 소스
  • BigQuery에 데이터를 로드하는 데이터 세트

Cloud Storage 버킷 생성

gsutil mb 명령어를 사용합니다.

gsutil mb gs://BUCKET_NAME

BUCKET_NAME버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름으로 바꿉니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.

Pub/Sub 주제 및 해당 주제에 대한 구독 만들기

Google Cloud CLI를 사용합니다.

gcloud pubsub topics create TOPIC_ID
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
  • TOPIC_ID를 Pub/Sub 주제의 이름으로 바꿉니다.
  • SUBSCRIPTION_ID를 Pub/Sub 구독 이름으로 바꿉니다.

Cloud Scheduler 작업 만들기

이 단계에서는 Google Cloud CLI를 사용하여 '긍정적인 평점'과 '부정적인 평점'을 게시하는 Cloud Scheduler 작업을 만들고 실행합니다.

  1. 이 Google Cloud 프로젝트의 Cloud Scheduler 작업을 만듭니다.
         gcloud scheduler jobs create pubsub positive-ratings-publisher \
           --schedule="* * * * *" \
           --location=REGION \
           --topic="TOPIC_ID" \
           --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
  2. REGION을 Dataflow 작업 배포에 사용되는 리전 엔드포인트로 바꿉니다. REGION 변수 값은 유효한 리전 이름이어야 합니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요. 게시자가 분당 메시지 1개를 게시하는 '긍정적인 평점'을 만들고 실행합니다.
  3. Cloud Scheduler 작업을 시작합니다.
    gcloud scheduler jobs run --location=REGION positive-ratings-publisher
    
  4. 2분마다 1개의 메시지를 게시하는 '부정적인 평점'에 대해 다른 유사한 게시자를 작성하고 실행합니다.
    gcloud scheduler jobs create pubsub negative-ratings-publisher \
      --schedule="*/2 * * * *" \
      --location=REGION  \
      --topic="TOPIC_ID" \
      --message-body='{"url": "https://beam.apache.org/", "review": "negative"}'
    
    gcloud scheduler jobs run --location=REGION negative-ratings-publisher
    

BigQuery 데이터세트 만들기

bq mk 명령어를 사용합니다.

  bq --location=REGION mk \
  PROJECT_ID:DATASET_ID
  • PROJECT_ID를 해당 프로젝트의 프로젝트 ID로 바꿉니다.
  • DATASET_ID를 데이터 세트의 이름으로 바꿉니다. 데이터 세트 이름 지정에 대한 자세한 내용은 데이터 세트 만들기의 데이터 세트 이름 지정을 참조하세요.

코드 샘플 다운로드 및 디렉터리 변경

코드 샘플을 다운로드한 후 디렉터리를 변경합니다.

자바

java-docs-samples 저장소를 클론합니다.

  git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git

이 튜토리얼의 코드 샘플로 이동합니다.

  cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql

Python

python-docs-samples 저장소를 클론합니다.

  git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git

이 튜토리얼의 코드 샘플로 이동합니다.

  cd python-docs-samples/dataflow/flex-templates/streaming_beam

개발 환경 설정

자바

  1. Java Development Kit(JDK) 버전 11을 다운로드하고 설치합니다. JAVA_HOME 환경 변수가 설정되어 있고 JDK 설치를 가리키는지 확인합니다.
  2. 사용 중인 특정 운영체제의 Maven 설치 가이드를 따라 Apache Maven을 다운로드하고 설치합니다.
  3. (선택사항) 개발을 위해 Apache Beam 파이프라인을 로컬에서 실행합니다.
      mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSql \
        -Dexec.args="\
          --project=PROJECT_ID \
          --inputSubscription=SUBSCRIPTION_ID \
          --outputTable=PROJECT_ID:DATASET_ID.TABLE_ID \
          --tempLocation=gs://BUCKET_NAME/samples/dataflow/temp"

    TABLE_ID를 테이블의 이름으로 바꿉니다.

  4. 자바 프로젝트를 Uber JAR 파일에 빌드합니다.
      mvn clean package
  5. (선택사항) Uber JAR 파일의 크기를 원본 파일과 비교합니다.
      ls -lh target/*.jar
    이 Uber JAR 파일에는 모든 종속 항목이 포함되어 있습니다. 다른 라이브러리에 외부 종속 항목이 없는 독립형 애플리케이션으로 이 파일을 실행할 수 있습니다.

Python

Python용 Apache Beam SDK를 사용합니다.

컨테이너 이미지 만들기 및 빌드

  1. 아티팩트를 업로드하려면 먼저 Artifact Registry 저장소를 만들어야 합니다. 저장소마다 지원되는 단일 형식의 아티팩트가 포함될 수 있습니다.

    모든 저장소 콘텐츠는 Google 관리 또는 고객 관리 암호화 키를 통해 암호화됩니다. Artifact Registry는 기본적으로 Google 관리 암호화 키를 사용하며 이 옵션을 구성할 필요가 없습니다.

    저장소에 최소한 Artifact Registry 작성자 액세스 권한이 있어야 합니다.

    다음 명령어를 실행하여 새 저장소를 만듭니다. 이 명령어는 진행 중인 작업이 완료될 때까지 기다리지 않고 --async 플래그를 사용하여 즉시 반환합니다.

    gcloud artifacts repositories create REPOSITORY \
        --repository-format=docker \
        --location=REGION \
        --async 

    REPOSITORY를 저장소 이름으로 바꿉니다. 프로젝트의 저장소 위치마다 저장소 이름이 고유해야 합니다.

  2. (선택사항) Dockerfile을 만듭니다.

    자바

    Dockerfile을 사용하여 빌드된 Flex 템플릿 컨테이너는 작업 그래프를 만들고 Dataflow 작업을 시작하기 위해서만 사용됩니다. Flex 템플릿 컨테이너에 설치된 패키지는 Beam 컨테이너에서 사용할 수 없습니다.

    패키지를 Beam 컨테이너에 포함하기 위해서는 패키지를 requirements.txt 파일의 일부로 지정해야 합니다. requirements.txt 파일의 일부로 apache-beam을 지정하지 마세요. Beam 컨테이너에는 이미 apache-beam이 있습니다.

    이 튜토리얼에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.

      FROM gcr.io/dataflow-templates-base/java11-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      RUN mkdir -p ${WORKDIR}/src/main/java/org/apache/beam/samples
      WORKDIR ${WORKDIR}
    
      COPY pom.xml .
      COPY StreamingBeamSql.java  /dataflow/template/src/main/java/org/apache/beam/samples/
    
      ENV FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"
      ENV FLEX_TEMPLATE_JAVA_CLASSPATH="${WORKDIR}/src/main/java/org/apache/beam/samples/StreamingBeamSql.java"
    
      ENTRYPOINT ["/opt/google/dataflow/java_template_launcher"]

    이 Dockerfile에는 FROM, ENV, COPY 명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.

    Python

    Dockerfile의 일부로 apache-beam을 설치하여 작업 그래프를 생성해야 합니다. Dockerfile을 사용하여 빌드된 Flex 템플릿 컨테이너는 작업 그래프를 만들고 Dataflow 작업을 시작하기 위해서만 사용됩니다. Flex 템플릿 컨테이너에 설치된 패키지는 Beam 컨테이너에서 사용할 수 없습니다.

    패키지를 Beam 컨테이너에 포함하기 위해서는 requirements.txt 파일의 일부로 지정해야 합니다. requirements.txt 파일의 일부로 apache-beam을 지정하지 않아야 합니다. Beam 컨테이너에는 이미 apache-beam이 있습니다. 예시 파일을 보려면 python-docs-samples 저장소에서 requirements-test.txt 파일을 확인합니다.

    Flex 템플릿 기본 이미지 python3-template-launcher-base는 Python 3.7을 기반으로 합니다. Python 3.8의 경우 python38-template-launcher-base , Python 3.9의 경우 python39-template-launcher-base와 같은 다른 기본 이미지를 사용할 수 있습니다.

    이 튜토리얼에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.

      FROM gcr.io/dataflow-templates-base/python3-template-launcher-base
    
      ARG WORKDIR=/dataflow/template
      RUN mkdir -p ${WORKDIR}
      WORKDIR ${WORKDIR}
    
      COPY requirements.txt .
      COPY streaming_beam.py .
    
      # Do not include `apache-beam` in requirements.txt
      ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt"
      ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py"
    
      # Install apache-beam and other dependencies to launch the pipeline
      RUN pip install apache-beam[gcp]
      RUN pip install -U -r ./requirements.txt

    이 Dockerfile에는 FROM, ENV, COPY 명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.

  3. 이미지를 내보내거나 가져오려면 먼저 Artifact Registry 요청을 인증하도록 Docker를 구성해야 합니다. Docker 저장소에 인증을 설정하려면 다음 명령어를 실행합니다.
    gcloud auth configure-docker REGION-docker.pkg.dev

    이 명령어는 Docker 구성을 업데이트합니다. 이제 Google Cloud 프로젝트의 Artifact Registry와 연결하여 이미지를 내보낼 수 있습니다.

  4. Cloud Build에서 Dockerfile 사용하여 Docker 이미지를 빌드합니다.

    gcloud builds submit --tag REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest .
    

    이 명령어는 파일을 빌드하여 Artifact Registry 저장소에 푸시합니다.

메타데이터

템플릿을 실행할 때 커스텀 매개변수를 검증할 수 있도록 추가 메타데이터로 템플릿을 확장할 수 있습니다. 템플릿의 메타데이터를 만들려면 다음 단계를 수행합니다.

  1. 메타데이터 매개변수의 매개변수를 사용하여 metadata.json 파일을 만듭니다.

    예시를 보려면 메타데이터 파일 예시를 참조하세요.

  2. 메타데이터 파일을 Cloud Storage에서 템플릿과 동일한 폴더에 저장합니다.

메타데이터 매개변수

매개변수 키 필수 값 설명
name 템플릿 이름입니다.
description 아니요 템플릿을 설명하는 짧은 텍스트 단락입니다.
parameters 아니요 템플릿에 사용되는 추가 매개변수 배열입니다. 기본적으로 빈 배열이 사용됩니다.
name 템플릿에 사용되는 매개변수 이름입니다.
label Google Cloud 콘솔에서 매개변수 라벨을 지정하기 위해 사용되는 사람이 읽을 수 있는 문자열입니다.
helpText 매개변수를 설명하는 짧은 텍스트 단락입니다.
isOptional 아니요 매개변수가 필수이면 false이고, 매개변수가 선택사항이면 true입니다. 특정 값으로 설정하지 않으면 isOptional이 기본적으로 false로 설정됩니다. 메타데이터에 이 매개변수 키를 포함하지 않으면 메타데이터가 필수 매개변수가 됩니다.
regexes 아니요 매개변수 값 유효성을 검사하는 데 사용할 문자열 형식의 POSIX-egrep 정규 표현식 배열입니다. 예를 들어 ["^[a-zA-Z][a-zA-Z0-9]+"]는 값이 문자로 시작하고 문자를 한 개 이상 포함하고 있음을 검증하는 단일 정규 표현식입니다. 기본적으로 빈 배열이 사용됩니다.

메타데이터 파일 예

자바

{
  "name": "Streaming Beam SQL",
  "description": "An Apache Beam streaming pipeline that reads JSON encoded messages from Pub/Sub, uses Beam SQL to transform the message data, and writes the results to a BigQuery",
  "parameters": [
    {
      "name": "inputSubscription",
      "label": "Pub/Sub input subscription.",
      "helpText": "Pub/Sub subscription to read from.",
      "regexes": [
        "[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "outputTable",
      "label": "BigQuery output table",
      "helpText": "BigQuery table spec to write to, in the form 'project:dataset.table'.",
      "isOptional": true,
      "regexes": [
        "[^:]+:[^.]+[.].+"
      ]
    }
  ]
}

Python

{
  "name": "Streaming beam Python flex template",
  "description": "Streaming beam example for python flex template.",
  "parameters": [
    {
      "name": "input_subscription",
      "label": "Input PubSub subscription.",
      "helpText": "Name of the input PubSub subscription to consume from.",
      "regexes": [
        "projects/[^/]+/subscriptions/[a-zA-Z][-_.~+%a-zA-Z0-9]{2,}"
      ]
    },
    {
      "name": "output_table",
      "label": "BigQuery output table name.",
      "helpText": "Name of the BigQuery output table name.",
      "isOptional": true,
      "regexes": [
        "([^:]+:)?[^.]+[.].+"
      ]
    }
  ]
}

Dataflow 템플릿 디렉터리에서 Google 제공 템플릿의 메타데이터 파일을 다운로드할 수 있습니다.

Flex 템플릿 만들기

템플릿을 실행하려면 SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 Cloud Storage 버킷에 만들어야 합니다.

이 튜토리얼에서는 name, description, parameters 필드와 같은 템플릿에 대한 추가 정보가 포함된 예시 메타데이터 파일을 사용합니다.

gcloud dataflow flex-template build 명령어를 사용하여 Cloud Storage 버킷에 streaming-beam-sql.json이라는 Flex 템플릿을 만듭니다. 이 명령어에서 파일 이름과 스토리지 위치를 포함한 파일 매개변수를 지정합니다.

자바

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
      --image-gcr-path "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
      --sdk-language "JAVA" \
      --flex-template-base-image JAVA11 \
      --metadata-file "metadata.json" \
      --jar "target/streaming-beam-sql-1.0.jar" \
      --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSql"

Python

  gcloud dataflow flex-template build gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json \
     --image "REGION-docker.pkg.dev/PROJECT_ID/REPOSITORY/dataflow/streaming-beam-sql:latest" \
     --sdk-language "PYTHON" \
     --metadata-file "metadata.json"
지정한 Cloud Storage 위치의 템플릿 파일을 통해 이제 템플릿을 사용할 수 있습니다.

Flex 템플릿 파이프라인 실행

이제 템플릿 파일을 참조하고 파이프라인에 필요한 템플릿 매개변수를 전달하여 Dataflow에서 Apache Beam 파이프라인을 실행할 수 있습니다.
  1. 셸 또는 터미널에서 템플릿을 실행합니다.

    자바

    gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters inputSubscription="SUBSCRIPTION_ID" \
        --parameters outputTable="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    Python

    gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \
        --template-file-gcs-location "gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json" \
        --parameters input_subscription="projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID" \
        --parameters output_table="PROJECT_ID:DATASET_ID.TABLE_ID" \
        --region "REGION"

    TABLE_ID를 테이블의 이름으로 바꿉니다.

    또는 다음과 같이 템플릿을 실행할 수 있습니다.

    REST API 요청:

    자바

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "inputSubscription": "'SUBSCRIPTION_ID'",
            "outputTable": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'

    Python

    export REGION="us-central1"
    
    curl -X POST \
      "https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/flexTemplates:launch" \
      -H "Content-Type: application/json" \
      -H "Authorization: Bearer $(gcloud auth print-access-token)" \
      -d '{
        "launch_parameter": {
          "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'",
          "parameters": {
            "input_subscription": "projects/'PROJECT_ID'/subscriptions/'SUBSCRIPTION_ID'",
            "output_table": "'PROJECT_ID:DATASET_ID.TABLE_ID'"
          },
          "containerSpecGcsPath": "'gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json'"
        }
      }'
  2. 명령어를 실행하여 Flex 템플릿을 실행하면 Dataflow는 작업 상태가 대기 중인 작업 ID를 반환합니다. 작업 상태가 실행 중이 되기까지 몇 분 정도 걸릴 수 있으며 작업 그래프에 액세스할 수 있습니다.
  3. BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다.
    bq query --use_legacy_sql=false 'SELECT * FROM `'"PROJECT_ID.DATASET_ID.TABLE_ID"'`'
    
    이 파이프라인이 실행되는 동안 매분 BigQuery 테이블에 새 행이 추가됩니다.

삭제

이 가이드를 마쳤으면 나중에 요금이 청구되지 않도록 Google Cloud에서 만든 리소스를 삭제합니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.

Flex 템플릿 리소스 삭제

  1. Dataflow 파이프라인을 중지합니다.
    gcloud dataflow jobs list \
      --filter 'NAME=streaming-beam-sql AND STATE=Running' \
      --format 'value(JOB_ID)' \
      --region "REGION" \
      | xargs gcloud dataflow jobs cancel --region "REGION"
  2. Cloud Storage에서 템플릿 사양 파일을 삭제합니다.
    gsutil rm gs://BUCKET_NAME/samples/dataflow/templates/streaming-beam-sql.json
  3. Artifact Registry 저장소를 삭제합니다.
    gcloud artifacts repositories delete REPOSITORY \
    --location=REGION --async

Google Cloud 프로젝트 리소스 삭제

  1. Cloud Scheduler 작업 삭제합니다.
    gcloud scheduler jobs delete negative-ratings-publisher --location=REGION
    
    gcloud scheduler jobs delete positive-ratings-publisher --location=REGION
    
  2. Pub/Sub 구독 및 주제를 삭제합니다.
    gcloud pubsub subscriptions delete SUBSCRIPTION_ID
    gcloud pubsub topics delete TOPIC_ID
    
  3. BigQuery 테이블을 삭제합니다.
    bq rm -f -t PROJECT_ID:DATASET_ID.TABLE_ID
    
  4. BigQuery 데이터 세트를 삭제하면 요금이 부과되지 않습니다.
    bq rm -r -f -d PROJECT_ID:DATASET_ID
    
  5. Cloud Storage 버킷을 삭제하면 요금만 청구되지 않습니다.

사용자 인증 정보 취소

  1. 선택사항: 만든 사용자 인증 정보를 취소하고 로컬 사용자 인증 정보 파일을 삭제합니다.

    gcloud auth application-default revoke
  2. 선택사항: gcloud CLI에서 사용자 인증 정보를 취소합니다.

    gcloud auth revoke
  3. Compute Engine 기본 서비스 계정에 부여한 역할을 취소합니다.

    다음 IAM 역할마다 다음 명령어를 1회 실행합니다.

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/bigquery.dataEditor
    • roles/pubsub.editor
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
      --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
      --role=SERVICE_ACCOUNT_ROLE

제한사항

Flex 템플릿 작업에 다음과 같은 제한사항이 적용됩니다.

  • Docker를 사용하여 컨테이너를 패키징하려면 Google에서 제공하는 기본 이미지를 사용해야 합니다. 적용 가능한 이미지 목록은 Flex 템플릿 기본 이미지를 참조하세요.
  • 파이프라인을 시작하려면 run이 호출된 후 파이프라인을 구성하는 프로그램이 종료되어야 합니다.
  • waitUntilFinish(자바) 및 wait_until_finish(Python)은 지원되지 않습니다.

다음 단계