이 페이지에서는 DataflowTemplateOperator
를 사용하여 Cloud Composer에서 Dataflow 파이프라인을 실행하는 방법을 설명합니다. The Cloud Storage Text to BigQuery 파이프라인은 Cloud Storage에 저장된 텍스트 파일을 업로드하고 제공된 자바스크립트 사용자 정의 함수(UDF)를 사용하여 변환하고 BigQuery로 결과를 출력하는 일괄 파이프라인입니다.
워크플로를 시작하기 전에 다음 항목을 만들어야 합니다.
location
,average_temperature
,month
및 선택 사항으로inches_of_rain
,is_current
,latest_measurement
열이 있는 빈 데이터 세트의 빈 BigQuery 테이블..txt
파일의 데이터를 BigQuery 테이블 스키마의 올바른 형식으로 정규화하는 JSON 파일입니다. JSON 객체에는 열 이름, 입력 유형, 필수 입력란인지 여부를 포함하는BigQuery Schema
배열이 있습니다.BigQuery 테이블에 일괄 업로드할 데이터를 저장할 입력
.txt
파일입니다..txt
파일의 각 행을 테이블의 관련 변수로 변환하는 자바스크립트로 작성된 사용자 정의 함수입니다.위에서 언급한 파일의 위치를 가리키는 방향성 비순환 그래프(DAG) 파일입니다.
다음으로
.txt
파일,.js
UDF 파일,.json
스키마 파일을 스토리지 버킷에 업로드합니다. DAG도 Cloud Composer 환경에 업로드합니다.DAG가 업로드되면 Airflow 작업이 시작됩니다. 이 작업은 사용자 정의 함수를
.txt
파일에 적용하고 JSON 스키마에 따라 형식을 지정하는 Cloud Dataflow 파이프라인을 시작합니다.마지막으로 데이터는 앞에서 만든 BigQuery 테이블에 업로드됩니다.
비용
이 가이드에서는 다음과 같은 비용이 청구될 수 있는 Google Cloud 구성요소를 사용합니다.
- Cloud Composer
- Dataflow
- Cloud Storage
- BigQuery
기본 요건
- Cloud Composer 환경을 만들었는지 확인합니다.
- Cloud Composer 버전은 최소 1.9.0 이상이어야 합니다. 이미지 버전을 확인하려면 환경 세부정보를 참조하세요.
- 이 가이드를 따라 사용자 정의 함수를 작성하려면 자바스크립트에 익숙해야 합니다.
- Cloud Composer, Dataflow, Cloud Storage, BigQuery API를 사용 설정합니다.
환경 설정
스키마 정의가 있는 빈 BigQuery 테이블 만들기
먼저 스키마 정의가 있는 BigQuery 테이블을 만듭니다. 이 스키마 정의는 이 가이드의 뒷부분에서 사용합니다. 이 BigQuery 테이블에는 일괄 업로드 결과가 저장됩니다.
스키마 정의가 있는 빈 테이블을 만들려면 다음과 같이 하세요.
Console
Cloud Console에서 BigQuery 페이지로 이동합니다.
탐색 패널의 리소스 섹션에서 프로젝트를 확장합니다.
창 오른쪽의 세부정보 패널에서 데이터 세트 만들기를 클릭합니다.
- 데이터 세트 만들기 페이지의 데이터 세트 ID 섹션에서 데이터 세트 이름을
average_weather
로 지정합니다. 다른 필드는 모두 기본 상태로 둡니다.
데이터 세트 만들기를 클릭합니다.
탐색 패널로 돌아가 리소스 섹션에서 프로젝트를 펼칩니다. 그런 다음
average_weather
데이터 세트를 클릭합니다.창 오른쪽에 있는 세부정보 패널에서 테이블 만들기를 클릭합니다.
테이블 만들기 페이지의 소스 섹션에서 빈 테이블을 선택합니다.
테이블 만들기 페이지의 대상 섹션에서 다음을 수행합니다.
데이터 세트 이름에서
average_weather
데이터 세트를 선택합니다.테이블 이름 필드에
average_weather
라는 이름을 입력합니다.테이블 유형이 기본 테이블로 설정되어 있는지 확인합니다.
스키마 섹션에 스키마 정의를 입력합니다.
다음과 같이 스키마 정보를 직접 입력합니다.
텍스트로 편집을 사용 설정하고 테이블 스키마를 JSON 배열로 입력합니다. 이 옵션에서 다음 필드를 입력합니다.
[ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" } ]
필드 추가를 사용하여 스키마를 직접 입력합니다.
파티션 및 클러스터 설정에서 기본값(
No partitioning
)을 그대로 둡니다.고급 옵션 섹션에서 암호화의 기본값(
Google-managed key
)을 그대로 둡니다. 기본적으로 Compute Engine은 미사용으로 저장된 고객 콘텐츠를 암호화합니다.테이블 만들기를 클릭합니다.
bq
--location
플래그와 함께 bq mk
명령어를 사용하여 빈 데이터 세트를 만듭니다.
PROJECT_ID를 프로젝트 ID로 바꾸고 LOCATION을 원하는 위치로 바꿉니다. 지연 시간을 최소화하려면 Composer 환경이 위치한 동일한 리전을 선택하는 것이 좋습니다.
다음 명령어를 복사하여 전 세계 평균 날씨 데이터 세트를 만듭니다.
bq --location=LOCATION mk \ --dataset \ PROJECT_ID:average_weather
스키마 정의로 이 데이터 세트의 빈 테이블을 만들려면 아래 명령어에서 PROJECT_ID를 프로젝트 ID로 바꾸고 터미널에 입력합니다.
bq mk \ --table \ PROJECT_ID:average_weather.average_weather \ location:GEOGRAPHY,average_temperature:INTEGER,month:STRING,inches_of_rain:NUMERIC,is_current:BOOLEAN,latest_measurement:DATE
테이블을 만든 후 테이블의 만료 시간, 설명, 라벨을 업데이트할 수 있습니다. 또한 스키마 정의를 수정할 수 있습니다.
Python
샘플을 실행하기 전에 다음 명령어를 실행하여 환경에 라이브러리를 설치해야 합니다.
pip install google.cloud.bigquery
이 코드를 dataflowtemplateoperator_create_dataset_and_table_helper.py
로 저장하고 프로젝트 및 위치를 반영하도록 변수를 업데이트한 후 다음 명령어로 실행합니다.
python dataflowtemplateoperator_create_dataset_and_table_helper.py
Python
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.
스토리지 버킷 만들기
다음으로 워크플로에 필요한 모든 파일을 보관할 스토리지 버킷을 만들어야 합니다. 앞으로 만드는 DAG는 이 스토리지 버킷에 업로드한 파일을 참조합니다. 새 저장소 버킷을 만들려면 다음 안내를 따르세요.
콘솔
Cloud Console에서 Cloud Storage를 엽니다.
버킷 만들기를 클릭하여 버킷 생성 양식을 엽니다.
각 단계를 완료하려면 버킷 정보를 입력하고 계속을 클릭합니다.
버킷의 전역 고유 이름을 지정합니다(이 가이드의 나머지 부분에서는 bucketName으로 참조됨).
위치 유형으로 리전을 선택합니다. 그런 다음 버킷 데이터가 영구적으로 저장될 위치를 선택합니다.
데이터의 기본 스토리지 클래스로 표준을 선택합니다.
균일 액세스 제어를 선택하여 객체에 액세스합니다.
완료를 클릭합니다.
gsutil
gsutil mb
명령어를 사용합니다.gsutil mb gs://bucketName/
코드 샘플
C#
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 C# 설정 안내를 따르세요. 자세한 내용은 Compute Engine C# API 참조 문서를 확인하세요.
Go
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Go 설정 안내를 따르세요. 자세한 내용은 Compute Engine Go API 참조 문서를 참조하세요.
Python
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.
출력 테이블용 JSON 형식 BigQuery 스키마 만들기
앞에서 만든 출력 테이블과 일치하는 JSON 형식의 BigQuery 스키마 파일을 만듭니다. 필드 이름, 유형, 모드는 앞에서 BigQuery 테이블 스키마에 정의한 것과 일치해야 합니다. 이 파일은 .txt
파일의 데이터를 BigQuery 스키마와 호환되는 형식으로 정규화합니다. 파일 이름을 jsonSchema.json
로 지정합니다.
{ "BigQuery Schema": [ { "name": "location", "type": "GEOGRAPHY", "mode": "REQUIRED" }, { "name": "average_temperature", "type": "INTEGER", "mode": "REQUIRED" }, { "name": "month", "type": "STRING", "mode": "REQUIRED" }, { "name": "inches_of_rain", "type": "NUMERIC" }, { "name": "is_current", "type": "BOOLEAN" }, { "name": "latest_measurement", "type": "DATE" }] }
데이터 형식 지정을 위해 자바 스크립트(.js
) 파일 만들기
이 파일에서 입력 파일의 텍스트 줄을 변환하는 로직을 제공하는 UDF(사용자 정의 함수)를 정의합니다. 이 함수는 입력 파일의 각 텍스트 줄을 자체 인수로 취하므로 이 함수는 입력 파일의 각 줄에 대해 한 번씩 실행됩니다. 파일 이름을 transformCSVtoJSON.js
로 지정합니다.
Node.js
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Node.js 설정 안내를 따르세요. 자세한 내용은 Compute Engine Node.js API 참조 문서를 참조하세요.
입력 파일 만들기
이 파일에는 BigQuery 테이블에 업로드할 정보가 저장됩니다.
이 파일을 로컬에서 복사하고 이름을 inputFile.txt
로 지정합니다.
POINT(40.7128 74.006),45,'July',null,true,2020-02-16 POINT(41.8781 87.6298),23,'October',13,false,2015-02-13 POINT(48.8566 2.3522),80,'December',null,true,null POINT(6.5244 3.3792),15,'March',14,true,null
스토리지 버킷에 파일 업로드 및 스테이징 폴더 만들기
앞에서 만든 스토리지 버킷에 다음 파일을 업로드합니다.
- JSON 형식의 BigQuery 스키마(
.json
) - 자바 스크립트 사용자 정의 함수(
transformCSVtoJSON.js
) 처리하려는 텍스트의 입력 파일(
.txt
)
콘솔
- Google Cloud Console에서 Cloud Storage 브라우저를 엽니다.
Cloud Storage 브라우저 열기 버킷 목록에서 bucketName 버킷을 클릭합니다.
버킷의 객체 탭에서 다음 중 하나를 수행합니다.
원하는 파일을 바탕화면이나 파일 관리자에서 Cloud Console의 기본 창으로 드래그 앤 드롭합니다.
파일 업로드 버튼을 클릭하고 나타나는 대화상자에서 업로드할 파일을 선택하고 열기를 클릭합니다.
gsutil
[gsutil cp
] 명령어를 사용합니다.
gsutil cp [OBJECT_LOCATION] gs://bucketName
여기에서
[OBJECT_LOCATION]
은 객체의 로컬 경로입니다. 예:Desktop/dog.png
[bucketName]
은 앞에서 만든 전역적으로 고유한 버킷 이름입니다.
성공하면 응답은 다음 예와 같습니다.
Operation completed over 1 objects/58.8 KiB.
Python
Python
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.
DataflowTemplateOperator 구성
샘플을 실행하기 전에 적절한 환경 변수를 설정해야 합니다.
gcloud
또는 Airflow UI를 사용하면 됩니다.
gcloud
다음 명령어를 입력합니다.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set project_id PROJECT_ID
각 매개변수는 다음과 같습니다.
ENVIRONMENT
는 Cloud Composer 환경의 이름입니다.LOCATION
은 Cloud Composer 환경이 위치한 리전입니다.PROJECT_ID
는 Google Cloud 프로젝트 ID입니다.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set gce_region GCE_REGION
각 매개변수는 다음과 같습니다.
GCE_REGION
은 Compute Engine 리전입니다.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set gce_zone GCE_ZONE
각 매개변수는 다음과 같습니다.
GCE_ZONE
은 Compute Engine 영역입니다. 영역과 리전 간 차이점을 자세히 알아보세요.
gcloud composer environments run ENVIRONMENT \ --location LOCATION \ variables -- \ --set bucket_path BUCKET_PATH
각 매개변수는 다음과 같습니다.
BUCKET_PATH
는 앞서 만든 Cloud Storage 버킷의 위치입니다.
Airflow UI
툴바에서 관리 > 변수를 클릭합니다.
만들기를 클릭합니다.
다음 정보를 입력합니다.
- 키:
project_id
- 값: PROJECT_ID(Google Cloud 프로젝트 ID)
- 키:
저장 후 다른 항목 추가를 클릭합니다.
다음 정보를 입력합니다.
- 키:
bucket_path
- 값: BUCKET_PATH(Cloud Storage 버킷의 위치. 예: 'gs://my-bucket')
- 키:
저장 후 다른 항목 추가를 클릭합니다.
다음 정보를 입력합니다.
- 키:
gce_region
- 값: GCE_REGION(Compute Engine 리전의 리전)
- 키:
저장 후 다른 항목 추가를 클릭합니다.
다음 정보를 입력합니다.
- 키:
gce_zone
- 값:
GCE_ZONE
은 Compute Engine 영역. 영역과 리전 간 차이점을 자세히 알아보세요.
- 키:
저장을 클릭합니다.
이제 앞서 만든 파일을 참조하여 Dataflow 워크플로를 시작하는 DAG를 만듭니다. 이 DAG를 복사하여 composer-dataflow-dag.py
로 로컬에 저장합니다.
Python
이 샘플을 사용해 보기 전에 Compute Engine 빠른 시작: 클라이언트 라이브러리 사용의 Python 설정 안내를 따르세요. 자세한 내용은 Compute Engine Python API 참조 문서를 확인하세요.
Cloud Storage에 DAG 업로드
DAG를 환경 폴더에 업로드합니다. 업로드가 성공적으로 완료되면 Cloud Composer 환경 페이지에서 DAG 폴더 링크를 클릭하여 이를 확인할 수 있어야 합니다.
작업 상태 보기
- Airflow 웹 인터페이스로 이동합니다.
- DAG 페이지에서 DAG 이름(예:
composerDataflowDAG
)을 클릭합니다. - DAG 세부정보 페이지에서 그래프 보기를 클릭합니다.
상태를 확인합니다.
실패: 작업 주변에 빨간색 상자가 있습니다. 작업 위로 마우스 포인터를 올려놓고 상태: 실패를 찾을 수도 있습니다.
몇 분 후 Dataflow와 BigQuery에서 결과를 확인할 수 있습니다.
Dataflow에서 작업 보기
Dataflow 웹 UI로 이동합니다. Dataflow 웹 UI로 이동
작업 이름은 하이픈으로 이름 끝에 연결된 고유한 ID와 함께
dataflow_operator_transform_csv_to_bq
로 지정됩니다.작업 세부정보를 보려면 이름을 클릭합니다. Dataflow 작업 세부정보 자세히 알아보기
BigQuery에서 결과 보기
BigQuery 웹 UI로 이동합니다. BigQuery 웹 UI로 이동
표준 SQL을 사용하여 쿼리를 제출할 수 있습니다. 다음 쿼리를 사용하여 테이블에 추가된 행을 확인합니다.
SELECT * FROM projectId.average_weather
삭제
Google Cloud Platform 계정에 요금이 부과되지 않도록 이 가이드에서 사용한 리소스를 삭제할 수 있습니다.
- Cloud Composer 환경을 삭제합니다.
- Cloud Composer 환경의 Cloud Storage 버킷을 삭제합니다. Cloud Composer 환경을 삭제하면 버킷이 삭제되지 않습니다.
- Dataflow 작업을 중지합니다.
- BigQuery 테이블 및 BigQuery 데이터 세트를 삭제합니다.