Canalización de ETL

Arquitectura

La canalización ETL es una arquitectura para ejecutar canalizaciones de procesamiento de datos por lotes con el método de extracción, transformación y carga. Esta arquitectura consta de los siguientes componentes:

  • Google Cloud Storage para los datos de origen de destino
  • Dataflow para realizar transformaciones en los datos de origen
  • BigQuery como destino para los datos transformados
  • Entorno de Cloud Composer para organizar el proceso de ETL

Comenzar

Haz clic en el siguiente vínculo para obtener una copia del código fuente en Cloud Shell. Una vez allí, un solo comando iniciará una copia de trabajo de la aplicación en tu proyecto.

Abrir en Cloud Shell

Consulta el código fuente en GitHub


Componentes de la canalización de ETL

La arquitectura de canalización de ETL usa varios productos. A continuación, se enumeran los componentes y más información sobre ellos, además de vínculos a videos relacionados, documentación del producto y explicaciones interactivas.
Video Documentos Explicaciones
BigQuery BigQuery es un almacén de datos sin servidores, rentable y en múltiples nubes diseñado para ayudarte a convertir los macrodatos en estadísticas empresariales valiosas.
Cloud Composer Un servicio de organización de flujos de trabajo totalmente administrado y basado en Apache Airflow.
Cloud Storage Cloud Storage proporciona almacenamiento de archivos y entrega pública de imágenes a través de http(s).

Secuencias de comandos

La secuencia de comandos de instalación usa un archivo ejecutable escrito en go y las herramientas de la CLI de Terraform para tomar un proyecto vacío y, luego, instalar la aplicación en él. El resultado debe ser una aplicación que funcione y una URL para la dirección IP del balanceo de cargas.

./main.tf

Habilitar servicios

Los servicios de Google Cloud están inhabilitados en un proyecto de forma predeterminada. Para poder usar cualquiera de estas soluciones, tenemos que activar lo siguiente:

  • IAM: Administra la identidad y el acceso a los recursos de Google Cloud.
  • Almacenamiento: Servicio para almacenar y acceder a tus datos en Google Cloud
  • Dataflow: Es un servicio administrado para ejecutar una amplia variedad de patrones de procesamiento de datos.
  • BigQuery: Plataforma de datos para crear, administrar, compartir y consultar datos
  • Composer: Administra entornos de Apache Airflow en Google Cloud
  • Procesamiento: Máquinas virtuales y servicios de redes (que usa Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crea una cuenta de servicio

Crea una cuenta de servicio para que la usen Composer y Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Asignar roles

Otorga los roles necesarios a la cuenta de servicio y el rol de extensión de agente de servicio de la API de Cloud Composer v2 al agente de servicio de Cloud Composer (obligatorio para los entornos de Composer 2).

variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "Ext} composer.ServiceAgent"

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crea el entorno de Composer

Airflow depende de muchos microservicios para ejecutarse, por lo que Cloud Composer aprovisiona los componentes de Google Cloud para ejecutar sus flujos de trabajo. Estos componentes se conocen colectivamente como entorno de Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Crea un conjunto de datos y una tabla de BigQuery

Crea un conjunto de datos y una tabla en BigQuery para conservar los datos procesados y ponerlos a disposición para las estadísticas.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crea un bucket de Cloud Storage y agrega archivos

Crea un bucket de almacenamiento que contenga los archivos necesarios para la canalización, incluidos los datos de origen (inputFile.txt), el esquema de destino (jsonSchema.json) y la función definida por el usuario para la transformación (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Subir archivo DAG

Primero, usa una fuente de datos para determinar la ruta de acceso adecuada del bucket de Cloud Storage para agregar el archivo DAG. Luego, agrega los archivos DAG al bucket. El archivo DAG define los flujos de trabajo, las dependencias y los programas para que Airflow organice tu canalización.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusión

Una vez que se ejecute, deberías tener configurado un entorno de Composer para ejecutar trabajos de ETL en los datos que se presentan en el ejemplo. Además, debes tener todo el código para modificar o extender esta solución a fin de que se adapte a tu entorno.