ETL Pipeline

Architecture

An ETL pipeline is an architecture for running batch data-processing pipelines using the extract, transform, load method. This architecture consists of the following components:

  • Google Cloud Storage for landing the source data
  • Dataflow for performing transformations on the source data
  • BigQuery as the destination for the transformed data
  • A Cloud Composer environment for orchestrating the ETL process

Getting started

Click the link below to see a copy of the source code in Cloud Shell. Once there, a single command spins up a working copy of the application in your project.

Open in Cloud Shell

View the source code on GitHub


ETL pipeline components

The ETL pipeline architecture uses a number of products. The components are listed below, along with more information about each, including links to related videos, product documentation, and interactive walkthroughs.

  • BigQuery - a cost-effective, serverless, multicloud data warehouse designed to help you turn big data into valuable business insights.
  • Cloud Composer - a fully managed workflow orchestration service built on Apache Airflow.
  • Cloud Storage - offers file storage and public serving of images over http(s).

The script

The install script uses an executable written in go and the Terraform CLI tools to take an empty project and install the application in it. The output should be a working application and the URL of the load balancer IP address.

./main.tf
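
The snippets below reference a handful of input variables: var.project_id, var.project_number, var.region, var.zone, and var.basename. The following is a minimal sketch of how such declarations might look; the variable names match the references in the snippets, but the descriptions and defaults shown here are illustrative rather than taken from the repository.

variable "project_id" {
  description = "Project ID to deploy the solution into"
  type        = string
}

variable "project_number" {
  description = "Numeric project number of the same project"
  type        = string
}

variable "region" {
  description = "Region for the Composer environment and Dataflow jobs"
  type        = string
  default     = "us-central1"
}

variable "zone" {
  description = "Zone passed to the DAG via AIRFLOW_VAR_GCE_ZONE"
  type        = string
  default     = "us-central1-a"
}

variable "basename" {
  description = "Prefix used when naming the Cloud Storage bucket"
  type        = string
  default     = "etl-pipeline"
}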

Enable services

By default, Google Cloud services are disabled in a project. In order to use any of the solutions here, we have to enable the following:

  • IAM - manages identity and access for Google Cloud resources
  • Storage - a service for storing and accessing data on Google Cloud
  • Dataflow - a managed service for executing a wide variety of data-processing patterns
  • BigQuery - a data platform to create, manage, share, and query data
  • Composer - manages Apache Airflow environments on Google Cloud
  • Compute - virtual machine and networking services (used by Composer)

variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
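  # Leave the APIs enabled if these Terraform resources are later destroyed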
  disable_on_destroy = false
}

Create a service account

Create a service account for Composer and Dataflow to use.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project      = var.project_id
  depends_on   = [google_project_service.all]
}

Assign roles

Grant the required roles to the service account, and grant the Cloud Composer v2 API Service Agent Extension role to the Cloud Composer service agent (required for Composer 2 environments).

variable "build_roles_list" { description = "Composer 和 Dataflow 需要的角色列表" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all, google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Create the Composer environment

Airflow depends on many micro-services to run, so Cloud Composer provisions Google Cloud components to run your workflows. These components are collectively known as a Cloud Composer environment.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
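
If it helps to surface the environment's endpoints after terraform apply, optional outputs such as the following can be added. This is a convenience sketch and not part of the solution's code; the attribute paths (config.0.airflow_uri and config.0.dag_gcs_prefix) come from the google_composer_environment resource.

# Optional: expose useful Composer attributes after apply (illustrative output names)
output "composer_airflow_uri" {
  description = "URL of the Airflow web UI for the example environment"
  value       = google_composer_environment.example.config.0.airflow_uri
}

output "composer_dag_gcs_prefix" {
  description = "Cloud Storage path that Composer watches for DAG files"
  value       = google_composer_environment.example.config.0.dag_gcs_prefix
}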

Create a BigQuery dataset and table

Create a dataset and table in BigQuery to hold the processed data and make it available for analysis.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
  {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
  {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
  {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Create a Cloud Storage bucket and add files

Create a bucket to hold the files the pipeline needs: the source data (inputFile.txt), the target schema (jsonSchema.json), and the user-defined function used for the transformation (transformCSVtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Upload the DAG file

First use a data source to determine the correct Cloud Storage bucket path to add the DAG file to, then add the DAG file to that bucket. The DAG file defines the workflow, dependencies, and schedule that Airflow uses to orchestrate the pipeline.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusion

Once this has run, you should have a Composer environment set up to run ETL jobs against the data provided in the example. You should also have all of the code needed to modify or extend this solution to fit your environment.