Adicionar e atualizar DAGs

Mantenha tudo organizado com as coleções Salve e categorize o conteúdo com base nas suas preferências.

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como gerenciar DAGs no ambiente do Cloud Composer.

O Cloud Composer usa um bucket do Cloud Storage para armazenar DAGs do seu ambiente. Seu ambiente sincroniza DAGs desse bucket com componentes do Airflow, como workers e programadores do Airflow.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se a conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow em três a cinco minutos. É possível ver o status da tarefa na interface da Web do Airflow.

Acesse o bucket do seu ambiente

Para acessar o bucket associado ao ambiente:

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente. Na coluna Pasta do DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ela mostra o conteúdo da pasta /dags no bucket do seu ambiente.

gcloud

A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do seu ambiente.

Se quiser interagir com o bucket do seu ambiente, use também a ferramenta de linha de comando gsutil. Para ver o endereço do bucket do seu ambiente, execute o seguinte comando da CLI gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. No recurso Ambiente, no recurso AmbienteConfig, no recurso dagGcsPrefix é o endereço do bucket do ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para receber credenciais e use a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py Python para o DAG para a pasta /dags no bucket do ambiente.

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente. Na coluna Pasta do DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ela mostra o conteúdo da pasta /dags no bucket do seu ambiente.

  3. Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo .py do Python para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo .py do Python para o DAG.

Exemplo:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removidas.

Como atualizar DAGs executados com frequência

Depois de fazer upload de um arquivo DAG, o Airflow leva algum tempo para carregar esse arquivo e atualizar o DAG. Se o DAG for executado em uma programação frequente, confira se ele está usando a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.
  2. Faça o upload de um arquivo DAG atualizado.
  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do seu ambiente no Console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura > Sincronização do Cloud Storage e inspecione registros para cada worker no seu ambiente. Procure o item de registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o segue, os DAGs serão sincronizados com sucesso nesse worker.

  6. Cancele a pausa do DAG.

Como excluir um DAG

Nesta seção, você verá como excluir um DAG.

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python dele em A pasta /dags do ambiente no bucket do ambiente.

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente. Na coluna Pasta do DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE pelo arquivo .py do Python para o DAG.

Exemplo:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Como remover um DAG da IU do Airflow

É possível remover um DAG da IU do Airflow no Airflow 1.10.0 ou em versões mais recentes.

Para remover os metadados de um DAG da interface da Web do Airflow:

IU do Airflow

  1. Acesse a IU do Airflow do seu ambiente.
  2. Para o DAG, clique em Excluir DAG.

gcloud

Nas versões do Airflow 1 anteriores à 1.14.0, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

No Airflow 2.1, 14.0 e versões mais recentes, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir