La canalización de ETL es una arquitectura para ejecutar canalizaciones de procesamiento de datos por lotes mediante el método de extracción, transformación y carga. Esta arquitectura consta de los siguientes componentes:
- Google Cloud Storage para los datos de origen de destino
- Dataflow para realizar transformaciones en los datos de origen
- BigQuery como destino para los datos transformados
- Entorno de Cloud Composer para organizar el proceso de ETL
Comenzar
Haga clic en el siguiente vínculo para obtener una copia del código fuente en Cloud Shell. Una vez ahí, un solo comando iniciará una copia de la aplicación en funcionamiento en tu proyecto.
Componentes de la canalización de ETL
La arquitectura de canalización de ETL usa varios productos. A continuación, se enumeran los componentes, junto con más información sobre los componentes, incluidos los vínculos a videos relacionados, documentación del producto y explicaciones interactivas.Secuencias de comandos
La secuencia de comandos de instalación usa un ejecutable escrito en go
y las herramientas de la CLI de Terraform para tomar un proyecto vacío y, luego, instalar la aplicación en él. El resultado debe ser una aplicación en funcionamiento y una URL para la dirección IP de balanceo de cargas.
./main.tf
Habilitar servicios
Los servicios de Google Cloud están inhabilitados en un proyecto de forma predeterminada. Para poder usar cualquiera de las soluciones, tenemos que activar lo siguiente:
- IAM: Administra la identidad y el acceso a los recursos de Google Cloud.
- Almacenamiento: Servicio para almacenar y acceder a tus datos en Google Cloud
- Dataflow: Servicio administrado para ejecutar una amplia variedad de patrones de procesamiento de datos
- BigQuery: Plataforma de datos para crear, administrar, compartir y consultar datos
- Composer: Administra entornos de Apache Airflow en Google Cloud.
- Procesamiento: Máquinas virtuales y servicios de herramientas de redes (usados por Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Crea una cuenta de servicio
Crea una cuenta de servicio para que Composer y Dataflow la usen.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Asignar funciones
Otorga las funciones necesarias a la cuenta de servicio y la función de extensión del agente de servicio de la API de Cloud Composer v2 al agente de servicio de Cloud Composer (obligatorio para los entornos de Composer 2).
variable "build_roles_list" {description = "La lista de funciones que Composer y Dataflow necesitan
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Crear entorno de Composer
Airflow depende de muchos microservicios para ejecutarse, por lo que Cloud Composer aprovisiona los componentes de Google Cloud a fin de ejecutar sus flujos de trabajo. Estos componentes se conocen en conjunto como entorno de Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Crear un conjunto de datos y una tabla de BigQuery
Crea un conjunto de datos y una tabla en BigQuery a fin de conservar los datos procesados y hacer que estén disponibles para estadísticas.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Crea un bucket de Cloud Storage y agrega archivos
Crea un bucket de almacenamiento que contenga los archivos necesarios para la canalización, incluidos los datos de origen (inputFile.txt), el esquema de destino (jsonSchema.json) y la función definida por el usuario para la transformación (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Subir archivo DAG
Primero, usa una fuente de datos a fin de determinar la ruta adecuada del bucket de Cloud Storage para agregar el archivo DAG y, luego, agrega los archivos DAG al bucket. El archivo DAG define los flujos de trabajo, las dependencias y los programas para que Airflow organice su canalización.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusión
Una vez ejecutado, ahora deberías tener configurado un entorno de Composer para ejecutar trabajos de ETL en los datos que se presentan en el ejemplo. Además, debes tener todo el código para modificar o extender esta solución a fin de que se ajuste a tu entorno.