ETL パイプラインは、抽出、変換、読み込みのメソッドを使用してバッチデータ処理パイプラインを実行するためのアーキテクチャです。 このアーキテクチャは、次のコンポーネントで構成されています。
- ランディング ソースデータ用の Google Cloud Storage
- ソースデータの変換用の Dataflow
- 変換されたデータの宛先としての BigQuery
- ETL プロセスをオーケストレートするための Cloud Composer 環境
使ってみる
Cloud Shell でソースコードのコピーへの次のリンクをクリックします。その後、1 つのコマンドでプロジェクト内のアプリケーションの作業コピーがスピンアップされます。
ETL パイプライン コンポーネント
ETL パイプライン アーキテクチャでは、いくつかのプロダクトを使用しています。以下に、関連動画、プロダクト ドキュメント、インタラクティブ チュートリアルへのリンクを含めた、コンポーネントの詳細を示します。スクリプト
インストール スクリプトでは、go
と Terraform CLI ツールで記述された実行ファイルを使用して、空のプロジェクトを作成し、そこにアプリケーションをインストールします。出力は、機能するアプリケーションとロード バランシング IP アドレスの URL になります。
./main.tf
サービスを有効化する
Google Cloud サービスは、デフォルトではプロジェクトで無効になっています。いずれかのソリューションを使用するには、以下を有効にする必要があります。
- IAM - Google Cloud リソースの ID とアクセスを管理します。
- Storage - Google Cloud でデータの保存とアクセスを行うサービス
- Dataflow - さまざまなデータ処理パターンの実行に対応したマネージド サービス
- BigQuery - データを作成、管理、共有、照会するためのデータ プラットフォーム
- Composer — Google Cloud 上の Apache Airflow 環境を管理します
- Compute — 仮想マシンとネットワーク サービス(Composer により使用される)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
サービス アカウントを作成する
Composer と Dataflow で使用するサービス アカウントを作成します。
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
ロールを割り当てる
サービス アカウントに必要なロールを付与し、Cloud Composer サービス エージェント(Cloud Composer 2 環境で必要)に Cloud Composer v2 API サービス エージェント拡張機能のロールを付与します。
variable "build_roles_list" { description = "Composer と Dataflow で必要なロールのリスト" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Composer 環境を作成する
Airflow は実行する多くのマイクロサービスに依存するため、Cloud Composer ではワークフローを実行するための Google Cloud コンポーネントがプロビジョニングされます。これらのコンポーネントは、総称して Cloud Composer 環境と呼ばれます。
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
BigQuery データセットとテーブルを作成する
処理されたデータを保持して分析に利用できるように、BigQuery にデータセットとテーブルを作成します。
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Cloud Storage バケットを作成してファイルを追加する
ソースデータ(inputFile.txt)、ターゲット スキーマ(jsonSchema.json)、変換のユーザー定義関数(transformCSCtoJSON.js)など、パイプラインに必要なファイルを保持するストレージ バケットを作成します。
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
DAG ファイルをアップロードする
まず、データソースを使用して、DAG ファイルを追加するための適切な Cloud Storage バケットパスを決定し、次に DAG ファイルをバケットに追加します。DAG ファイルは、Airflow がパイプラインをオーケストレートするためのワークフロー、依存関係、スケジュールを定義します。
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
まとめ
実行すると、例に示すデータに対して ETL ジョブを実行するように Composer 環境が設定されます。さらに、環境に合わせてソリューションを変更または拡張するためのすべてのコードが必要です。