이 튜토리얼에서는 Pub/Sub Subscription to BigQuery 템플릿을 사용하여 Google Cloud 콘솔이나 Google Cloud CLI를 통해 Dataflow 템플릿 작업을 만들고 실행합니다. 이 튜토리얼에서는 Pub/Sub에서 JSON 인코딩된 메시지를 읽고 BigQuery 테이블에 쓰는 스트리밍 파이프라인 예시를 설명합니다.
스트리밍 분석 및 데이터 통합 파이프라인은 Pub/Sub를 사용하여 데이터를 수집하고 배포합니다. Pub/Sub를 사용하면 게시자 및 구독자라는 이벤트 제작자 및 소비자 시스템을 만들 수 있습니다. 게시자는 이벤트를 Pub/Sub 서비스에 비동기적으로 보내고, Pub/Sub는 이벤트에 응답해야 하는 모든 서비스에 이벤트를 전달합니다.
Dataflow는 스트림(실시간) 및 일괄 모드에서 데이터를 변환하고 강화하는 완전 관리형 서비스입니다. Apache Beam SDK를 사용하여 수신 데이터를 변환하고 변환된 데이터를 출력하는 간소화된 파이프라인 개발 환경을 제공합니다.
이 워크플로의 이점은 UDF를 사용하여 메시지 데이터를 BigQuery에 쓰기 전에 변환할 수 있다는 점입니다.
이 시나리오에 Dataflow 파이프라인을 실행하기 전에 UDF가 있는 Pub/Sub BigQuery 구독이 요구사항을 충족하는지 고려하세요.
목표
- Pub/Sub 주제를 만듭니다.
- 테이블 및 스키마가 있는 BigQuery 데이터 세트를 만듭니다.
- Google에서 제공하는 스트리밍 템플릿을 사용하여 Dataflow를 통해 Pub/Sub 구독에서 BigQuery로 데이터를 스트리밍합니다.
비용
이 문서에서는 비용이 청구될 수 있는 Google Cloud구성요소( )를 사용합니다.
- Dataflow
- Pub/Sub
- Cloud Storage
- BigQuery
프로젝트 사용량을 기준으로 예상 비용을 산출하려면 가격 계산기를 사용합니다.
이 문서에 설명된 태스크를 완료했으면 만든 리소스를 삭제하여 청구가 계속되는 것을 방지할 수 있습니다. 자세한 내용은 삭제를 참조하세요.
시작하기 전에
이 섹션에서는 프로젝트를 선택하고 API를 사용 설정하며 사용자 계정과 작업자 서비스 계정에 적절한 역할을 부여하는 방법을 보여줍니다.
콘솔
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
이 튜토리얼의 단계를 완료하려면 사용자 계정에 서비스 계정 사용자 역할이 있어야 합니다. Compute Engine 기본 서비스 계정에는 Dataflow 작업자, Dataflow 관리자, Pub/Sub 편집자, 스토리지 객체 관리자, BigQuery 데이터 편집자 역할이 있어야 합니다. Google Cloud 콘솔에서 필요한 역할을 추가하려면 다음 안내를 따르세요.
Google Cloud 콘솔에서 IAM 페이지로 이동합니다.
IAM으로 이동- 프로젝트를 선택합니다.
- 사용자 계정이 포함된 행에서 주 구성원 수정을 클릭한 후 다른 역할 추가를 클릭합니다.
- 드롭다운 목록에서 서비스 계정 사용자 역할을 선택합니다.
- Compute Engine 기본 서비스 계정이 포함된 행에서 주 구성원 수정을 클릭한 후 다른 역할 추가를 클릭합니다.
- 드롭다운 목록에서 Dataflow 작업자 역할을 선택합니다.
Dataflow 관리자, Pub/Sub 편집자, 스토리지 객체 관리자, BigQuery 데이터 편집자 역할에 대해 같은 단계를 반복하고 저장을 클릭합니다.
역할 부여에 대한 상세 설명은 콘솔을 사용하여 IAM 역할 부여를 참조하세요.
gcloud
- Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
Install the Google Cloud CLI.
-
외부 ID 공급업체(IdP)를 사용하는 경우 먼저 제휴 ID로 gcloud CLI에 로그인해야 합니다.
-
gcloud CLI를 초기화하려면, 다음 명령어를 실행합니다.
gcloud init
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the Compute Engine, Dataflow, Cloud Logging, BigQuery, Pub/Sub, Cloud Storage, Resource Manager APIs.
-
Install the Google Cloud CLI.
-
외부 ID 공급업체(IdP)를 사용하는 경우 먼저 제휴 ID로 gcloud CLI에 로그인해야 합니다.
-
gcloud CLI를 초기화하려면, 다음 명령어를 실행합니다.
gcloud init
-
Compute Engine 기본 서비스 계정에 역할을 부여합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 IDPROJECT_NUMBER
: 프로젝트 번호 프로젝트 번호를 찾으려면gcloud projects describe
명령어를 사용합니다.SERVICE_ACCOUNT_ROLE
: 개별 역할입니다.
Cloud Storage 버킷 만들기
Google Cloud 콘솔 또는 Google Cloud CLI를 사용하여 Cloud Storage 버킷을 만드는 것으로 시작합니다. Dataflow 파이프라인은 이 버킷을 임시 스토리지 위치로 사용합니다.
콘솔
Google Cloud 콘솔에서 Cloud Storage 버킷 페이지로 이동합니다.
만들기를 클릭합니다.
버킷 만들기 페이지에서 버킷 이름 지정에 버킷 이름 지정 요구사항을 충족하는 이름을 입력합니다. Cloud Storage 버킷 이름은 전역에서 고유해야 합니다. 다른 옵션은 선택하지 마세요.
만들기를 클릭합니다.
gcloud
gcloud storage buckets create
명령어를 사용합니다.
gcloud storage buckets create gs://BUCKET_NAME
BUCKET_NAME
을 버킷 이름 지정 요구사항을 충족하는 Cloud Storage 버킷의 이름으로 바꿉니다.
Cloud Storage 버킷 이름은 전역에서 고유해야 합니다.
Pub/Sub 주제 및 구독 만들기
Pub/Sub 주제를 만든 다음 이 주제에 대한 구독을 만듭니다.
콘솔
주제를 만들려면 다음 단계를 완료합니다.
Google Cloud 콘솔에서 Pub/Sub 주제 페이지로 이동합니다.
주제 만들기를 클릭합니다.
주제 ID 필드에 주제의 ID를 입력합니다. 주제 이름을 지정하는 방법에 대한 자세한 내용은 주제 또는 구독 이름 지정 안내를 참조하세요.
기본 구독 추가 옵션을 유지합니다. 다른 옵션은 선택하지 마세요.
만들기를 클릭합니다.
- 주제 세부정보 페이지에서 생성된 구독의 이름은 구독 ID 아래에 나열됩니다. 나중에 사용할 수 있도록 이 값을 기록해 둡니다.
gcloud
주제를 만들려면 gcloud pubsub topics create
명령어를 실행합니다. 구독 이름 지정 방법은 주제 또는 구독 이름 지정 안내를 참조하세요.
gcloud pubsub topics create TOPIC_ID
TOPIC_ID
를 Pub/Sub 주제의 이름으로 바꿉니다.
주제에 대한 구독을 만들려면 gcloud pubsub subscriptions create
명령어를 실행합니다.
gcloud pubsub subscriptions create --topic TOPIC_ID SUBSCRIPTION_ID
SUBSCRIPTION_ID
를 Pub/Sub 구독 이름으로 바꿉니다.
BigQuery 테이블 만들기
이 단계에서는 다음 스키마를 사용하여 BigQuery 테이블을 만듭니다.
열 이름 | 데이터 유형 |
---|---|
name |
STRING |
customer_id |
INTEGER |
BigQuery 데이터 세트가 아직 없으면 먼저 만듭니다. 자세한 내용은 데이터 세트 만들기를 참조하세요. 그런 다음 빈 테이블을 새로 만듭니다.
콘솔
BigQuery 페이지로 이동합니다.
탐색기 창에서 프로젝트를 펼친 후 데이터 세트를 선택합니다.
데이터 세트 정보 섹션에서
테이블 만들기를 클릭합니다.테이블을 만들 소스 목록에서 빈 테이블을 선택합니다.
테이블 체크박스에 테이블 이름을 입력합니다.
스키마 섹션에서 텍스트로 수정을 클릭합니다.
다음 스키마 정의를 붙여넣습니다.
name:STRING, customer_id:INTEGER
테이블 만들기를 클릭합니다.
gcloud
bq mk
명령어를 사용합니다.
bq mk --table \
PROJECT_ID:DATASET_NAME.TABLE_NAME \
name:STRING,customer_id:INTEGER
다음을 바꿉니다.
PROJECT_ID
: 프로젝트 IDDATASET_NAME
: 데이터 세트의 이름TABLE_NAME
: 만들 테이블의 이름
파이프라인 실행
Google에서 제공하는 Pub/Sub Subscription to BigQuery 템플릿을 사용하여 스트리밍 파이프라인을 실행합니다. 파이프라인은 Pub/Sub 주제에서 수신 데이터를 가져와 BigQuery 데이터 세트에 데이터를 출력합니다.
콘솔
Google Cloud 콘솔에서 Dataflow 작업 페이지로 이동합니다.
템플릿에서 작업 만들기를 클릭합니다.
Dataflow 작업의 작업 이름을 입력합니다.
리전 엔드포인트에서 Dataflow 작업 리전을 선택합니다.
Dataflow 템플릿에서 Pub/Sub Subscription to BigQuery 템플릿을 선택합니다.
BigQuery 출력 테이블에서 둘러보기를 선택하고 BigQuery 테이블을 선택합니다.
Pub/Sub 입력 구독 목록에서 Pub/Sub 구독을 선택합니다.
임시 위치에 다음을 입력합니다.
gs://BUCKET_NAME/temp/
BUCKET_NAME
을 Cloud Storage 버킷 이름으로 바꿉니다.temp
폴더는 Dataflow 작업의 임시 파일을 저장합니다.작업 실행을 클릭합니다.
gcloud
셸 또는 터미널에서 템플릿을 실행하려면 gcloud dataflow jobs run
명령어를 사용합니다.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates-DATAFLOW_REGION/latest/PubSub_Subscription_to_BigQuery \
--region DATAFLOW_REGION \
--staging-location gs://BUCKET_NAME/temp \
--parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/SUBSCRIPTION_ID,\
outputTableSpec=PROJECT_ID:DATASET_NAME.TABLE_NAME
다음 변수를 바꿉니다.
JOB_NAME
: 작업 이름DATAFLOW_REGION
: 작업의 리전PROJECT_ID
: Google Cloud 프로젝트의 이름SUBSCRIPTION_ID
: Pub/Sub 구독 이름DATASET_NAME
: BigQuery 데이터 세트의 이름TABLE_NAME
: BigQuery 테이블의 이름
Pub/Sub에 메시지 게시
Dataflow 작업이 시작되면 Pub/Sub에 메시지를 게시할 수 있으며 파이프라인이 BigQuery에 메시지를 씁니다.
콘솔
Google Cloud 콘솔에서 Pub/Sub > 주제 페이지로 이동합니다.
주제 목록에서 주제 이름을 클릭합니다.
메시지를 클릭합니다.
메시지 게시를 클릭합니다.
메시지 수에
10
을 입력합니다.메시지 본문에
{"name": "Alice", "customer_id": 1}
을 입력합니다.게시를 클릭합니다.
gcloud
주제에 메시지를 게시하려면 gcloud pubsub topics publish
명령어를 사용합니다.
for run in {1..10}; do
gcloud pubsub topics publish TOPIC_ID --message='{"name": "Alice", "customer_id": 1}'
done
TOPIC_ID
를 주제의 이름으로 바꿉니다.
결과 보기
BigQuery 테이블에 기록된 데이터를 봅니다. 데이터가 테이블에 표시되는 데 최대 1분이 걸릴 수 있습니다.
콘솔
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
BigQuery 페이지로 이동쿼리 편집기 창에서 다음 쿼리를 실행합니다.
SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME` LIMIT 1000
다음 변수를 바꿉니다.
PROJECT_ID
: Google Cloud프로젝트의 이름DATASET_NAME
: BigQuery 데이터 세트의 이름TABLE_NAME
: BigQuery 테이블의 이름
gcloud
BigQuery에서 다음 쿼리를 실행하여 결과를 확인합니다.
bq query --use_legacy_sql=false 'SELECT * FROM `PROJECT_ID.DATASET_NAME.TABLE_NAME`'
다음 변수를 바꿉니다.
PROJECT_ID
: Google Cloud프로젝트의 이름DATASET_NAME
: BigQuery 데이터 세트의 이름TABLE_NAME
: BigQuery 테이블의 이름
UDF를 사용하여 데이터 변환
이 튜토리얼에서는 Pub/Sub 메시지가 JSON으로 형식이 지정되어있고 BigQuery 테이블 스키마가 JSON 데이터와 일치한다고 가정합니다.
선택적으로 BigQuery에 기록되기 전 데이터를 변환하는 JavaScript 사용자 정의 함수(UDF)를 제공할 수 있습니다. UDF는 필터링, 개인 식별 정보(PII) 삭제 또는 추가 필드로 데이터 강화와 같은 추가 처리를 수행할 수 있습니다.
자세한 내용은 Dataflow 템플릿에 대한 사용자 정의 함수 만들기를 참조하세요.
데드 레터 테이블 사용
작업이 실행되는 동안 파이프라인이 개별 메시지를 BigQuery에 쓰지 못할 수 있습니다. 가능한 오류는 다음과 같습니다.
- 잘못된 형식의 JSON을 포함하는 직렬화 오류
- 테이블 스키마 및 JSON 데이터의 불일치로 인한 유형 변환 오류
- 테이블 스키마에 없는 JSON 데이터의 추가 필드
파이프라인은 이러한 오류를 BigQuery의 데드 레터 테이블에 씁니다. 기본적으로 파이프라인은 TABLE_NAME_error_records
라는 데드레터 테이블을 자동으로 생성하며 여기서 TABLE_NAME
은 출력 테이블의 이름입니다.
다른 이름을 사용하려면 outputDeadletterTable
템플릿 매개변수를 설정하세요.
삭제
이 튜토리얼에서 사용된 리소스 비용이 Google Cloud 계정에 청구되지 않도록 하려면 리소스가 포함된 프로젝트를 삭제하거나 프로젝트를 유지하고 개별 리소스를 삭제하세요.
프로젝트 삭제
비용 청구를 방지하는 가장 쉬운 방법은 튜토리얼에서 만든 Google Cloud 프로젝트를 삭제하는 것입니다.
콘솔
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
gcloud
- In the Google Cloud console, go to the Manage resources page.
- In the project list, select the project that you want to delete, and then click Delete.
- In the dialog, type the project ID, and then click Shut down to delete the project.
개별 리소스 삭제
나중에 프로젝트를 재사용하려면 프로젝트를 유지하고 튜토리얼 중에 만든 리소스를 삭제하면 됩니다.
Dataflow 파이프라인 중지
콘솔
Google Cloud 콘솔에서 Dataflow 작업 페이지로 이동합니다.
중지할 작업을 클릭합니다.
작업을 중지하려면 작업 상태가 실행 중이어야 합니다.
작업 세부정보 페이지에서 중지를 클릭합니다.
취소를 클릭합니다.
선택을 확인하려면 작업 중지를 클릭합니다.
gcloud
Dataflow 작업을 취소하려면 gcloud dataflow jobs
명령어를 사용합니다.
gcloud dataflow jobs list \
--filter 'NAME=JOB_NAME AND STATE=Running' \
--format 'value(JOB_ID)' \
--region "DATAFLOW_REGION" \
| xargs gcloud dataflow jobs cancel --region "DATAFLOW_REGION"
Google Cloud 프로젝트 리소스 정리
콘솔
Pub/Sub 주제 및 구독을 삭제합니다.
Google Cloud 콘솔에서 Pub/Sub 주제 페이지로 이동합니다.
앞서 만든 주제를 선택합니다.
삭제를 클릭하여 주제를 영구 삭제합니다.
Google Cloud 콘솔의 Pub/Sub 구독 페이지로 이동합니다.
주제로 만든 구독을 선택합니다.
삭제를 클릭하여 주제를 영구 삭제합니다.
BigQuery 테이블 및 데이터 세트를 삭제합니다.
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
탐색기 패널에서 프로젝트를 펼칩니다.
삭제하려는 데이터 세트 옆에 있는
작업 보기를 클릭한 다음 삭제를 클릭합니다.
Cloud Storage 버킷을 삭제합니다.
Google Cloud 콘솔에서 Cloud Storage 버킷 페이지로 이동합니다.
삭제할 버킷을 선택하고
삭제를 클릭한 후 안내를 따르세요.
gcloud
Pub/Sub 구독 및 주제를 삭제하려면
gcloud pubsub subscriptions delete
및gcloud pubsub topics delete
명령어를 사용합니다.gcloud pubsub subscriptions delete SUBSCRIPTION_ID gcloud pubsub topics delete TOPIC_ID
BigQuery 테이블을 삭제하려면
bq rm
명령어를 사용합니다.bq rm -f -t PROJECT_ID:tutorial_dataset.tutorial
BigQuery 데이터 세트를 삭제합니다. 데이터 세트만으로는 비용이 발생하지 않습니다.
bq rm -r -f -d PROJECT_ID:tutorial_dataset
Cloud Storage 버킷과 해당 객체를 삭제하려면
gcloud storage rm
명령어를 사용합니다. 버킷만으로는 비용이 청구되지 않습니다.gcloud storage rm gs://BUCKET_NAME --recursive
사용자 인증 정보 취소
콘솔
프로젝트를 유지할 경우 Compute Engine 기본 서비스 계정에 부여한 역할을 취소합니다.
- Google Cloud 콘솔에서 IAM 페이지로 이동합니다.
프로젝트, 폴더, 조직을 선택합니다.
액세스 권한을 취소하려는 주 구성원이 포함된 행을 찾으세요. 이 행에서
주 구성원 수정을 클릭합니다.취소할 역할마다 삭제
버튼을 클릭한 다음 저장을 클릭합니다.
gcloud
- 프로젝트를 유지할 경우 Compute Engine 기본 서비스 계정에 부여한 역할을 취소합니다. 다음 IAM 역할마다 다음 명령어를 1회 실행합니다.
roles/dataflow.admin
roles/dataflow.worker
roles/storage.admin
roles/pubsub.editor
roles/bigquery.dataEditor
gcloud projects remove-iam-policy-binding <var>PROJECT_ID</var> \ --member=serviceAccount:<var>PROJECT_NUMBER</var>-compute@developer.gserviceaccount.com \ --role=<var>ROLE</var>
-
Optional: Revoke the authentication credentials that you created, and delete the local credential file.
gcloud auth application-default revoke
-
Optional: Revoke credentials from the gcloud CLI.
gcloud auth revoke
다음 단계
- UDF로 Dataflow 템플릿 확장하기
- Dataflow 템플릿 사용에 대해 자세히 알아보기
- Google에서 제공하는 모든 템플릿 보기
- Pub/Sub를 사용하여 주제를 만들고 사용하는 방법과 풀(pull) 구독을 만드는 방법 알아보기
- BigQuery를 사용하여 데이터 세트를 만드는 방법 알아보기
- Pub/Sub 구독 알아보기
- Google Cloud에 대한 참조 아키텍처, 다이어그램, 권장사항 살펴보기 Cloud 아키텍처 센터 살펴보기