Adicionar e atualizar DAGs

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como gerenciar DAGs no ambiente do Cloud Composer.

O Cloud Composer usa um bucket do Cloud Storage para armazenar os DAGs do ambiente do Cloud Composer. Seu ambiente sincroniza os DAGs desse bucket com componentes do Airflow, como workers e programadores do Airflow.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se sua conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow em três a cinco minutos. Veja o status da tarefa na interface da Web do Airflow.

Acessar o bucket do ambiente

Para acessar o bucket associado ao ambiente, faça o seguinte:

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ela mostra o conteúdo da pasta /dags no bucket do seu ambiente.

gcloud

A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do seu ambiente.

Se você quiser interagir com o bucket do seu ambiente, use a ferramenta de linha de comando gsutil. Para ver o endereço do bucket do seu ambiente, execute o seguinte comando da CLI do gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. No recurso environment, no recurso environmentConfig, no recurso dagGcsPrefix é o endereço do bucket do ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para receber credenciais e use a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py do Python para o DAG para a pasta /dags no bucket do ambiente.

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ela mostra o conteúdo da pasta /dags no bucket do seu ambiente.

  3. Clique em Enviar arquivos. Em seguida, selecione o arquivo Python .py para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo .py do Python para o DAG.

Exemplo:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removidas.

Como atualizar DAGs executados com frequência

Depois de fazer upload de um arquivo DAG, o Airflow demora um pouco para carregar esse arquivo e atualizar o DAG. Se o DAG for executado com frequência, talvez ele seja usado na versão atualizada do arquivo. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.
  2. Faça o upload de um arquivo DAG atualizado.
  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do ambiente no Console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura > Sincronização do Cloud Storage e inspecione registros para cada worker no seu ambiente. Procure o item de registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o segue, os DAGs serão sincronizados com sucesso nesse worker.

  6. Cancele a pausa do DAG.

Como excluir um DAG

Nesta seção, você verá como excluir um DAG.

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python dele na pasta /dags do ambiente no bucket do ambiente.

Console

  1. No Console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente e, na coluna DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ela mostra o conteúdo da pasta /dags no bucket do seu ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE pelo arquivo .py do Python para o DAG.

Exemplo:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Como remover um DAG da IU do Airflow

É possível remover um DAG da IU do Airflow nas versões 1.10.0 ou posteriores do Airflow.

Para remover os metadados de um DAG da interface da Web do Airflow:

IU do Airflow

  1. Acesse a IU do Airflow para seu ambiente.
  2. Para o DAG, clique em Excluir DAG.

gcloud

Nas versões do Airflow 1 anteriores à 1.14.0, execute o seguinte comando na CLI do gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

No Airflow 2, no Airflow 1.14.0 e versões mais recentes, execute o seguinte comando na CLI do gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir