Pipeline de ETL

+

Arquitectura

Un flujo de procesamiento de ETL es una arquitectura para ejecutar flujos de procesamiento de datos por lotes mediante el método de extracción, transformación y carga. Esta arquitectura consta de los siguientes componentes:

  • Google Cloud Storage para los datos de origen de landing
  • Dataflow para realizar transformaciones en los datos de origen
  • BigQuery como destino de los datos transformados
  • Entorno de Cloud Composer para orquestar el proceso de ETL

Empezar

Haz clic en el siguiente enlace para acceder a una copia del código fuente en Cloud Shell. Una vez allí, con un solo comando se creará una copia de trabajo de la aplicación en tu proyecto.

Abrir en Cloud Shell

Ver el código fuente en GitHub


Componentes de un flujo de procesamiento ETL

La arquitectura de flujo de procesamiento de ETL usa varios productos. A continuación, se enumeran los componentes y se incluye más información sobre ellos, como enlaces a vídeos relacionados, documentación del producto y tutoriales interactivos.
Vídeo Documentos Guías
BigQuery BigQuery es un almacén de datos multinube rentable y sin servidor, diseñado para transformar el Big Data en información valiosa para tu negocio.
Cloud Composer Servicio totalmente gestionado de orquestación de flujos de trabajo, integrado en Apache Airflow.
Cloud Storage Cloud Storage ofrece almacenamiento de archivos y servicio público de imágenes a través de http(s).

Secuencias de comandos

La secuencia de comandos de instalación usa un ejecutable escrito en go y las herramientas de la CLI de Terraform para tomar un proyecto vacío e instalar la aplicación en él. El resultado debe ser una aplicación que funcione y una URL para la dirección IP de balanceo de carga.

./main.tf

Habilitar servicios

Los servicios de Google Cloud están inhabilitados en un proyecto de forma predeterminada. Para usar cualquiera de las soluciones que se indican a continuación, debemos activar lo siguiente:

  • Gestión de identidades y accesos (IAM): gestiona las identidades y el acceso a los recursos de Google Cloud.
  • Storage: servicio para almacenar datos en Google Cloud y acceder a ellos
  • Dataflow: servicio gestionado para ejecutar una amplia variedad de patrones de procesamiento de datos.
  • BigQuery plataforma de datos para crear, gestionar, compartir y consultar datos
  • Composer: gestiona entornos de Apache Airflow en Google Cloud.
  • Compute: máquinas virtuales y servicios de redes (usados por Composer)
variable "gcp_service_list" {
  description = "The list of apis necessary for the project"
  type        = list(string)
  default = [
    "dataflow.googleapis.com",
    "compute.googleapis.com",
    "composer.googleapis.com",
    "storage.googleapis.com",
    "bigquery.googleapis.com",
    "iam.googleapis.com"
  ]
}

resource "google_project_service" "all" {
  for_each           = toset(var.gcp_service_list)
  project            = var.project_number
  service            = each.key
  disable_on_destroy = false
}

Crear cuenta de servicio

Crea una cuenta de servicio que usarán Composer y Dataflow.

resource "google_service_account" "etl" {
  account_id   = "etlpipeline"
  display_name = "ETL SA"
  description  = "user-managed service account for Composer and Dataflow"
  project = var.project_id
  depends_on = [google_project_service.all]
}

Asignar roles

Concede los roles necesarios a la cuenta de servicio y el rol Extensión de agente de servicio de la API de Cloud Composer v2 al agente de servicio de Cloud Composer (obligatorio para los entornos de Composer 2).

variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }

resource "google_project_iam_member" "allbuild" {
  project    = var.project_id
  for_each   = toset(var.build_roles_list)
  role       = each.key
  member     = "serviceAccount:${google_service_account.etl.email}"
  depends_on = [google_project_service.all,google_service_account.etl]
}

resource "google_project_iam_member" "composerAgent" {
  project    = var.project_id
  role       = "roles/composer.ServiceAgentV2Ext"
  member     = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
  depends_on = [google_project_service.all]
}

Crear un entorno de Composer

Airflow depende de muchos microservicios para ejecutarse, por lo que Cloud Composer aprovisiona los componentes de Google Cloud para ejecutar tus flujos de trabajo. Estos componentes se conocen en conjunto como un entorno de Cloud Composer.

# Create Composer environment
resource "google_composer_environment" "example" {
  project   = var.project_id
  name      = "example-environment"
  region    = var.region
  config {

    software_config {
      image_version = "composer-2.0.12-airflow-2.2.3"
      env_variables = {
        AIRFLOW_VAR_PROJECT_ID  = var.project_id
        AIRFLOW_VAR_GCE_ZONE    = var.zone
        AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
      }
    }
    node_config {
      service_account = google_service_account.etl.name
    }
  }
  depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}

Crear un conjunto de datos y una tabla de BigQuery

Crea un conjunto de datos y una tabla en BigQuery para almacenar los datos procesados y ponerlos a disposición de las analíticas.

resource "google_bigquery_dataset" "weather_dataset" {
  project    = var.project_id
  dataset_id = "average_weather"
  location   = "US"
  depends_on = [google_project_service.all]
}

resource "google_bigquery_table" "weather_table" {
  project    = var.project_id
  dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
  table_id   = "average_weather"
  deletion_protection = false

  schema     = <<EOF
[
  {
    "name": "location",
    "type": "GEOGRAPHY",
    "mode": "REQUIRED"
  },
  {
    "name": "average_temperature",
    "type": "INTEGER",
    "mode": "REQUIRED"
  },
   {
    "name": "month",
    "type": "STRING",
    "mode": "REQUIRED"
  },
   {
    "name": "inches_of_rain",
    "type": "NUMERIC",
    "mode": "NULLABLE"
  },
   {
    "name": "is_current",
    "type": "BOOLEAN",
    "mode": "NULLABLE"
  },
   {
    "name": "latest_measurement",
    "type": "DATE",
    "mode": "NULLABLE"
  }
]
EOF
  depends_on = [google_bigquery_dataset.weather_dataset]
}

Crear un segmento de Cloud Storage y añadir archivos

Crea un contenedor de almacenamiento para alojar los archivos necesarios para la canalización, incluidos los datos de origen (inputFile.txt), el esquema de destino (jsonSchema.json) y la función definida por el usuario para la transformación (transformCSCtoJSON.js).

# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
  project       = var.project_number
  name          = "${var.basename}-${var.project_id}-files"
  location      = "US"
  force_destroy = true
  depends_on    = [google_project_service.all]
}

resource "google_storage_bucket_object" "json_schema" {
  name       = "jsonSchema.json"
  source     = "${path.module}/files/jsonSchema.json"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "input_file" {
  name       = "inputFile.txt"
  source     = "${path.module}/files/inputFile.txt"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

resource "google_storage_bucket_object" "transform_CSVtoJSON" {
  name       = "transformCSVtoJSON.js"
  source     = "${path.module}/files/transformCSVtoJSON.js"
  bucket     = google_storage_bucket.pipeline_files.name
  depends_on = [google_storage_bucket.pipeline_files]
}

Subir un archivo DAG

Primero usa una fuente de datos para determinar la ruta del segmento de Cloud Storage adecuada para añadir el archivo DAG y, a continuación, añade los archivos DAG al segmento. El archivo DAG define los flujos de trabajo, las dependencias y las programaciones para que Airflow coordine tu canalización.


data "google_composer_environment" "example" {
  project    = var.project_id
  region     = var.region
  name       = google_composer_environment.example.name
  depends_on = [google_composer_environment.example]
}

resource "google_storage_bucket_object" "dag_file" {
  name       = "dags/composer-dataflow-dag.py"
  source     = "${path.module}/files/composer-dataflow-dag.py"
  bucket     = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
  depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}

Conclusión

Una vez que se haya ejecutado, deberías tener un entorno de Composer configurado para ejecutar tareas de ETL en los datos que se muestran en el ejemplo. Además, debes tener todo el código para modificar o ampliar esta solución y adaptarla a tu entorno.