Adicionar e atualizar DAGs

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como gerenciar DAGs no ambiente do Cloud Composer.

O Cloud Composer usa um bucket do Cloud Storage para armazenar os DAGs do ambiente do Cloud Composer. O ambiente sincroniza os DAGs desse bucket com os componentes do Airflow, como workers e programadores.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se a conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow dentro de 3 a 5 minutos. É possível ver o status da tarefa na interface da Web do Airflow.

Acesse o bucket do seu ambiente

Para acessar o bucket associado ao ambiente:

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna da pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

gcloud

CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do ambiente.

Se você quiser interagir com o bucket do ambiente, também é possível usar a ferramenta de linha de comando gsutil. Para saber o endereço do bucket do ambiente, execute o seguinte comando da CLI gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. No recurso Ambiente, no recurso EnvironmentConfig, no recurso dagGcsPrefix é o endereço do bucket do seu ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para receber credenciais e use a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py do Python do DAG para a pasta /dags no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna da pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo Python .py para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removidas.

Atualizar DAGs executados em uma programação frequente

Depois de fazer upload de um arquivo DAG, leva algum tempo para que o Airflow carregue esse arquivo e atualize o DAG. Se o DAG for executado com frequência, verifique se ele usa a versão atualizada do arquivo. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.

  2. Faça o upload de um arquivo DAG atualizado.

  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do ambiente no console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura > item Sincronização do Cloud Storage e inspecione os registros de cada worker no ambiente. Procure o item de registro Syncing dags directory mais recente que tenha um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o siga, os DAGs serão sincronizados com sucesso nesse worker.

  6. Cancele a pausa do DAG.

Excluir um DAG do ambiente

Para excluir um DAG, remova o arquivo .py do Python dele da pasta /dags do ambiente no bucket dele.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna da pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE pelo arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Remover um DAG da interface do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow:

IU do Airflow

  1. Acesse a interface do Airflow do seu ambiente.
  2. No DAG, clique em Excluir DAG.

gcloud

Nas versões do Airflow 1 anteriores à 1.14.0, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

No Airflow 2, Airflow 1.14.0 e versões mais recentes, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir