Execute um DAG do Apache Airflow no Cloud Composer 2 (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Neste guia de início rápido, mostramos como criar um ambiente do Cloud Composer e executar um DAG do Apache Airflow no Cloud Composer 2.

Antes de começar

  1. Faça login na sua conta do Google Cloud. Se você começou a usar o Google Cloud agora, crie uma conta para avaliar o desempenho de nossos produtos em situações reais. Clientes novos também recebem US$ 300 em créditos para executar, testar e implantar cargas de trabalho.
  2. Instale a CLI do Google Cloud.
  3. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  4. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  5. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  6. Instale a CLI do Google Cloud.
  7. Para inicializar a CLI gcloud, execute o seguinte comando:

    gcloud init
  8. Crie ou selecione um projeto do Google Cloud.

    • Crie um projeto do Google Cloud:

      gcloud projects create PROJECT_ID

      Substitua PROJECT_ID por um nome para o projeto do Google Cloud que você está criando.

    • Selecione o projeto do Google Cloud que você criou:

      gcloud config set project PROJECT_ID

      Substitua PROJECT_ID pelo nome do projeto do Google Cloud.

  9. Verifique se a cobrança está ativada para o seu projeto do Google Cloud.

  10. Ative a API Cloud Composer:

    gcloud services enable composer.googleapis.com
  11. Para conseguir as permissões necessárias a fim de concluir o guia de início rápido, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:

    Para mais informações sobre como conceder papéis, consulte Gerenciar acesso.

    Também é possível receber as permissões necessárias com papéis personalizados ou outros papéis predefinidos.

Criar um ambiente

Se este for o primeiro ambiente no seu projeto, adicione a conta do agente de serviço do Cloud Composer como um novo principal na conta de serviço do ambiente e conceda o papel roles/composer.ServiceAgentV2Ext a ela.

Por padrão, seu ambiente usa a conta de serviço padrão do Compute Engine. O exemplo a seguir mostra como adicionar essa permissão a ela.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crie um novo ambiente chamado example-environment na região us-central1, com a versão mais recente do Cloud Composer.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.8.3-airflow-2.7.3

Criar um arquivo DAG

Um Airflow DAG é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs são definidos em arquivos Python padrão.

Neste guia, usamos um exemplo de DAG do Airflow definido no arquivo quickstart.py. O código Python nesse arquivo faz o seguinte:

  1. Cria um DAG, composer_sample_dag. Este DAG é executado todos os dias.
  2. Executa uma tarefa, print_dag_run_conf. A tarefa imprime a configuração da execução do DAG usando o operador bash.

Salve uma cópia do arquivo quickstart.py na sua máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Faça upload do arquivo DAG para o bucket do ambiente

Todo ambiente do Cloud Composer tem um bucket do Cloud Storage associado a ele. O Airflow no Cloud Composer programa apenas os DAGs localizados na pasta /dags nesse bucket.

Para programar seu DAG, faça upload de quickstart.py da máquina local para a pasta /dags do ambiente:

Para fazer upload de quickstart.py com a Google Cloud CLI, execute o seguinte comando na pasta em que o arquivo quickstart.py está localizado:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Visualizar o DAG

Depois de fazer upload do arquivo DAG, o Airflow faz o seguinte:

  1. Analisa o arquivo DAG que você enviou. Pode levar alguns minutos para que o DAG fique disponível para o Airflow.
  2. Adiciona o DAG à lista de DAGs disponíveis.
  3. Executa o DAG de acordo com a programação que você forneceu no arquivo DAG.

Para verificar se o DAG foi processado sem erros e está disponível no Airflow, visualize-o na interface do DAG. A interface do DAG é a interface do Cloud Composer para visualizar as informações do DAG no console do Google Cloud. O Cloud Composer também fornece acesso à IU do Airflow, que é uma interface da Web nativa do Airflow.

  1. Aguarde cerca de cinco minutos para dar tempo ao Airflow para processar o arquivo DAG enviado anteriormente e concluir a primeira execução do DAG (explicada mais tarde).

  2. Execute o comando a seguir na Google Cloud CLI. Esse comando executa o comando dags list da CLI do Airflow que lista os DAGs no ambiente.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifique se o DAG composer_quickstart está listado na saída do comando.

    Exemplo de saída:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Mais detalhes da execução do DAG

Uma única execução de um DAG é chamada de execução de DAG. O Airflow executa imediatamente uma execução do DAG para o DAG de exemplo porque a data de início no arquivo DAG está definida como ontem. Dessa forma, o Airflow alcança a programação do DAG especificado.

O DAG de exemplo contém uma tarefa, print_dag_run_conf, que executa o comando echo no console. Esse comando exibe metainformações sobre o DAG (identificador numérico de execução do DAG).

Execute o comando a seguir na Google Cloud CLI. Este comando lista as execuções de DAG para o DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Exemplo de saída:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

A CLI do Airflow não fornece um comando para visualizar os registros de tarefas. É possível usar outros métodos para ver os registros de tarefas do Airflow: a interface do DAG do Cloud Composer, a interface do Airflow ou o Cloud Logging. Neste guia, mostramos uma maneira de consultar o Cloud Logging para encontrar registros de uma execução específica de DAG.

Execute o comando a seguir na Google Cloud CLI. Esse comando lê registros do Cloud Logging para uma execução específica do DAG composer_quickstart. O argumento --format formata a saída para que apenas o texto da mensagem de registro seja exibido.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Substitua:

  • RUN_ID pelo valor run_id da saída do comando tasks states-for-dag-run executado anteriormente. Por exemplo, 2024-02-17T15:38:38.969307+00:00.

Exemplo de saída:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.

Exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Selecione example-environment e clique em Excluir.

    3. Aguarde até o ambiente ser excluído.

  2. Exclua o bucket do ambiente. A exclusão do ambiente do Cloud Composer não exclui o bucket.

    1. No console do Google Cloud, acesse a página Armazenamento > Navegador.

      Acesse Armazenamento > Navegador

    2. Selecione o bucket do ambiente e clique em Excluir. Por exemplo, ele pode ter o nome us-central1-example-environ-c1616fe8-bucket.

  3. Exclua o disco permanente da fila do Redis do seu ambiente. A exclusão do ambiente do Cloud Composer não remove o disco permanente.

    1. No console do Google Cloud, acesse Compute Engine > Discos.

      Acessar "Discos"

    2. Selecione o disco permanente da fila do Redis do ambiente e clique em Excluir.

      Por exemplo, esse disco pode ser chamado de pvc-02bc4842-2312-4347-8519-d87bdcd31115. Os discos do Cloud Composer 2 sempre têm o tipo Balanced persistent disk e o tamanho de 2 GB.

A seguir