Dataflow를 사용하여 Pub/Sub에서 메시지 스트리밍
Dataflow는 신뢰성과 표현 능력은 그대로 유지하면서 스트림(실시간) 및 일괄 모드에서 데이터를 변환하고 강화하는 완전 관리형 서비스입니다. 소스 및 싱크 커넥터의 생태계는 물론 다양한 기간 설정과 세션 분석 기본 도구를 제공하는 Apache Beam SDK를 사용하여 간소화된 파이프라인 개발 환경을 제공합니다. 이 빠른 시작에서는 Dataflow를 사용하여 다음을 수행하는 방법을 보여줍니다.
- Pub/Sub 주제에 게시된 메시지 읽기
- 타임스탬프를 기준으로 메시지 기간 설정 또는 그룹화
- Cloud Storage에 메시지 쓰기
이 빠른 시작에서는 Dataflow를 자바 및 Python으로 사용하는 방법을 소개합니다. SQL도 지원됩니다. 이 빠른 시작은 시작할 수 있도록 임시 사용자 인증 정보를 제공하는 Google Cloud Skills Boost 튜토리얼로도 제공됩니다.
커스텀 데이터 처리를 수행하지 않으려는 경우 UI 기반 Dataflow 템플릿을 사용하여 시작할 수도 있습니다.
시작하기 전에
- Google Cloud 계정에 로그인합니다. Google Cloud를 처음 사용하는 경우 계정을 만들고 Google 제품의 실제 성능을 평가해 보세요. 신규 고객에게는 워크로드를 실행, 테스트, 배포하는 데 사용할 수 있는 $300의 무료 크레딧이 제공됩니다.
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
- Install the Google Cloud CLI.
-
To initialize the gcloud CLI, run the following command:
gcloud init
-
Create or select a Google Cloud project.
-
Create a Google Cloud project:
gcloud projects create PROJECT_ID
Replace
PROJECT_ID
with a name for the Google Cloud project you are creating. -
Select the Google Cloud project that you created:
gcloud config set project PROJECT_ID
Replace
PROJECT_ID
with your Google Cloud project name.
-
-
Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON API, Pub/Sub, Resource Manager, and Cloud Scheduler APIs:
gcloud services enable dataflow.googleapis.com
compute.googleapis.com logging.googleapis.com storage-component.googleapis.com storage-api.googleapis.com pubsub.googleapis.com cloudresourcemanager.googleapis.com cloudscheduler.googleapis.com -
Set up authentication:
-
Create the service account:
gcloud iam service-accounts create SERVICE_ACCOUNT_NAME
Replace
SERVICE_ACCOUNT_NAME
with a name for the service account. -
Grant roles to the service account. Run the following command once for each of the following IAM roles:
roles/dataflow.worker, roles/storage.objectAdmin, roles/pubsub.admin
:gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com" --role=ROLE
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountROLE
: the role to grant
-
Grant the required role to the principal that will attach the service account to other resources.
gcloud iam service-accounts add-iam-policy-binding SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com --member="user:USER_EMAIL" --role=roles/iam.serviceAccountUser
Replace the following:
SERVICE_ACCOUNT_NAME
: the name of the service accountPROJECT_ID
: the project ID where you created the service accountUSER_EMAIL
: the email address for a Google Account
-
-
Create local authentication credentials for your user account:
gcloud auth application-default login
Pub/Sub 프로젝트 설정
-
버킷, 프로젝트, 리전의 변수를 만듭니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다. 이 빠른 시작에서 명령어를 실행할 위치에 가까운 Dataflow 리전을 선택합니다.
REGION
변수 값은 유효한 리전 이름이어야 합니다. 리전과 위치에 대한 자세한 내용은 Dataflow 위치를 참조하세요.BUCKET_NAME=BUCKET_NAME PROJECT_ID=$(gcloud config get-value project) TOPIC_ID=TOPIC_ID REGION=DATAFLOW_REGION SERVICE_ACCOUNT=SERVICE_ACCOUNT_NAME@PROJECT_ID.iam.gserviceaccount.com
-
이 프로젝트가 소유한 Cloud Storage 버킷을 만듭니다.
gcloud storage buckets create gs://$BUCKET_NAME
-
이 프로젝트에서 Pub/Sub 주제를 만듭니다.
gcloud pubsub topics create $TOPIC_ID
-
이 프로젝트에서 Cloud Scheduler 작업을 만듭니다. 작업은 1분 간격으로 Pub/Sub 주제에 메시지를 게시합니다.
App Engine 앱이 프로젝트에 존재하지 않는 경우 이 단계를 수행하면 App Engine 앱이 생성됩니다.
gcloud scheduler jobs create pubsub publisher-job --schedule="* * * * *" \ --topic=$TOPIC_ID --message-body="Hello!" --location=$REGION
작업을 시작합니다.
gcloud scheduler jobs run publisher-job --location=$REGION
-
다음 명령어를 사용하여 빠른 시작 저장소를 클론하고 샘플 코드 디렉터리로 이동합니다.
Java
git clone https://github.com/GoogleCloudPlatform/java-docs-samples.git cd java-docs-samples/pubsub/streaming-analytics
Python
git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git cd python-docs-samples/pubsub/streaming-analytics pip install -r requirements.txt # Install Apache Beam dependencies
Pub/Sub에서 Cloud Storage로 메시지 스트리밍
코드 샘플
이 샘플 코드에서는 Dataflow를 사용하여 다음을 수행합니다.
- Pub/Sub 메시지를 읽습니다.
- 게시 타임스탬프를 기준으로 메시지를 고정 크기 간격으로 기간을 설정 또는 그룹화합니다.
각 창의 메시지를 Cloud Storage의 파일에 작성합니다.
자바
Python
파이프라인 시작
파이프라인을 시작하려면 다음 명령어를 실행합니다.
자바
mvn compile exec:java \ -Dexec.mainClass=com.examples.pubsub.streaming.PubSubToGcs \ -Dexec.cleanupDaemonThreads=false \ -Dexec.args=" \ --project=$PROJECT_ID \ --region=$REGION \ --inputTopic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output=gs://$BUCKET_NAME/samples/output \ --gcpTempLocation=gs://$BUCKET_NAME/temp \ --runner=DataflowRunner \ --windowSize=2 \ --serviceAccount=$SERVICE_ACCOUNT"
Python
python PubSubToGCS.py \ --project=$PROJECT_ID \ --region=$REGION \ --input_topic=projects/$PROJECT_ID/topics/$TOPIC_ID \ --output_path=gs://$BUCKET_NAME/samples/output \ --runner=DataflowRunner \ --window_size=2 \ --num_shards=2 \ --temp_location=gs://$BUCKET_NAME/temp \ --service_account_email=$SERVICE_ACCOUNT
위 명령어는 로컬에서 실행되고 클라우드에서 실행되는 Dataflow 작업을 시작합니다. 명령어가 JOB_MESSAGE_DETAILED: Workers
have started successfully
를 반환하면 Ctrl+C
를 사용하여 로컬 프로그램을 종료합니다.
작업 및 파이프라인 진행 상황 관찰
작업 진행 상황은 Dataflow 콘솔에서 확인할 수 있습니다.
작업 세부정보 보기를 열어 다음을 확인합니다.
- 작업 구조
- 작업 로그
- 단계 측정항목
Cloud Storage에서 출력 파일을 보려면 몇 분 정도 기다려야 할 수 있습니다.
또는 아래 명령줄을 사용하여 어떤 파일이 작성되었는지 확인할 수 있습니다.
gcloud storage ls gs://${BUCKET_NAME}/samples/
다음과 유사하게 출력됩니다.
자바
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:34-22:36-0-of-1 gs://{$BUCKET_NAME}/samples/output-22:36-22:38-0-of-1
Python
gs://{$BUCKET_NAME}/samples/output-22:30-22:32-0 gs://{$BUCKET_NAME}/samples/output-22:30-22:32-1 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-0 gs://{$BUCKET_NAME}/samples/output-22:32-22:34-1
삭제
이 페이지에서 사용한 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 Google Cloud 프로젝트를 삭제하면 됩니다.
Cloud Scheduler 작업 삭제
gcloud scheduler jobs delete publisher-job --location=$REGION
Dataflow 콘솔에서 작업을 중지합니다. 파이프라인을 드레이닝하지 않고 파이프라인을 취소합니다.
주제를 삭제합니다.
gcloud pubsub topics delete $TOPIC_ID
파이프라인에서 만든 파일을 삭제합니다.
gcloud storage rm "gs://${BUCKET_NAME}/samples/output*" --recursive --continue-on-error gcloud storage rm "gs://${BUCKET_NAME}/temp/*" --recursive --continue-on-error
Cloud Storage 버킷을 제거합니다.
gcloud storage rm gs://${BUCKET_NAME} --recursive
-
서비스 계정을 삭제합니다.
gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
-
선택사항: 만든 사용자 인증 정보를 취소하고 로컬 사용자 인증 정보 파일을 삭제합니다.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
다음 단계
커스텀 타임스탬프별로 Pub/Sub 메시지 기간을 설정하려면 Pub/Sub 메시지에서 타임스탬프를 속성으로 지정하고 PubsubIO의
withTimestampAttribute
와 함께 커스텀 타임스탬프를 사용할 수 있습니다.Google의 스트리밍을 위해 설계된 오픈소스 Dataflow 템플릿을 살펴보세요.
Dataflow에서 Pub/Sub와 통합하는 방법을 자세히 알아보세요.
Dataflow Flex 템플릿을 사용하여 Pub/Sub에서 읽고 BigQuery에 쓰는 이 튜토리얼을 확인하세요.
기간 설정에 대한 자세한 내용은 Apache Beam 모바일 게임 파이프라인의 예시를 참조하세요.