Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
이 튜토리얼은 Cloud Composer 환경을 Amazon Web Services에 연결하여 여기에 저장된 데이터를 활용하는 방법을 보여주는 Google Cloud에서 데이터 분석 DAG 실행의 수정판입니다. 여기에서는 Cloud Composer를 사용하여 Apache Airflow DAG를 만드는 방법을 보여줍니다. DAG는 BigQuery 공개 데이터 세트의 데이터와 Amazon Web Services(AWS) S3 버킷에 저장된 CSV 파일을 조인한 다음 Dataproc 서버리스 일괄 작업을 실행하여 조인된 데이터를 처리합니다.
이 튜토리얼의 BigQuery 공개 데이터 세트는 전 세계 기후 요약 통합 데이터베이스인 ghcn_d입니다. CSV 파일에는 1997년부터 2021년까지 미국 공휴일 날짜와 이름에 대한 정보가 포함되어 있습니다.
DAG를 사용하여 답변하려는 질문은 '지난 25년 동안 추수감사절에 시카고는 얼마나 따뜻했나요?'입니다.
목표
- 기본 구성으로 Cloud Composer 환경 만들기
- AWS S3에서 버킷 만들기
- 빈 BigQuery 데이터 세트 만들기
- 새 Cloud Storage 버킷 만들기
- 다음 태스크가 포함된 DAG 만들고 실행하기:
- S3에서 Cloud Storage로 외부 데이터 세트 로드
- Cloud Storage에서 BigQuery로 외부 데이터 세트 로드
- BigQuery에서 두 데이터 세트 결합
- 데이터 분석 PySpark 작업 실행
시작하기 전에
AWS에서 권한 관리
IAM 정책 AWS 만들기 튜토리얼의 '비주얼 편집기를 사용하여 정책 만들기 섹션'에 따라 다음 구성을 사용하여 AWS S3에 대해 맞춤설정된 IAM 정책을 만듭니다.
- 서비스: S3
- ListAllMyBuckets(
s3:ListAllMyBuckets
) - S3 버킷을 확인합니다. - CreateBucket(
s3:CreateBucket
) - 버킷을 만듭니다. - PutBucketOwnershipControls(
s3:PutBucketOwnershipControls
) - 버킷을 만듭니다. - ListBucket(
s3:ListBucket
) - S3 버킷에 객체를 나열할 수 있는 권한을 부여합니다. - PutObject(
s3:PutObject
) - 버킷에 파일을 업로드합니다. - GetBucketVersioning(
s3:GetBucketVersioning
) - 버킷의 객체를 삭제합니다. - DeleteObject(
s3:DeleteObject
) - 버킷의 객체를 삭제합니다. - ListBucketVersions(
s3:ListBucketVersions
) - 버킷을 삭제합니다. - DeleteBucket(
s3:DeleteBucket
) - 버킷을 삭제합니다. - 리소스: '버킷' 및 '객체' 옆의 '모두'를 선택하여 해당 유형의 모든 리소스에 권한을 부여합니다.
- 태그: 없음
- 이름: TutorialPolicy
위에 있는 각 구성에 대한 자세한 내용은 Amazon S3에서 지원되는 작업 목록을 참조하세요.
API 사용 설정
다음 API를 사용 설정합니다.
콘솔
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs.
gcloud
Enable the Dataproc, Cloud Composer, BigQuery, Cloud Storage APIs:
gcloud services enable dataproc.googleapis.comcomposer.googleapis.com bigquery.googleapis.com storage.googleapis.com
권한 부여
사용자 계정에 다음 역할과 권한을 부여합니다.
BigQuery 데이터 세트를 만들 수 있는 BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) 역할을 부여합니다.Cloud Storage 버킷을 만들 수 있는 스토리지 관리자(
roles/storage.admin
) 역할을 부여합니다.
Cloud Composer 환경 만들기 및 준비
기본 매개변수로 Cloud Composer 환경을 만듭니다.
- 미국 기반 리전을 선택합니다.
- 최신 Cloud Composer 버전을 선택합니다.
Airflow 작업자가 DAG 태스크를 성공적으로 실행할 수 있도록 Cloud Composer 환경에서 사용되는 서비스 계정에 다음 역할을 부여합니다.
- BigQuery 사용자(
roles/bigquery.user
) - BigQuery 데이터 소유자(
roles/bigquery.dataOwner
) - 서비스 계정 사용자(
roles/iam.serviceAccountUser
) - Dataproc 편집자(
roles/dataproc.editor
) - Dataproc 작업자(
roles/dataproc.worker
)
- BigQuery 사용자(
Google Cloud에서 관련된 리소스 만들기 및 수정
Cloud Composer 환경에
apache-airflow-providers-amazon
PyPI 패키지를 설치합니다.다음 매개변수를 사용하여 빈 BigQuery 데이터 세트를 만듭니다.
- 이름:
holiday_weather
- 리전:
US
- 이름:
US
멀티 리전에 새 Cloud Storage 버킷을 만듭니다.다음 명령어를 실행하여 네트워킹 요구사항이 충족되도록 Dataproc 서버리스를 실행하려는 리전의 기본 서브넷에서 비공개 Google 액세스를 사용 설정합니다. Cloud Composer 환경과 동일한 리전을 사용하는 것이 좋습니다.
gcloud compute networks subnets update default \ --region DATAPROC_SERVERLESS_REGION \ --enable-private-ip-google-access
AWS에서 관련된 리소스 만들기
원하는 리전의 기본 설정으로 S3 버킷을 만듭니다.
Cloud Composer에서 AWS에 연결
- AWS 액세스 키 ID 및 보안 비밀 액세스 키 가져오기
Airflow UI를 사용하여 AWS S3 연결을 추가합니다.
- 관리자 > 연결로 이동합니다.
다음 구성으로 새 연결을 만듭니다.
- 연결 ID:
aws_s3_connection
- 연결 유형:
Amazon S3
- 기타:
{"aws_access_key_id":"your_aws_access_key_id", "aws_secret_access_key": "your_aws_secret_access_key"}
- 연결 ID:
Dataproc 서버리스를 사용한 데이터 처리
PySpark 작업 예시 살펴보기
다음 코드는 온도를 섭씨 1/10도에서 섭씨로 변환하는 PySpark 작업 예시입니다. 이 작업은 데이터 세트의 온도 데이터를 다른 형식으로 변환합니다.
Cloud Storage에 PySpark 파일 업로드
Cloud Storage에 PySpark 파일 업로드
로컬 머신에 data_analytics_process.py를 저장합니다.
Google Cloud 콘솔에서 Cloud Storage 브라우저 페이지로 이동합니다.
앞에서 만든 버킷의 이름을 클릭합니다.
버킷의 객체 탭에서 파일 업로드 버튼을 클릭하고 나타나는 대화상자에서
data_analytics_process.py
를 선택한 후 열기를 클릭합니다.
AWS S3에 CSV 파일 업로드
holidays.csv
파일을 업로드하려면 다음 안내를 따르세요.
- 로컬 머신에
holidays.csv
를 저장합니다. - AWS 가이드를 따라 버킷에 파일을 업로드합니다.
데이터 분석 DAG
예시 DAG 살펴보기
DAG는 여러 연산자를 사용하여 데이터를 변환하고 통합합니다.
S3ToGCSOperator
는 holidays.csv 파일을 AWS S3 버킷에서 Cloud Storage 버킷으로 전송합니다.GCSToBigQueryOperator
는 Cloud Storage의 holidays.csv 파일을 앞에서 만든 BigQueryholidays_weather
데이터 세트의 새 테이블로 수집합니다.DataprocCreateBatchOperator
는 Dataproc 서버리스를 사용하여 PySpark 일괄 작업을 만들고 실행합니다.BigQueryInsertJobOperator
는 '날짜' 열에 있는 holidays.csv의 데이터를 BigQuery 공개 데이터 세트 ghcn_d의 날씨 데이터와 조인합니다.BigQueryInsertJobOperator
태스크는 for 루프를 사용하여 동적으로 생성되며 이러한 태스크는TaskGroup
에 있으므로 Airflow UI의 그래프 뷰에서 가독성이 향상됩니다.
Airflow UI를 사용하여 변수 추가
Airflow에서 변수는 임의의 설정이나 구성을 간단한 키-값 스토어로 저장하고 검색하는 범용 방법입니다. 이 DAG는 Airflow 변수를 사용하여 공통 값을 저장합니다. 환경에 추가하려면 다음 안내를 따르세요.
관리자 > 변수로 이동합니다.
다음 변수를 추가합니다.
s3_bucket
: 앞에서 만든 s3 버킷의 이름입니다.gcp_project
: 프로젝트 IDgcs_bucket
: 앞에서 만든 버킷의 이름입니다(gs://
프리픽스 제외).gce_region
: Dataproc 서버리스 네트워킹 요구사항을 충족하는 Dataproc 작업을 원하는 리전입니다. 이전에 비공개 Google 액세스를 사용 설정한 리전입니다.dataproc_service_account
: Cloud Composer 환경의 서비스 계정입니다. Cloud Composer 환경의 환경 구성 탭에서 이 서비스 계정을 찾을 수 있습니다.
환경의 버킷에 DAG 업로드
Cloud Composer는 환경 버킷의 /dags
폴더에 있는 DAG를 예약합니다. Google Cloud 콘솔을 사용하여 DAG를 업로드하려면 다음 안내를 따르세요.
로컬 머신에서 s3togcsoperator_tutorial.py를 저장합니다.
Google Cloud 콘솔에서 환경 페이지로 이동합니다.
환경 목록의 DAG 폴더 열에서 DAG 링크를 클릭합니다. 환경의 DAG 폴더가 열립니다.
파일 업로드를 클릭합니다.
로컬 머신에서
s3togcsoperator_tutorial.py
를 선택하고 열기를 클릭합니다.
DAG 트리거
Cloud Composer 환경에서 DAG 탭을 클릭합니다.
DAG ID
s3_to_gcs_dag
를 클릭합니다.DAG 트리거를 클릭합니다.
태스크가 성공적으로 완료되었음을 나타내는 녹색 체크 표시가 나타날 때까지 5~10분 정도 기다립니다.
DAG 성공 검증
Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.
탐색기 패널에서 프로젝트 이름을 클릭합니다.
holidays_weather_joined
를 클릭합니다.결과 테이블을 보려면 미리보기를 클릭합니다. 값 열의 숫자는 섭씨 1/10도입니다.
holidays_weather_normalized
를 클릭합니다.결과 테이블을 보려면 미리보기를 클릭합니다. 값 열의 숫자는 섭씨입니다.
삭제
이 튜토리얼에서 만든 개별 리소스를 삭제합니다.
AWS S3 버킷에서
holidays.csv
파일을 삭제합니다.생성된 AWS S3 버킷을 삭제합니다.
이 튜토리얼에서 만든 Cloud Storage 버킷 삭제
환경 버킷을 수동으로 삭제하는 등 Cloud Composer 환경 삭제