Executar um DAG do Apache Airflow no Cloud Composer 2

Cloud Composer 1 | Cloud Composer 2

Nesta página, mostramos como criar um ambiente do Cloud Composer e executar um DAG do Apache Airflow no Cloud Composer 2.

Se você não conhece o Airflow, consulte este tutorial para mais informações sobre conceitos, objetos e uso do Airflow.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  3. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  4. No console do Google Cloud, na página do seletor de projetos, selecione ou crie um projeto do Google Cloud.

    Acessar o seletor de projetos

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Ative a API Cloud Composer.

    Ative a API

Criar um ambiente

Console

  1. No console do Google Cloud, acesse a página Criar ambiente.

    Acessar "Criar ambiente"

  2. Ao criar um ambiente no projeto, se o agente de serviço do Cloud Composer não tiver as permissões necessárias na conta de serviço do ambiente, aparecerá a seção Conceder permissões necessárias à conta de serviço do Cloud Composer.

    Adicione a conta do agente de serviço do Cloud Composer como novo principal na conta de serviço do ambiente e conceda o papel Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

    Confirme se você está usando a conta de serviço pretendida para seu ambiente e clique em Conceder.

  3. No campo Nome, use example-environment.

  4. Na lista suspensa Local, selecione uma região para o ambiente do Cloud Composer. Consulte Regiões disponíveis para saber mais sobre como selecionar uma região.

  5. Para outras opções de configuração do ambiente, use os padrões fornecidos.

  6. Para criar o ambiente, clique em Criar.

  7. Aguarde a criação do ambiente. Quando isso acontecer, uma marca de seleção verde será exibida ao lado do nome do ambiente.

gcloud

Adicione a conta do agente de serviço do Cloud Composer como uma nova conta principal na conta de serviço do ambiente e conceda a ela o papel Extensão do agente de serviço da API Cloud Composer v2 (roles/composer.ServiceAgentV2Ext).

Por padrão, seu ambiente usa a conta de serviço padrão do Compute Engine.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crie um ambiente:

gcloud composer environments create ENVIRONMENT_NAME \
  --location LOCATION \
  --image-version IMAGE_VERSION

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente; Neste guia de início rápido, usamos example-environment.
  • LOCATION por uma região para o ambiente do Cloud Composer. Consulte Regiões disponíveis para saber mais sobre como selecionar uma região.
  • IMAGE_VERSION pelo nome da imagem do Cloud Composer. Neste guia, usamos composer-2.5.2-airflow-2.6.3 para criar um ambiente com a imagem mais recente do Cloud Composer 2.

Exemplo:

gcloud composer environments create example-environment \
  --location us-central1 \
  --image-version composer-2.5.2-airflow-2.6.3

Terraform

Para configurar esse ambiente no Terraform, adicione o seguinte bloqueio de recursos à configuração do Terraform e execute terraform apply.

Para utilizar esse bloco de recursos, a conta de serviço que o Terraform usa precisa ter um papel com a permissão composer.environments.create ativada. Para mais informações sobre a conta de serviço do Terraform, consulte a Referência de configuração do provedor do Google.

Para saber mais sobre como usar o Terraform para criar um ambiente do Cloud Composer, consulte a documentação do Terraform.

resource "google_composer_environment" "example" {
  provider = google-beta
  name = "ENVIRONMENT_NAME"
  region = "LOCATION"

  config {
    software_config {
      image_version = "IMAGE_VERSION"
    }
  }
}
  • ENVIRONMENT_NAME pelo nome do ambiente; Neste guia de início rápido, usamos example-environment.

  • LOCATION por uma região para o ambiente do Cloud Composer. Consulte Regiões disponíveis para saber mais sobre como selecionar uma região.

  • IMAGE_VERSION pelo nome da imagem do Cloud Composer. Neste guia, usamos composer-2.5.2-airflow-2.6.3 para criar um ambiente com a imagem mais recente do Cloud Composer 2.

Exemplo:

resource "google_composer_environment" "example" {
  name = "example-environment"
  region = "us-central1"

  config {
    software_config {
      image_version = "composer-2.5.2-airflow-2.6.3"
    }
  }

}

Ver detalhes do ambiente

Após a criação, é possível visualizar as informações do ambiente, como a versão do Cloud Composer, o URL da interface da Web do Airflow e a pasta de DAGs no Cloud Storage.

Para ver as informações do ambiente:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Para ver a página Detalhes do ambiente, clique no nome do ambiente, example-environment.

Criar um DAG

Um Airflow DAG é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs são definidos em arquivos Python padrão.

O código Python em quickstart.py:

  1. Cria um DAG, composer_sample_dag. O DAG é executado uma vez por dia.
  2. Executa uma tarefa, print_dag_run_conf. A tarefa imprime a configuração da execução do DAG usando o operador bash.

Para criar um DAG, crie uma cópia do arquivo quickstart.py na máquina local.

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Fazer upload do DAG para o Cloud Storage

O Cloud Composer agenda somente os DAGs que estão na pasta /dags no bucket do Cloud Storage no ambiente.

Para programar o DAG, faça upload de quickstart.py da máquina local para a pasta /dags do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Para abrir a pasta /dags, clique no link pasta DAGs de example-environment.

  3. Na página Detalhes do bucket, clique em Fazer upload de arquivos e selecione a cópia local de quickstart.py.

  4. Para fazer upload do arquivo, clique em Abrir.

    Depois de fazer upload, o Cloud Composer adiciona o DAG ao Airflow e programa a execução dele imediatamente. Pode levar alguns minutos para que o DAG seja exibido na interface da Web do Airflow.

gcloud

Para fazer upload de quickstart.py com gcloud, execute o seguinte comando:

gcloud composer environments storage dags import \
--environment example-environment  --location us-central1 \
--source quickstart.py

Visualizar o DAG na IU do Airflow

Todos os ambientes do Cloud Composer têm um servidor da Web que executa essa interface. É possível gerenciar DAGs na interface da Web do Airflow.

Para visualizar o DAG na interface da Web do Airflow:

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Para abrir a interface da Web do Airflow, clique no link do Airflow de example-environment. A IU do Airflow é aberta em uma nova janela do navegador.

  3. Na barra de ferramentas do Airflow, acesse a página DAGs.

  4. Para abrir a página de detalhes do DAG, clique em composer_sample_dag.

    Página "DAGs" na interface do Airflow
    Figura 1. Página "DAGs" na interface do Airflow (clique para ampliar)

    A página do DAG mostra a visualização em árvore, uma representação gráfica das tarefas e dependências do fluxo de trabalho.

    Visualização em árvore do DAG composer_sample_dags
    Figura 2. Visualização em árvore do DAG composer_sample_dags

Ver detalhes da instância da tarefa nos registros do Airflow

O DAG que você programou inclui a tarefa print_dag_run_conf. A tarefa imprime a configuração da execução do DAG, que pode ser vista nos registros do Airflow da instância da tarefa.

Para visualizar os detalhes da instância da tarefa:

  1. Na visualização em árvore do DAG na interface da Web do Airflow, clique em Visualização de gráfico.

    Se você mantiver o ponteiro sobre a tarefa print_dag_run_conf, o status dela será exibido.

    Visualização em árvore do DAG composer_sample_dags
    Figura 3. Status da tarefa print_dag_run_conf
  2. Clique na tarefa print_dag_run_conf.

    No menu de contexto Instância de tarefa, é possível receber metadados e executar algumas ações.

    Menu de contexto da instância da tarefa para a tarefa composer_sample_dags
    Figura 4. Menu de contexto da instância da tarefa para a tarefa composer_sample_dags
  3. No menu de contexto Instância de tarefa, clique em Registro.

  4. No registro, procure Running command: ['bash' para ver a saída do operador bash.

    [2021-10-04 15:27:21,029] {subprocess.py:63} INFO - Running command:
    ['bash', '-c', 'echo 735']
    [2021-10-04 15:27:21,167] {subprocess.py:74} INFO - Output:
    [2021-10-04 15:27:21,168] {subprocess.py:78} INFO - 735
    [2021-10-04 15:27:21,168] {subprocess.py:82} INFO - Command exited with
    return code 0
    

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, siga estas etapas.

Exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Selecione example-environment e clique em Excluir.

    3. Aguarde até o ambiente ser excluído.

  2. Exclua o bucket do ambiente. A exclusão do ambiente do Cloud Composer não exclui o bucket.

    1. No console do Google Cloud, acesse a página Armazenamento > Navegador.

      Acesse Armazenamento > Navegador

    2. Selecione o bucket do ambiente e clique em Excluir. Por exemplo, ele pode ter o nome us-central1-example-environ-c1616fe8-bucket.

  3. Exclua o disco permanente da fila do Redis do seu ambiente. A exclusão do ambiente do Cloud Composer não remove o disco permanente.

    1. No console do Google Cloud, acesse Compute Engine > Discos.

      Acessar "Discos"

    2. Selecione o disco permanente da fila do Redis do ambiente e clique em Excluir.

      Por exemplo, esse disco pode ser chamado de pvc-02bc4842-2312-4347-8519-d87bdcd31115. Os discos do Cloud Composer 2 sempre têm o tipo Balanced persistent disk e o tamanho de 2 GB.

A seguir