O pipeline ETL é uma arquitetura para executar pipelines de processamento de dados em lote usando o método de extração, transformação e carregamento. Essa arquitetura consiste nos seguintes componentes:
- Google Cloud Storage para dados de origem da página de destino
- Dataflow para realizar transformações nos dados de origem
- BigQuery como destino dos dados transformados
- Ambiente do Cloud Composer para orquestrar o processo de ETL
Primeiros passos
Clique no link a seguir para acessar uma cópia do código-fonte no Cloud Shell. Depois disso, um único comando vai criar uma cópia funcional do aplicativo no projeto.
Acessar o código-fonte no GitHub
Componentes do pipeline de ETL
A arquitetura do pipeline ETL usa vários produtos. A lista a seguir mostra os componentes e mais informações sobre eles, incluindo links para vídeos relacionados, documentação do produto e tutoriais interativos.Scripts
O script de instalação usa um executável escrito em go
e ferramentas da CLI do Terraform para
instalar o aplicativo em um projeto vazio. A saída deve ser um
aplicativo em funcionamento e um URL para o endereço IP do balanceamento de carga.
./main.tf
Ativar serviços
Os serviços do Google Cloud ficam desativados por padrão em um projeto. Para usar qualquer uma das soluções aqui, precisamos ativar o seguinte:
- IAM: gerencia a identidade e o acesso de recursos do Google Cloud.
- Armazenamento: serviço para armazenar e acessar seus dados no Google Cloud
- Dataflow: serviço gerenciado para executar uma ampla variedade de padrões de processamento de dados.
- BigQuery: plataforma de dados para criar, gerenciar, compartilhar e consultar dados
- Composer: gerencia ambientes do Apache Airflow no Google Cloud.
- Compute: máquinas virtuais e serviços de rede (usados pelo Composer)
variable "gcp_service_list" {
description = "The list of apis necessary for the project"
type = list(string)
default = [
"dataflow.googleapis.com",
"compute.googleapis.com",
"composer.googleapis.com",
"storage.googleapis.com",
"bigquery.googleapis.com",
"iam.googleapis.com"
]
}
resource "google_project_service" "all" {
for_each = toset(var.gcp_service_list)
project = var.project_number
service = each.key
disable_on_destroy = false
}
Criar conta de serviço
Cria uma conta de serviço para ser usada pelo Composer e pelo Dataflow.
resource "google_service_account" "etl" {
account_id = "etlpipeline"
display_name = "ETL SA"
description = "user-managed service account for Composer and Dataflow"
project = var.project_id
depends_on = [google_project_service.all]
}
Atribuir papéis
Concede os papéis necessários à conta de serviço e concede a função de extensão do agente de serviço da API Cloud Composer v2 ao agente de serviço do Cloud Composer (obrigatório para ambientes do Composer 2).
variable "build_roles_list" { description = "The list of roles that Composer and Dataflow needs" type = list(string) default = [ "roles/composer.worker", "roles/dataflow.admin", "roles/dataflow.worker", "roles/bigquery.admin", "roles/storage.objectAdmin", "roles/dataflow.serviceAgent", "roles/composer.ServiceAgentV2Ext" ] }
resource "google_project_iam_member" "allbuild" {
project = var.project_id
for_each = toset(var.build_roles_list)
role = each.key
member = "serviceAccount:${google_service_account.etl.email}"
depends_on = [google_project_service.all,google_service_account.etl]
}
resource "google_project_iam_member" "composerAgent" {
project = var.project_id
role = "roles/composer.ServiceAgentV2Ext"
member = "serviceAccount:service-${var.project_number}@cloudcomposer-accounts.iam.gserviceaccount.com"
depends_on = [google_project_service.all]
}
Criar o ambiente do Composer
O Airflow depende de muitos microsserviços para serem executados. Por isso, o Cloud Composer provisiona os componentes do Google Cloud para executar os fluxos de trabalho. Esses componentes são conhecidos coletivamente como um ambiente do Cloud Composer.
# Create Composer environment
resource "google_composer_environment" "example" {
project = var.project_id
name = "example-environment"
region = var.region
config {
software_config {
image_version = "composer-2.0.12-airflow-2.2.3"
env_variables = {
AIRFLOW_VAR_PROJECT_ID = var.project_id
AIRFLOW_VAR_GCE_ZONE = var.zone
AIRFLOW_VAR_BUCKET_PATH = "gs://${var.basename}-${var.project_id}-files"
}
}
node_config {
service_account = google_service_account.etl.name
}
}
depends_on = [google_project_service.all, google_service_account.etl, google_project_iam_member.allbuild, google_project_iam_member.composerAgent]
}
Criar um conjunto de dados e uma tabela do BigQuery
Cria um conjunto de dados e uma tabela no BigQuery para armazenar os dados processados e disponibilizá-los para análises.
resource "google_bigquery_dataset" "weather_dataset" {
project = var.project_id
dataset_id = "average_weather"
location = "US"
depends_on = [google_project_service.all]
}
resource "google_bigquery_table" "weather_table" {
project = var.project_id
dataset_id = google_bigquery_dataset.weather_dataset.dataset_id
table_id = "average_weather"
deletion_protection = false
schema = <<EOF
[
{
"name": "location",
"type": "GEOGRAPHY",
"mode": "REQUIRED"
},
{
"name": "average_temperature",
"type": "INTEGER",
"mode": "REQUIRED"
},
{
"name": "month",
"type": "STRING",
"mode": "REQUIRED"
},
{
"name": "inches_of_rain",
"type": "NUMERIC",
"mode": "NULLABLE"
},
{
"name": "is_current",
"type": "BOOLEAN",
"mode": "NULLABLE"
},
{
"name": "latest_measurement",
"type": "DATE",
"mode": "NULLABLE"
}
]
EOF
depends_on = [google_bigquery_dataset.weather_dataset]
}
Criar um bucket do Cloud Storage e adicionar arquivos
Cria um bucket de armazenamento para armazenar os arquivos necessários para o pipeline, incluindo os dados de origem (inputFile.txt), o esquema de destino (jsonSchema.json) e a função definida pelo usuário para a transformação (transformCSCtoJSON.js).
# Create Cloud Storage bucket and add files
resource "google_storage_bucket" "pipeline_files" {
project = var.project_number
name = "${var.basename}-${var.project_id}-files"
location = "US"
force_destroy = true
depends_on = [google_project_service.all]
}
resource "google_storage_bucket_object" "json_schema" {
name = "jsonSchema.json"
source = "${path.module}/files/jsonSchema.json"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "input_file" {
name = "inputFile.txt"
source = "${path.module}/files/inputFile.txt"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
resource "google_storage_bucket_object" "transform_CSVtoJSON" {
name = "transformCSVtoJSON.js"
source = "${path.module}/files/transformCSVtoJSON.js"
bucket = google_storage_bucket.pipeline_files.name
depends_on = [google_storage_bucket.pipeline_files]
}
Fazer upload do arquivo DAG
Primeiro, ele usa uma fonte de dados para determinar o caminho do bucket do Cloud Storage adequado para adicionar o arquivo DAG e, em seguida, adiciona os arquivos DAG ao bucket. O arquivo DAG define os fluxos de trabalho, as dependências e as programações para que o Airflow orquestre seu pipeline.
data "google_composer_environment" "example" {
project = var.project_id
region = var.region
name = google_composer_environment.example.name
depends_on = [google_composer_environment.example]
}
resource "google_storage_bucket_object" "dag_file" {
name = "dags/composer-dataflow-dag.py"
source = "${path.module}/files/composer-dataflow-dag.py"
bucket = replace(replace(data.google_composer_environment.example.config.0.dag_gcs_prefix, "gs://", ""),"/dags","")
depends_on = [google_composer_environment.example, google_storage_bucket.pipeline_files, google_bigquery_table.weather_table]
}
Conclusão
Depois de executar, você terá um ambiente do Composer configurado para executar jobs de ETL nos dados apresentados no exemplo. Além disso, você precisa ter todo o código para modificar ou estender essa solução para se adequar ao seu ambiente.