ETL パイプライン

アーキテクチャ

ETL パイプラインは、抽出、変換、読み込みのメソッドを使用してバッチデータ処理パイプラインを実行するためのアーキテクチャです。 このアーキテクチャは、次のコンポーネントで構成されています。

  • Google Cloud Storage(ランディング ソース データ用)
  • ソースデータの変換用の Dataflow
  • 変換済みデータの宛先としての BigQuery
  • ETL プロセスをオーケストレートするための Cloud Composer 環境

使ってみる

Cloud Shell でソースコードのコピーへの次のリンクをクリックします。その後、1 つのコマンドでプロジェクト内のアプリケーションの作業コピーがスピンアップされます。

Cloud Shell で開く

GitHub でソースコードを見る


ETL パイプライン コンポーネント

ETL パイプライン アーキテクチャでは、いくつかのプロダクトを使用しています。以下に、関連動画、プロダクト ドキュメント、インタラクティブ チュートリアルへのリンクを含めた、コンポーネントの詳細を示します。
動画 ドキュメント チュートリアル
BigQuery BigQuery は、ビッグデータから価値あるビジネス分析情報を得るために設計された、サーバーレスで費用対効果に優れたマルチクラウド データ ウェアハウスです。
Cloud Composer Apache Airflow をベースに構築された、フルマネージドのワークフロー オーケストレーション サービス。
Cloud Storage Cloud Storage は、http(s) を介してファイル ストレージと画像の一般公開を提供します。

スクリプト

インストール スクリプトでは、go と Terraform CLI ツールで記述された実行ファイルを使用して、空のプロジェクトを作成し、そこにアプリケーションをインストールします。出力は、機能するアプリケーションとロード バランシング IP アドレスの URL になります。

./main.tf

サービスを有効化する

Google Cloud サービスは、デフォルトではプロジェクトで無効になっています。ここに記載されているいずれかのソリューションを使用するには、以下の対象を有効にする必要があります。

  • IAM - Google Cloud リソースの ID とアクセスを管理します。
  • Storage - Google Cloud でデータの保存とアクセスを行うサービス
  • Dataflow - さまざまなデータ処理パターンの実行に対応したマネージド サービス
  • BigQuery - データを作成、管理、共有、照会するためのデータ プラットフォーム
  • Composer — Google Cloud 上の Apache Airflow 環境を管理します
  • Compute — 仮想マシンとネットワーク サービス(Composer により使用される)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

サービス アカウントを作成する

Composer と Dataflow で使用するサービス アカウントを作成します。

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

ロールを割り当てる

サービス アカウントに必要なロールを付与し、Cloud Composer v2 API サービス エージェント拡張機能のロールを Cloud Composer サービス エージェントに付与します(Composer 2 環境で必要)。

variable "build_roles_list" { description = "Composer と Dataflow で必要なロールのリスト" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Composer 環境を作成する

Airflow は多くのマイクロサービスで実行されるため、Cloud Composer ではワークフローを実行するための Google Cloud コンポーネントのプロビジョニングが行われます。これらのコンポーネントを総称して Cloud Composer 環境と呼びます。

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

BigQuery データセットとテーブルを作成する

処理されたデータを保持し、分析に利用できるように、BigQuery にデータセットとテーブルを作成します。

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Cloud Storage バケットを作成してファイルを追加する

ソースデータ(inputFile.txt)、ターゲット スキーマ(jsonSchema.json)、変換のユーザー定義関数(transformCSCtoJSON.js)など、パイプラインに必要なファイルを保持するストレージ バケットを作成します。

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

DAG ファイルをアップロードする

まず、データソースを使用して、DAG ファイルを追加するための適切な Cloud Storage バケットパスを決定し、次に DAG ファイルをバケットに追加します。DAG ファイルは、Airflow でパイプラインをオーケストレートするためのワークフロー、依存関係、スケジュールを定義します。


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

まとめ

これで、この例に示されているデータに対して ETL ジョブを実行する Composer 環境が作成されました。さらに、環境に合わせてソリューションを変更または拡張するためのコードもすべて用意されています。