ETL 파이프라인은 추출, 변환, 로드 메서드를 사용하여 일괄 데이터 처리를 실행하기 위한 아키텍처입니다. 이 아키텍처는 다음 구성요소로 구성됩니다.
- 소스 데이터 랜딩을 위한 Google Cloud Storage
- 소스 데이터에서 변환을 수행하기 위한 Dataflow
- 변환된 데이터의 대상으로 사용되는 BigQuery
- ETL 프로세스 조정을 위한 Cloud Composer 환경
시작하기
Cloud Shell에서 소스 코드의 복사본에 대해 다음 링크를 클릭합니다. 링크에 도착한 다음 단일 명령어로 프로젝트에서 애플리케이션의 작업 복사본이 작동합니다.
ETL 파이프라인 구성요소
ETL 파이프라인 아키텍처에는 여러 제품이 사용됩니다. 다음은 관련 동영상 링크, 제품 문서 및 대화형 둘러보기를 포함하여 구성 요소에 대한 자세한 정보와 함께 구성요소 목록을 보여줍니다.스크립트
설치 스크립트는 go
및 Terraform CLI 도구로 작성된 실행 파일을 사용하여 빈 프로젝트를 가져오고 여기에 애플리케이션을 설치합니다. 출력은 작동하는 애플리케이션과 부하 분산 IP 주소의 URL입니다.
./main.tf
서비스 사용 설정
Google Cloud 서비스는 기본적으로 프로젝트에서 사용 중지되어 있습니다. 여기에서 솔루션을 사용하려면 다음을 활성화해야 합니다.
- IAM - Google Cloud 리소스에 대한 ID 및 액세스를 관리합니다.
- 스토리지 - Google Cloud에서 데이터를 저장하고 액세스하는 서비스입니다.
- Dataflow - 다양한 데이터 처리 패턴을 실행하는 관리형 서비스입니다.
- BigQuery - 데이터를 생성, 관리, 공유, 쿼리하기 위한 데이터 플랫폼입니다.
- Composer — Google Cloud에서 Apache Airflow 환경을 관리합니다.
- 컴퓨팅 - 가상 머신 및 네트워킹 서비스 (Composer에서 사용)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
서비스 계정 만들기
Composer 및 Dataflow에서 사용할 서비스 계정을 만듭니다.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
역할 지정
서비스 계정에 필요한 역할을 부여하고 Cloud Composer 서비스 에이전트에 Cloud Composer v2 API 서비스 에이전트 확장 역할을 부여합니다(Composer 2 환경에 필요).
variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Composer 환경 만들기
Airflow는 실행할 많은 마이크로서비스에 의존하므로 Cloud Composer는 워크플로를 실행할 Google Cloud 구성요소를 프로비저닝합니다. 이러한 구성요소를 통칭하여 Cloud Composer 환경이라고 합니다.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
BigQuery 데이터 세트 및 테이블 만들기
처리된 데이터를 저장하고 분석에 사용할 수 있도록 BigQuery에서 데이터 세트 및 테이블을 만듭니다.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Cloud Storage 버킷 만들기 및 파일 추가
소스 데이터(inputFile.txt), 대상 스키마(jsonSchema.json), 변환에 대한 사용자 정의 함수(transformCSCtoJSON.js)를 포함하여 파이프라인에 필요한 파일을 저장하기 위해 스토리지 버킷을 만듭니다.
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
DAG 파일 업로드
먼저 데이터 소스를 사용하여 DAG 파일을 추가하는 데 적합한 Cloud Storage 버킷 경로를 결정하고 DAG 파일을 버킷에 추가합니다. DAG 파일은 Airflow가 파이프라인을 조정하기 위한 워크플로, 종속 항목, 일정을 정의합니다.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
결론
실행이 완료되면 이제 예시에 제공된 데이터로 ETL 작업을 실행하도록 Composer 환경이 설정됩니다. 또한 환경에 맞게 이 솔루션을 수정하거나 확장할 수 있도록 모든 코드가 준비되었습니다.