Dataflow를 사용하여 Pub/Sub Lite 메시지 스트리밍
자체 데이터 처리 프로그램을 작성하고 실행하는 대신 Apache Beam용 Pub/Sub 라이트 I/O 커넥터와 함께 Dataflow를 사용할 수 있습니다. Dataflow는 신뢰성과 표현 능력은 그대로 유지하면서 스트리밍(실시간) 및 일괄 모드에서 데이터를 변환하고 강화하는 완전 관리형 서비스입니다. 확장 가능한 강력한 스테이트풀(Stateful) 처리 추상화 세트와 다른 스트리밍 및 일괄 시스템에 대한 I/O 커넥터가 있는 Apache Beam SDK를 사용하여 개발된 프로그램을 안정적으로 실행합니다.
이 빠른 시작에서는 다음과 같은 작업을 수행하는 Apache Beam 파이프라인을 작성하는 방법을 보여줍니다.
- Pub/Sub 라이트에서 메시지 읽기
- 게시 타임스탬프를 기준으로 메시지 기간 설정 또는 그룹화
- Cloud Storage에 메시지 쓰기
또한 다음을 수행하는 방법도 보여줍니다.
- Dataflow에서 실행할 파이프라인 제출
- 파이프라인에서 Dataflow Flex 템플릿 만들기
이 튜토리얼에는 Maven이 필요하지만 프로젝트 예시를 Maven에서 Gradle로 변환할 수도 있습니다. 자세한 내용은 선택사항: Maven에서 Gradle로 변환을 참조하세요.
시작하기 전에
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
- Install the Google Cloud CLI.
To initialize the gcloud CLI, run the following command:
gcloud init
Create or select a Google Cloud project.
Create a Google Cloud project:
gcloud projects create PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
with your Google Cloud project name.
Make sure that billing is enabled for your Google Cloud project.
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable -
Set up authentication:
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="" --role=ROLE
Replace the following:
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
- Install the Google Cloud CLI.
To initialize the gcloud CLI, run the following command:
gcloud init
Create or select a Google Cloud project.
Create a Google Cloud project:
gcloud projects create PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
with your Google Cloud project name.
Make sure that billing is enabled for your Google Cloud project.
Enable the Pub/Sub Lite, Dataflow, Google Cloud Storage JSON API, and Cloud Logging APIs:
gcloud services enable -
Set up authentication:
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsublite.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="" --role=ROLE
Replace the following:
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
Create local authentication credentials for your user account:
gcloud auth application-default login
Pub/Sub Lite 프로젝트 설정
Cloud Storage 버킷, 프로젝트, Dataflow 리전의 변수를 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다. Dataflow 리전은 작업을 실행할 수 있는 유효한 리전이어야 합니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요.
export PROJECT_ID=$(gcloud config get-value project)
이 프로젝트가 소유한 Cloud Storage 버킷을 만듭니다.
gcloud storage buckets create gs://$BUCKET
Pub/Sub 라이트 영역별 라이트 주제 및 구독 만들기
영역별 라이트 Pub/Sub 라이트 주제 및 라이트 구독을 만듭니다.
Lite 위치에 지원되는 Pub/Sub Lite 위치를 선택합니다. 리전의 영역도 지정해야 합니다. 예를 들면 us-central1-a
gcloud pubsub lite-topics create $TOPIC \ --location=$LITE_LOCATION \ --partitions=1 \ --per-partition-bytes=30GiB
gcloud pubsub lite-subscriptions create $SUBSCRIPTION \ --location=$LITE_LOCATION \ --topic=$TOPIC \ --starting-offset=beginning
Dataflow로 메시지 스트리밍
빠른 시작 샘플 코드 다운로드
빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.
git clone
cd java-docs-samples/pubsublite/streaming-analytics
샘플 코드
이 샘플 코드에서는 Dataflow를 사용하여 다음을 수행합니다.
- Pub/Sub 라이트 구독의 메시지를 제한되지 않은 소스로 읽습니다.
- 고정 시간 기간 및 기본 트리거를 사용하여 게시 타임스탬프를 기준으로 메시지를 그룹화합니다.
그룹화된 메시지를 Cloud Storage의 파일에 작성합니다.
이 샘플을 실행하기 전에 Pub/Sub 라이트 클라이언트 라이브러리의 자바 설정 안내를 따르세요.
Dataflow 파이프라인 시작
Dataflow에서 파이프라인을 시작하려면 다음 명령어를 실행하세요.
mvn compile exec:java \
-Dexec.mainClass=examples.PubsubliteToGcs \
-Dexec.args=" \
--subscription=projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION \
--output=gs://$BUCKET/samples/output \
--windowSize=1 \
--project=$PROJECT_ID \
--tempLocation=gs://$BUCKET/temp \
--runner=DataflowRunner \
앞의 명령어는 Dataflow 작업을 시작합니다. 콘솔 출력의 링크를 따라 Dataflow 모니터링 콘솔에서 작업에 액세스합니다.
작업 진행률 관찰
Dataflow 콘솔에서 작업의 진행률을 관찰합니다.
작업 세부정보 보기를 열어 다음을 확인합니다.
- 작업 그래프
- 실행 세부정보
- 작업 측정항목
라이트 주제에 일부 메시지를 게시합니다.
gcloud pubsub lite-topics publish $TOPIC \
--location=$LITE_LOCATION \
--message="Hello World!"
작업자 로그에서 메시지를 보려면 몇 분 정도 기다려야 할 수 있습니다.
아래 명령어를 사용하여 Cloud Storage에 기록된 파일을 확인합니다.
gcloud storage ls "gs://$BUCKET/samples/"
다음과 유사하게 출력됩니다.
아래 명령어를 사용하여 파일의 콘텐츠를 확인합니다.
gcloud storage cat "gs://$BUCKET/samples/your-filename"
선택사항: Dataflow 템플릿 만들기
파이프라인을 기반으로 커스텀 Dataflow Flex 템플릿을 만들 수 있습니다. Dataflow 템플릿을 사용하면 전체 Java 개발 환경을 설정하지 않고도 Google Cloud 콘솔 또는 명령줄에서 다른 입력 매개변수를 사용하여 작업을 실행할 수 있습니다.
파이프라인의 모든 종속 항목이 포함된 팻 JAR을 만듭니다. 명령어가 실행되면
이 표시됩니다.mvn clean package -DskipTests=true
템플릿 파일 및 템플릿 컨테이너 이미지의 이름과 위치를 제공합니다.
export TEMPLATE_PATH="gs://$BUCKET/samples/your-template-file.json"
export TEMPLATE_IMAGE="$PROJECT_ID/your-template-image:latest"
커스텀 Flex 템플릿을 빌드합니다. 작업을 실행하는 데 필요한 사양이 포함된 필수
파일이 예시와 함께 제공됩니다.gcloud dataflow flex-template build $TEMPLATE_PATH \ --image-gcr-path $TEMPLATE_IMAGE \ --sdk-language "JAVA" \ --flex-template-base-image "JAVA11" \ --metadata-file "metadata.json" \ --jar "target/pubsublite-streaming-bundled-1.0.jar" \ --env FLEX_TEMPLATE_JAVA_MAIN_CLASS="examples.PubsubliteToGcs"
커스텀 Flex 템플릿을 사용하여 작업을 실행합니다.
작업 이름을 입력합니다.
Dataflow 리전을 입력합니다.
커스텀 템플릿을 선택합니다.
템플릿 경로를 입력합니다.
필수 매개변수를 입력합니다.
작업 실행을 클릭합니다.
gcloud dataflow flex-template run "pubsublite-to-gcs-`date +%Y%m%d`" \
--template-file-gcs-location $TEMPLATE_PATH \
--parameters subscription="projects/$PROJECT_ID/locations/$LITE_LOCATION/subscriptions/$SUBSCRIPTION" \
--parameters output="gs://$BUCKET/samples/template-output" \
--parameters windowSize=1 \
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 Google Cloud 프로젝트를 삭제하면 됩니다.
Dataflow 콘솔에서 작업을 중지합니다. 파이프라인을 드레이닝하는 대신 파이프라인을 취소합니다.
주제 및 구독을 삭제합니다.
gcloud pubsub lite-topics delete $TOPIC
gcloud pubsub lite-subscriptions delete $SUBSCRIPTION
파이프라인에서 만든 파일을 삭제합니다.
gcloud storage rm "gs://$BUCKET/samples/*" --recursive --continue-on-error
gcloud storage rm "gs://$BUCKET/temp/*" --recursive --continue-on-error
템플릿 이미지와 템플릿 파일(있는 경우)을 삭제합니다.
gcloud container images delete $TEMPLATE_IMAGE
gcloud storage rm $TEMPLATE_PATH
Cloud Storage 버킷을 제거합니다.
gcloud storage rm gs://$BUCKET --recursive
서비스 계정을 삭제합니다.
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
다음 단계
Dataflow Flex 템플릿 구성 자세히 알아보기
Dataflow 스트리밍 파이프라인 이해