이 페이지에서는 Google에서 제공하는 Dataflow 템플릿을 사용하여 스트리밍 파이프라인을 만드는 방법을 보여줍니다. 특히, 이 페이지에서는 BigQuery에 대한 Pub/Sub 주제 템플릿을 예시로 사용합니다.
시작하기 전에
-
Google 계정으로
로그인합니다.
아직 계정이 없으면 새 계정을 등록하세요.
-
Google Cloud Console의 프로젝트 선택기 페이지에서 Google Cloud 프로젝트를 선택하거나 만듭니다.
-
Cloud 프로젝트에 결제가 사용 설정되어 있는지 확인합니다. 프로젝트에 결제가 사용 설정되어 있는지 확인하는 방법을 알아보세요.
- Cloud Dataflow, Compute Engine, Stackdriver Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub 및 Cloud Resource Manager API를 사용 설정합니다.
- Cloud Storage 버킷을 생성합니다.
- Cloud Console에서 Cloud Storage 브라우저 페이지로 이동합니다.
- 버킷 만들기를 클릭합니다.
- 버킷 만들기 대화상자에서 다음 속성을 지정합니다.
- 이름: 고유한 버킷 이름입니다. 버킷 네임스페이스는 전역적이며 전체 공개로 표시되기 때문에 버킷 이름에 민감한 정보를 포함하면 안 됩니다.
- 기본 스토리지 클래스: 스탠더드
- 버킷 데이터를 저장할 위치입니다.
- 만들기를 클릭합니다.
주제 만들기
- 웹 UI에서 Pub/Sub 주제 페이지로 이동합니다.
Pub/Sub 주제 페이지로 이동 - add주제 만들기를 클릭합니다.
- 주제 ID 입력란에 고유 주제 이름을 입력합니다. 예를 들어
taxirides-realtime
이 있습니다. - 저장을 클릭합니다.
BigQuery 데이터 세트와 테이블 만들기
Cloud Shell 또는 Cloud Console을 사용하여 Pub/Sub 주제에 해당하는 스키마로 BigQuery 데이터 세트 와 테이블을 만듭니다.
이 예시에서 데이터 세트 이름은 taxirides
이고 테이블 이름은 realtime
입니다.
Cloud Shell 사용
Cloud Shell을 사용하여 데이터 세트 및 테이블을 만듭니다.
- 다음 명령어를 실행하여 데이터 세트를 만듭니다.
bq mk taxirides
출력 결과는 다음과 유사합니다.Dataset “myprojectid:taxirides” successfully created
- 다음 명령어를 실행하여 테이블을 만듭니다.
bq mk \ --time_partitioning_field timestamp \ --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\ timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\ passenger_count:integer -t taxirides.realtime
출력 결과는 다음과 유사합니다.Table “myprojectid:taxirides.realtime” successfully created
테이블은 쿼리 비용을 줄이고 성능을 향상하도록 파티션으로 나뉘어 있습니다.
Google Cloud Platform Console 사용
Google Cloud Console을 사용하여 데이터 세트와 테이블을 만듭니다.
- BigQuery 웹 UI로 이동합니다.
BigQuery 웹 UI로 이동 - 탐색 창에서 프로젝트 이름 옆의 아래쪽 화살표 아이콘을 클릭한 다음 데이터 세트 만들기를 클릭합니다. 데이터 세트 ID로
taxirides
를 입력합니다.데이터 세트 ID는 프로젝트별로 고유합니다. ID 제한사항을 보려면 물음표를 클릭하세요.
- 다른 기본 설정은 그대로 두고 확인을 클릭합니다.
- 탐색 창에서 방금 만든 데이터 세트 ID 위에 마우스 포인터를 놓습니다. ID 옆에 있는 아래쪽 화살표 아이콘을 클릭하고 새 테이블 만들기를 클릭합니다.
- 소스 데이터 옆의 빈 테이블 만들기 옵션을 선택합니다.
- 대상 테이블에서
taxirides
를 선택하고realtime
을 입력합니다. - 스키마에서 텍스트로 편집을 선택하고 다음을 입력합니다.
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- 옵션에서 파티션 나누기 유형 필드에 일 옵션을 선택합니다.
- 옵션에서 파티션 나누기 필드 선택기에 타임스탬프 열을 선택합니다.
- 테이블 만들기 버튼을 클릭합니다.

파이프라인 실행
Google에서 제공하는 BigQuery에 대한 Pub/Sub 주제 템플릿을 사용하여 스트리밍 파이프라인을 실행합니다.
- Dataflow 웹 UI로 이동합니다.
Cloud Dataflow 웹 UI로 이동 - 템플릿에서 작업 만들기를 클릭합니다.
- Dataflow 작업의 작업 이름을 입력합니다.
- Dataflow 템플릿에서 BigQuery에 대한 Pub/Sub 주제 템플릿을 선택합니다.
- Pub/Sub 입력 주제 아래에
projects/pubsub-public-data/topics/taxirides-realtime
을 입력합니다. 파이프라인은 입력 주제에서 수신되는 데이터를 가져옵니다. - BigQuery 출력 테이블에
myprojectid:taxirides.realtime
을 입력합니다. - 임시 위치에
gs://mybucket/temp/
를 입력합니다. 이 폴더는 스테이징된 파이프라인 작업과 같은 임시 파일이 저장되는 하위 폴더입니다. - 작업 실행 버튼을 클릭합니다.
- BigQuery에 작성된 데이터를 봅니다. BigQuery 웹 UI로 이동합니다.
BigQuery 웹 UI로 이동
표준 SQL을 사용하여 쿼리를 제출할 수 있습니다. 예를 들어, 다음 쿼리는 지난 24시간 동안 추가된 모든 행을 선택합니다.SELECT * FROM `myprojectid.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000

삭제
이 빠른 시작에서 사용한 리소스의 비용이 Google Cloud 계정에 청구되지 않도록 하려면 다음 단계를 따르세요.
- Dataflow 웹 UI로 이동합니다.
Cloud Dataflow 웹 UI로 이동 - BigQuery 웹 UI로 이동합니다.
BigQuery 웹 UI로 이동- 탐색 창에서 만든 taxirides 데이터 세트에 마우스를 가져갑니다.
- 탐색 창에서 데이터 세트 이름 옆의 아래쪽 화살표 아이콘을 클릭한 다음 데이터 세트 삭제를 클릭합니다.
- 데이터 세트 삭제 대화상자에서 데이터 세트의 이름(`taxirides`)을 입력하고 확인을 클릭하여 삭제 명령을 확인합니다.