이 가이드에서는 gcloud
명령줄 도구를 사용하여 커스텀 Docker 이미지로 Dataflow Flex 템플릿 작업을 만들고 실행하는 방법을 보여줍니다. 이 가이드는 Pub/Sub에서 JSON 인코딩된 메시지를 읽고, Beam SQL로 메시지 데이터를 변환하고, 결과를 BigQuery 테이블에 쓰는 스트리밍 파이프라인 예시를 설명합니다.
목표
- Docker 컨테이너 이미지를 빌드합니다.
- Dataflow Flex 템플릿을 만들고 실행합니다.
비용
이 가이드에서는 비용이 청구될 수 있는 다음과 같은 Google Cloud 구성요소를 사용합니다.
- Dataflow
- Pub/Sub
- Cloud Storage
- Cloud Scheduler
- App Engine
- Container Registry
- Cloud Build
- BigQuery
가격 계산기를 사용하여 예상 사용량을 토대로 예상 비용을 산출합니다.
시작하기 전에
-
Google 계정으로
로그인합니다.
아직 계정이 없으면 새 계정을 등록하세요.
-
Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
-
Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
- Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, App Engine, Cloud Scheduler, and Cloud Build API를 사용 설정합니다.
-
인증 설정:
-
Cloud Console에서 서비스 계정 키 만들기 페이지로 이동합니다.
서비스 계정 키 만들기 페이지로 이동 - 서비스 계정 목록에서 새 서비스 계정을 선택합니다.
- 서비스 계정 이름 필드에 이름을 입력합니다.
역할 목록에서 프로젝트 > 소유자.
- 만들기를 클릭합니다. 키가 포함된 JSON 파일이 컴퓨터에 다운로드됩니다.
-
-
GOOGLE_APPLICATION_CREDENTIALS
환경 변수를 서비스 계정 키가 포함된 JSON 파일의 경로로 설정합니다. 이 변수는 현재 셸 세션에만 적용되므로, 새 세션을 열 경우, 변수를 다시 설정합니다.
이 가이드를 마치면 만든 리소스를 삭제하여 비용이 계속 청구되지 않도록 할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
소스 및 싱크 예시 만들기
이 섹션에서는 다음을 만드는 방법을 설명합니다.
- Pub/Sub를 사용하여 데이터의 스트리밍 소스
- BigQuery에 데이터를 로드하는 데이터 세트
Cloud Storage 버킷 생성
gsutil mb 명령어를 사용합니다.
export BUCKET="my-storage-bucket" gsutil mb gs://$BUCKET
Pub/Sub 주제 및 해당 주제에 대한 구독 만들기
gcloud
명령줄 도구를 사용합니다.
export TOPIC="messages" export SUBSCRIPTION="ratings" gcloud pubsub topics create $TOPIC gcloud pubsub subscriptions create --topic $TOPIC $SUBSCRIPTION
Cloud Scheduler 작업 만들기
이 단계에서는 gcloud
명령줄 도구를 사용하여 '긍정적인 평점'과 '부정적인 평점'을 게시하는 Cloud Scheduler 작업을 만들고 실행합니다.
- 이 Google Cloud 프로젝트의 Cloud Scheduler 작업을 만듭니다.
gcloud scheduler jobs create pubsub positive-ratings-publisher \ --schedule="* * * * *" \ --topic="$TOPIC" \ --message-body='{"url": "https://beam.apache.org/", "review": "positive"}'
게시자가 분당 1개의 메시지를 게시하는 '긍정적인 평점'을 만들고 실행합니다.
- Cloud Scheduler 작업을 시작합니다.
gcloud scheduler jobs run positive-ratings-publisher
- 2분마다 1개의 메시지를 게시하는 '부정적인 평점'에 대해 다른 유사한 게시자를 작성하고 실행합니다.
gcloud scheduler jobs create pubsub negative-ratings-publisher \ --schedule="*/2 * * * *" \ --topic="$TOPIC" \ --message-body='{"url": "https://beam.apache.org/", "review": "negative"}' gcloud scheduler jobs run negative-ratings-publisher
BigQuery 데이터 세트 만들기
bq mk 명령어를 사용합니다.
export PROJECT="$(gcloud config get-value project)" export DATASET="beam_samples" export TABLE="streaming_beam_sql" bq mk --dataset "$PROJECT:$DATASET"
코드 샘플 다운로드
- 코드 샘플을 다운로드합니다.
자바
java-docs-samples 저장소를 클론하고 이 가이드의 코드 샘플로 이동합니다.
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/dataflow/flex-templates/streaming_beam_sql
Python
python-docs-samples repository를 클론하고 이 가이드의 코드 샘플로 이동합니다.
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/dataflow/flex-templates/streaming_beam
- 이 가이드의
TEMPLATE_IMAGE
를 내보냅니다.export TEMPLATE_IMAGE="gcr.io/$PROJECT/samples/dataflow/streaming-beam-sql:latest"
개발 환경 설정
자바
- Java Development Kit(JDK) 버전 11을 다운로드하고 설치합니다. JAVA_HOME 환경 변수가 설정되어 있고 JDK 설치를 가리키는지 확인합니다.
- 사용 중인 특정 운영체제의 Maven 설치 가이드를 따라 Apache Maven을 다운로드하고 설치합니다.
- (선택사항) 개발을 위해 Apache Beam 파이프라인을 로컬에서 실행합니다.
mvn compile exec:java \ -Dexec.mainClass=org.apache.beam.samples.StreamingBeamSQL \ -Dexec.args="\ --project=$PROJECT \ --inputSubscription=$SUBSCRIPTION \ --outputTable=$PROJECT:$DATASET.$TABLE \ --tempLocation=gs://$BUCKET/samples/dataflow/temp"
- 자바 프로젝트를 Uber JAR 파일에 빌드합니다.
mvn clean package
- (선택사항) Uber JAR 파일의 크기를 원본 파일과 비교합니다.
ls -lh target/*.jar
이 Uber JAR 파일에는 모든 종속 항목이 포함되어 있습니다. 다른 라이브러리에 외부 종속 항목이 없는 독립형 애플리케이션으로 이 파일을 실행할 수 있습니다.
Python
Python용 Apache Beam SDK(pip
포함) 및 Python 버전 2.7, 3.5, 3.6 또는 3.7을 사용합니다. 다음 명령어를 실행하여 Python이 작동 중이고 pip
가 설치되었는지 확인합니다.
python --version python -m pip --version
Python이 없으면 Python 설치 페이지에서 사용 중인 운영체제에 맞는 설치 단계를 확인합니다.
Python만 해당: 컨테이너 이미지 만들기 및 빌드
이 섹션에는 Python 사용자를 위한 단계가 포함되어 있습니다. 자바를 사용하는 경우 다음 단계를 건너뜁니다.
- (선택사항) 기본적으로 Kaniko 캐시 사용을 사용 설정합니다.
gcloud config set builds/use_kaniko True
Kaniko는 컨테이너 빌드 아티팩트를 캐시하므로 이 옵션을 사용하면 후속 빌드 속도가 빨라집니다. - (선택사항) Dockerfile을 만듭니다.
이 가이드에서 Dockerfile을 맞춤설정할 수 있습니다. 시작 파일은 다음과 같습니다.
Python
FROM gcr.io/dataflow-templates-base/python3-template-launcher-base ARG WORKDIR=/dataflow/template RUN mkdir -p ${WORKDIR} WORKDIR ${WORKDIR} # Due to a change in the Apache Beam base image in version 2.24, you must to install # libffi-dev manually as a dependency. For more information: # https://github.com/GoogleCloudPlatform/python-docs-samples/issues/4891 RUN apt-get update && apt-get install -y libffi-dev && rm -rf /var/lib/apt/lists/* COPY requirements.txt . COPY streaming_beam.py . ENV FLEX_TEMPLATE_PYTHON_REQUIREMENTS_FILE="${WORKDIR}/requirements.txt" ENV FLEX_TEMPLATE_PYTHON_PY_FILE="${WORKDIR}/streaming_beam.py" RUN pip install -U -r ./requirements.txt
이 Dockerfile에는
FROM
,ENV
,COPY
명령어가 포함되어 있습니다. 이에 대한 자세한 내용은 Dockerfile 참조를 확인하세요.gcr.io/PROJECT/
로 시작하는 이미지는 다른 Google Cloud 제품에서 액세스할 수 있는 프로젝트의 Container Registry에 저장됩니다. - Cloud Build에서
Dockerfile
을 사용하여 Docker 이미지를 빌드합니다.gcloud builds submit --tag $TEMPLATE_IMAGE .
Flex 템플릿 만들기
템플릿을 실행하려면 SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 Cloud Storage에 만들어야 합니다.
이 예시의 metadata.json 파일에는 name
, description
, 입력 parameters
필드와 같은 템플릿에 대한 추가 정보가 포함되어 있습니다.
- SDK 정보 및 메타데이터와 같이 작업을 실행하는 데 필요한 모든 정보가 포함된 템플릿 사양 파일을 만듭니다.
export TEMPLATE_PATH="gs://$BUCKET/samples/dataflow/templates/streaming-beam-sql.json"
- Flex 템플릿을 빌드합니다.
자바
gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path "$TEMPLATE_IMAGE" \ --sdk-language "JAVA" \ --flex-template-base-image JAVA11 \ --metadata-file "metadata.json" \ --jar "target/streaming-beam-sql-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="org.apache.beam.samples.StreamingBeamSQL"
Python
gcloud dataflow flex-template build $TEMPLATE_PATH \ --image "$TEMPLATE_IMAGE" \ --sdk-language "PYTHON" \ --metadata-file "metadata.json"
지정한 Cloud Storage 위치의 템플릿 파일을 통해 이제 템플릿을 사용할 수 있습니다.
Flex 템플릿 파이프라인 실행
이제 템플릿 파일을 참조하고 파이프라인에 필요한 템플릿 매개변수를 전달하여 Dataflow에서 Apache Beam 파이프라인을 실행할 수 있습니다.
- 템플릿을 실행합니다.
자바
export REGION="us-central1" gcloud dataflow flex-template run "streaming-beam-sql-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "$TEMPLATE_PATH" \ --parameters inputSubscription="$SUBSCRIPTION" \ --parameters outputTable="$PROJECT:$DATASET.$TABLE" \ --region "$REGION"
Python
export REGION="us-central1" gcloud dataflow flex-template run "streaming-beam-`date +%Y%m%d-%H%M%S`" \ --template-file-gcs-location "$TEMPLATE_PATH" \ --parameters input_subscription="$SUBSCRIPTION" \ --parameters output_table="$PROJECT:$DATASET.$TABLE" \ --region "$REGION"
curl -X POST \ "https://dataflow.googleapis.com/v1b3/projects/$PROJECT/locations/us-central1/flexTemplates:launch" \ -H "Content-Type: application/json" \ -H "Authorization: Bearer $(gcloud auth print-access-token)" \ -d '{ "launch_parameter": { "jobName": "streaming-beam-sql-'$(date +%Y%m%d-%H%M%S)'", "parameters": { "inputSubscription": "'$SUBSCRIPTION'", "outputTable": "'$PROJECT:$DATASET.$TABLE'" }, "containerSpecGcsPath": "'$TEMPLATE_PATH'" } }'
명령어를 실행하여 Flex 템플릿을 실행하면 Dataflow는 작업 상태가 대기 중인 작업 ID를 반환합니다. 작업 상태가 실행 중이 되기까지 몇 분 정도 걸릴 수 있으며 작업 그래프에 액세스할 수 있습니다.
- BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다.
bq query --use_legacy_sql=false 'SELECT * FROM `'"$PROJECT.$DATASET.$TABLE"'`'
이 파이프라인이 실행되는 동안 매분 BigQuery 테이블에 새 행이 추가됩니다.
삭제
이 가이드를 마쳤으면 나중에 요금이 청구되지 않도록 Google Cloud에서 만든 리소스를 삭제합니다. 다음 섹션은 이러한 리소스를 삭제하거나 사용 중지하는 방법을 설명합니다.
Flex 템플릿 리소스 정리
- Dataflow 파이프라인을 중지합니다.
gcloud dataflow jobs list \ --filter 'NAME:streaming-beam-sql AND STATE=Running' \ --format 'value(JOB_ID)' \ --region "$REGION" \ | xargs gcloud dataflow jobs cancel --region "$REGION"
- Cloud Storage에서 템플릿 사양 파일을 삭제합니다.
gsutil rm $TEMPLATE_PATH
- Container Registry에서 Flex 템플릿 컨테이너 이미지를 삭제합니다.
gcloud container images delete $TEMPLATE_IMAGE --force-delete-tags
Google Cloud 프로젝트 리소스 정리
- Cloud Scheduler 작업 삭제합니다.
gcloud scheduler jobs delete negative-ratings-publisher gcloud scheduler jobs delete positive-ratings-publisher
- Pub/Sub 구독 및 주제를 삭제합니다.
gcloud pubsub subscriptions delete $SUBSCRIPTION gcloud pubsub topics delete $TOPIC
- BigQuery 테이블을 삭제합니다.
bq rm -f -t $PROJECT:$DATASET.$TABLE
- BigQuery 데이터 세트를 삭제하면 요금이 부과되지 않습니다.
다음 명령어는 데이터 세트의 모든 테이블도 삭제합니다. 테이블과 데이터를 복구할 수 없습니다.
bq rm -r -f -d $PROJECT:$DATASET
- Cloud Storage 버킷을 삭제하면 요금만 청구되지 않습니다.
다음 명령어는 버킷의 모든 객체도 삭제합니다. 이러한 객체는 복구할 수 없습니다.
gsutil rm -r gs://$BUCKET
제한사항
Flex 템플릿 작업에 다음과 같은 제한사항이 적용됩니다.
- Docker를 사용하여 컨테이너를 패키징하려면 Google에서 제공하는 기본 이미지를 사용해야 합니다.
- 스트리밍 작업 업데이트가 지원되지 않습니다.
- FlexRS 사용은 지원되지 않습니다.
waitUntilFinish
(자바) 및wait_until_finish
(Python)은 지원되지 않습니다.
다음 단계
- 자세한 내용은 Flex 템플릿 구성을 참조하세요.
- 다른 Dataflow 기능을 직접 사용해보세요. 가이드 살펴보기