Adicionar e atualizar DAGs

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, descrevemos como gerenciar DAGs no Cloud Composer de nuvem.

O Cloud Composer usa um bucket do Cloud Storage para armazenar DAGs do ambiente do Cloud Composer. O ambiente sincroniza DAGs desse bucket com componentes do Airflow, como workers e programadores.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se a conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow dentro de 3 a 5 minutos. Você pode conferir as tarefas na interface da Web do Airflow.

Acessar o bucket do seu ambiente

Para acessar o bucket associado ao ambiente:

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do seu ambiente.

gcloud

A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do ambiente.

Se você quiser interagir com o bucket do seu ambiente, também poderá usar a CLI do Google Cloud. Para conferir o endereço do bucket do seu ambiente, execute o seguinte comando da CLI gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. No recurso Ambiente, no recurso EnvironmentConfig e no recurso dagGcsPrefix, o endereço é o bucket do seu ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para acessar as credenciais e a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py do Python do DAG para a pasta /dags no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do seu ambiente.

  3. Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo Python .py para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais no arquivo DAG atualizado são marcadas como removidas.

Atualizar DAGs executados com frequência

Depois de fazer upload de um arquivo DAG, o Airflow leva algum tempo para carregar esse arquivo e atualizar o DAG. Se o DAG for executado com frequência, convém garantir que ele use a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.

  2. Faça o upload de um arquivo DAG atualizado.

  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do ambiente no console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura >. Sincronização do Cloud Storage e inspeção de registros para cada worker em em seu ambiente. Procure o registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você encontrar um item Finished syncing que o segue, os DAGs serão sincronizados com sucesso nesse worker.

  6. Cancele a pausa do DAG.

Excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python relativo ao DAG da pasta /dags do ambiente no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags em no bucket do seu ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE com o arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Remover um DAG da interface do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow:

IU do Airflow

  1. Acesse a interface do Airflow do seu ambiente.
  2. Para o DAG, clique em Excluir DAG.

gcloud

Execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir