Como adicionar e atualizar DAGs (fluxos de trabalho)

Esta página descreve como determinar o bucket de armazenamento do ambiente e como adicionar, atualizar e excluir um DAG do ambiente.

O Cloud Composer usa o Cloud Storage para armazenar DAGs do Apache Airflow, também conhecidos como fluxos de trabalho. Cada ambiente tem um bucket associado do Cloud Storage. O Cloud Composer programa somente os DAGs no bucket do Cloud Storage.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • As permissões a seguir são necessárias para adicionar e atualizar plug-ins no bucket do Cloud Storage do ambiente do Cloud Composer:
    • storage.objectAdmin para fazer upload de arquivos.
    • composer.environments.get para pesquisar o bucket de destino do DAG. Essa permissão não é necessária ao usar a API Cloud Storage ou gsutil.
  • As alterações no DAG levam de 3 a 5 minutos para serem feitas. É possível ver o status da tarefa na interface da Web do Airflow.

Determinar o nome do bucket de armazenamento

Para determinar o nome do bucket de armazenamento associado ao ambiente:

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. (Opcional) Para ver o bucket no Cloud Storage, clique no nome do bucket.

gcloud

Digite o seguinte comando:

gcloud composer environments describe ENVIRONMENT_NAME \
  --location LOCATION \
  --format="get(config.dagGcsPrefix)"

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado.
  • --format é uma opção para especificar apenas a propriedade dagGcsPrefix, em vez de todos os detalhes do ambiente.

A propriedade dagGcsPrefix mostra o nome do bucket:

gs://region-environment_name-random_id-bucket/

rest

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método projects.locations.environments.get.
  2. Leia config.dagGcsPrefix da resposta Ambiente.

rpc

Para mais informações sobre credenciais, consulte Autenticação do Cloud Composer.

  1. Chame o método Environments.GetEnvironment.
  2. Leia o config.dag_gcs_prefix da resposta Ambiente.

python

Use a biblioteca google-auth para acessar as credenciais e usar a biblioteca solicitações para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform'])
authed_session = google.auth.transport.requests.AuthorizedSession(
    credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    'https://composer.googleapis.com/v1beta1/projects/{}/locations/{}'
    '/environments/{}').format(project_id, location, composer_environment)
response = authed_session.request('GET', environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data['config']['dagGcsPrefix'])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py Python para o DAG para a pasta dags do ambiente no Cloud Storage.

Console

  1. Abra a página "Ambientes" no Console do Cloud.
    Abra a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o bucket no Cloud Storage, clique no nome do bucket.

    Por padrão, a pasta dags é aberta.

  4. Clique em Fazer upload de arquivos e selecione a cópia local do DAG que você quer enviar.

  5. Para fazer o upload do arquivo para a pasta dags, clique em Abrir.

gcloud

Para adicionar ou atualizar um DAG:

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source LOCAL_FILE_TO_UPLOAD

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o DAG para upload.

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais no arquivo DAG atualizado são marcadas como removidas.

Como atualizar DAGs executados com frequência

Depois de fazer o upload de um arquivo DAG, leva algum tempo para que o Airflow carregue esse arquivo e atualize o DAG. Se o DAG for executado com frequência, convém garantir que ele use a versão atualizada dele. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.
  2. Faça o upload de um arquivo DAG atualizado.
  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do ambiente no Console.
    2. Acesse Registros do Composer > Infraestrutura > Sincronização do Cloud Storage e inspecione registros para cada worker no seu ambiente. Procure o item de registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o segue, os DAGs serão sincronizados com sucesso nesse worker.
  6. Cancele a pausa do DAG.

Como excluir um DAG

Nesta seção, você verá como excluir um DAG.

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python relativo ao DAG da pasta dags do ambiente no Cloud Storage. A exclusão de um DAG não remove os metadados do DAG da interface da Web do Airflow.

Console

  1. Acesse a página "Ambientes" no Console do Cloud.
    Abrir a página "Ambientes"
  2. Na coluna Nome, clique no nome do ambiente para abrir a página de detalhes do ambiente.

    Na guia Configuração, o nome do bucket do Cloud Storage é mostrado à direita da pasta DAGs.

  3. Para ver o bucket no Cloud Storage, clique no nome do bucket.

    Por padrão, a pasta dags é aberta.

  4. Clique na caixa de seleção ao lado do DAG que você quer excluir.

  5. Na parte superior do Console do Cloud, clique em Excluir.

  6. Na caixa de diálogo exibida, clique em OK.

gcloud

gcloud composer environments storage dags delete 
--environment ENVIRONMENT_NAME
--location LOCATION
DAG_NAME.py

em que:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME.py é o DAG a ser excluído.
  • Airflow 1.9.0: os metadados dos DAGs excluídos permanecem visíveis na interface da Web do Airflow.

  • Airflow 1.10.0 ou posterior: é possível usar a ferramenta gcloud para remover os metadados do DAG.

Como remover um DAG da interface da Web do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow, insira:

CLI do Airflow 1.10

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

CLI do Airflow 2.0

  gcloud beta composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

onde:

  • ENVIRONMENT_NAME é o nome do ambiente;
  • LOCATION é a região do Compute Engine em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir