Adicionar e atualizar DAGs

Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3

Nesta página, descrevemos como gerenciar DAGs no Cloud Composer de nuvem.

O Cloud Composer usa um bucket do Cloud Storage para armazenar DAGs do ambiente do Cloud Composer. Seu ambiente será sincronizado DAGs deste bucket para componentes do Airflow, como workers do Airflow e programadores.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se sua conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow dentro de 3 a 5 minutos. Você pode conferir as tarefas na interface da Web do Airflow.

Acesse o bucket do seu ambiente

Para acessar o bucket associado ao ambiente:

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

gcloud

A CLI gcloud tem comandos separados para Adicione e exclua DAGs no bucket do ambiente.

Se você quiser interagir com o bucket do seu ambiente, também poderá usar a ferramenta de linha de comando gsutil. Para obter o endereço do bucket do seu ambiente, execute a seguinte CLI gcloud comando:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. Em recurso Environment, na EnvironmentConfig, em O recurso dagGcsPrefix é o endereço do bucket do ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para receber credenciais e usar a biblioteca requests para chame a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py do Python do DAG para a pasta /dags no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo Python .py para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removida.

Atualizar DAGs executados em uma programação frequente

Depois de fazer upload de um arquivo DAG, o Airflow leva algum tempo para carregar esse arquivo e atualizar o DAG. Se o DAG for executado com frequência, talvez seja necessário garantir que o DAG use a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.

  2. Faça o upload de um arquivo DAG atualizado.

  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do ambiente no console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura >. Sincronização do Cloud Storage e inspeção de registros para cada worker em em seu ambiente. Procure o registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o segue, os DAGs serão sincronizado neste worker.

  6. Cancele a pausa do DAG.

Excluir um DAG do ambiente

Para excluir um DAG, remova o arquivo Python .py dele do pasta /dags do ambiente no bucket dele.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags em no bucket do seu ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE pelo arquivo .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Remover um DAG da interface do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow:

IU do Airflow

  1. Acesse a interface do Airflow do seu ambiente.
  2. No DAG, clique em Excluir DAG.

gcloud

Execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir