Execute um DAG do Apache Airflow no Cloud Composer 2 (Google Cloud CLI)

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Neste guia de início rápido, mostramos como criar um ambiente do Cloud Composer e execute um DAG do Apache Airflow no Cloud Composer 2.

Antes de começar

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.
  3. To initialize the gcloud CLI, run the following command:

    gcloud init
  4. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  5. Make sure that billing is enabled for your Google Cloud project.

  6. Install the Google Cloud CLI.
  7. To initialize the gcloud CLI, run the following command:

    gcloud init
  8. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Composer API:

    gcloud services enable composer.googleapis.com
  11. Para conseguir as permissões necessárias a fim de concluir o guia de início rápido, peça ao administrador para conceder a você os seguintes papéis do IAM no projeto:

    Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

    Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Criar um ambiente

Se este for o primeiro ambiente em seu projeto, então adicionar a conta do agente de serviço do Cloud Composer como um novo principal na conta de serviço do seu ambiente e conceda roles/composer.ServiceAgentV2Ext a ele.

Por padrão, o ambiente usa a conta de serviço padrão do Compute Engine, e o exemplo a seguir mostra como adicionar essa permissão a ela.

# Get current project's project number
PROJECT_NUMBER=$(gcloud projects list \
  --filter="$(gcloud config get-value project)" \
  --format="value(PROJECT_NUMBER)" \
  --limit=1)

# Add the Cloud Composer v2 API Service Agent Extension role
gcloud iam service-accounts add-iam-policy-binding \
    $PROJECT_NUMBER-compute@developer.gserviceaccount.com \
    --member serviceAccount:service-$PROJECT_NUMBER@cloudcomposer-accounts.iam.gserviceaccount.com \
    --role roles/composer.ServiceAgentV2Ext

Crie um novo ambiente chamado example-environment no us-central1. com a versão mais recente do Cloud Composer.

gcloud composer environments create example-environment \
    --location us-central1 \
    --image-version composer-2.9.5-airflow-2.9.1

Criar um arquivo DAG

Um DAG do Airflow é uma coleção de tarefas organizadas que você quer programar e executar. Os DAGs são definidos em arquivos Python padrão.

Neste guia, usamos um exemplo de DAG do Airflow definido no arquivo quickstart.py. O código Python nesse arquivo faz o seguinte:

  1. Cria um DAG, composer_sample_dag. Este DAG é executado todos os dias.
  2. Executa uma tarefa, print_dag_run_conf. A tarefa imprime a configuração da execução do DAG usando o operador bash.

Salve uma cópia do arquivo quickstart.py na sua máquina local:

import datetime

from airflow import models
from airflow.operators import bash

# If you are running Airflow in more than one time zone
# see https://airflow.apache.org/docs/apache-airflow/stable/timezone.html
# for best practices
YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)

default_args = {
    "owner": "Composer Example",
    "depends_on_past": False,
    "email": [""],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": datetime.timedelta(minutes=5),
    "start_date": YESTERDAY,
}

with models.DAG(
    "composer_quickstart",
    catchup=False,
    default_args=default_args,
    schedule_interval=datetime.timedelta(days=1),
) as dag:
    # Print the dag_run id from the Airflow logs
    print_dag_run_conf = bash.BashOperator(
        task_id="print_dag_run_conf", bash_command="echo {{ dag_run.id }}"
    )

Faça upload do arquivo DAG para o bucket do ambiente

Todos os ambientes do Cloud Composer têm um bucket do Cloud Storage associado. Apenas programações do Airflow no Cloud Composer DAGs que estão localizados na pasta /dags neste bucket.

Para programar seu DAG, faça upload do arquivo quickstart.py da máquina local para o pasta /dags do ambiente:

Para fazer upload de quickstart.py com a Google Cloud CLI, execute o seguinte comando em a pasta em que o arquivo quickstart.py está localizado:

gcloud composer environments storage dags import \
--environment example-environment --location us-central1 \
--source quickstart.py

Conferir o DAG

Depois de fazer upload do arquivo DAG, o Airflow faz o seguinte:

  1. Analisa o arquivo DAG enviado por upload. Pode levar alguns minutos para que o DAG fique disponível para o Airflow.
  2. Adiciona o DAG à lista de DAGs disponíveis.
  3. Executa o DAG de acordo com a programação fornecida no arquivo DAG.

Verifique se o DAG é processado sem erros e está disponível no Airflow abrindo a interface do DAG. A interface do DAG é a interface do Cloud Composer para visualização Informações do DAG no console do Google Cloud. O Cloud Composer também oferece Acesso à IU do Airflow, que é uma interface nativa da plataforma interface gráfica do usuário.

  1. Aguarde cerca de cinco minutos para que o Airflow processe o arquivo DAG que você enviou anteriormente e conclua a primeira execução de DAG (explicado mais adiante).

  2. Execute o comando a seguir na Google Cloud CLI. Esse comando executa o comando da CLI do Airflow dags list, que lista DAGs no seu ambiente.

    gcloud composer environments run example-environment \
    --location us-central1 \
    dags list
    
  3. Verifique se o DAG composer_quickstart está listado na saída do comando.

    Exemplo de saída:

    Executing the command: [ airflow dags list ]...
    Command has been started. execution_id=d49074c7-bbeb-4ee7-9b26-23124a5bafcb
    Use ctrl-c to interrupt the command
    dag_id              | filepath              | owner            | paused
    ====================+=======================+==================+=======
    airflow_monitoring  | airflow_monitoring.py | airflow          | False
    composer_quickstart | dag-quickstart-af2.py | Composer Example | False
    

Conferir os detalhes da execução do DAG

Uma única execução de um DAG é chamada de execução de DAG. Airflow imediatamente executa um DAG para o DAG de exemplo porque a data de início no arquivo DAG é definido como ontem. Assim, o Airflow alcança os DAGs especificados cronograma.

O DAG de exemplo contém uma tarefa, print_dag_run_conf, que executa o comando echo no console. Esse comando exibe metainformações sobre o DAG (identificador numérico da execução do DAG).

Execute o comando a seguir na Google Cloud CLI. Esse comando lista as execuções do DAG para o DAG composer_quickstart:

gcloud composer environments run example-environment \
--location us-central1 \
dags list-runs -- --dag-id composer_quickstart

Exemplo de saída:

dag_id              | run_id                                      | state   | execution_date                   | start_date                       | end_date
====================+=============================================+=========+==================================+==================================+=================================
composer_quickstart | scheduled__2024-02-17T15:38:38.969307+00:00 | success | 2024-02-17T15:38:38.969307+00:00 | 2024-02-18T15:38:39.526707+00:00 | 2024-02-18T15:38:42.020661+00:00

A CLI do Airflow não fornece um comando para visualizar os registros de tarefas. Você pode usar Outros métodos para ver registros de tarefas do Airflow: a interface do DAG do Cloud Composer, a interface do Airflow ou o Cloud Logging. Este guia mostra uma maneira de consultar o Cloud Logging para encontrar registros de uma execução de DAG específica.

Execute o comando a seguir na CLI do Google Cloud. Esse comando lê os registros do Cloud Logging para uma execução de DAG específica do DAG composer_quickstart. O argumento --format formata a saída para que apenas o texto da mensagem de registro seja mostrado.

gcloud logging read \
--format="value(textPayload)" \
--order=asc \
"resource.type=cloud_composer_environment \
resource.labels.location=us-central1 \
resource.labels.environment_name=example-environment \
labels.workflow=composer_quickstart \
(labels.\"execution-date\"=\"RUN_ID\")"

Substitua:

  • RUN_ID pelo valor run_id da saída do tasks states-for-dag-run executado anteriormente. Por exemplo, 2024-02-17T15:38:38.969307+00:00.

Exemplo de saída:

...

Starting attempt 1 of 2
Executing <Task(BashOperator): print_dag_run_conf> on 2024-02-17
15:38:38.969307+00:00
Started process 22544 to run task

...

Running command: ['/usr/bin/bash', '-c', 'echo 115746']
Output:
115746

...

Command exited with return code 0
Marking task as SUCCESS. dag_id=composer_quickstart,
task_id=print_dag_run_conf, execution_date=20240217T153838,
start_date=20240218T153841, end_date=20240218T153841
Task exited with return code 0
0 downstream tasks scheduled from follow-on schedule check

Limpar

Para evitar cobranças na sua conta do Google Cloud pelos recursos usados nesta página, exclua o projeto do Google Cloud com esses recursos.

Exclua os recursos usados neste tutorial:

  1. Exclua o ambiente do Cloud Composer:

    1. No console do Google Cloud, acesse a página Ambientes.

      Acessar "Ambientes"

    2. Selecione example-environment e clique em Excluir.

    3. Aguarde até o ambiente ser excluído.

  2. Exclua o bucket do ambiente. A exclusão do ambiente do Cloud Composer não exclui o bucket.

    1. No console do Google Cloud, acesse a página Armazenamento > Navegador.

      Acesse Armazenamento > Navegador

    2. Selecione o bucket do ambiente e clique em Excluir. Por exemplo, ele pode ter o nome us-central1-example-environ-c1616fe8-bucket.

  3. Exclua o disco permanente da fila do Redis do seu ambiente. A exclusão do ambiente do Cloud Composer não remove o disco permanente.

    1. No console do Google Cloud, acesse Compute Engine > Discos.

      Acessar "Discos"

    2. Selecione o disco permanente da fila do Redis do ambiente e clique em Excluir.

      Por exemplo, ele pode ser chamado de pvc-02bc4842-2312-4347-8519-d87bdcd31115. Os discos do Cloud Composer 2 sempre têm o tipo Balanced persistent disk e o tamanho de 2 GB.

A seguir