Adicionar e atualizar DAGs

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como gerenciar DAGs no seu ambiente do Cloud Composer.

O Cloud Composer usa um bucket do Cloud Storage para armazenar os DAGs do ambiente do Cloud Composer. O ambiente sincroniza os DAGs desse bucket com os componentes do Airflow, como workers e programadores do Airflow.

Antes de começar

  • Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
  • Verifique se sua conta tem permissões suficientes para gerenciar DAGs.
  • As alterações no DAG são propagadas para o Airflow em 3 a 5 minutos. É possível ver o status da tarefa na interface da Web do Airflow.

Acesse o bucket do seu ambiente

Para acessar o bucket associado ao seu ambiente:

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

gcloud

A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do ambiente.

Se você quiser interagir com o bucket do ambiente, também poderá usar a ferramenta de linha de comando gsutil. Para conseguir o endereço do bucket do ambiente, execute o seguinte comando da CLI gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Crie uma solicitação de API environments.get. No recurso Environment, no recurso EnvironmentConfig, no recurso dagGcsPrefix está o endereço do bucket do ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para receber credenciais e usar a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Adicionar ou atualizar um DAG

Para adicionar ou atualizar um DAG, mova o arquivo .py Python para o DAG para a pasta /dags no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo .py do Python para o DAG usando a caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • LOCAL_FILE_TO_UPLOAD é o arquivo Python .py para o DAG.

Exemplo:

gcloud beta composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualizar um DAG com execuções ativas de DAGs

Se você atualizar um DAG que tenha execuções ativas de DAGs:

  • Todas as tarefas em execução são concluídas usando o arquivo DAG original.
  • Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
  • Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removidas.

Como atualizar DAGs executados com frequência

Após o upload de um arquivo DAG, o Airflow leva algum tempo para carregá-lo e atualizá-lo. Caso seu DAG seja executado com frequência, convém garantir que ele use a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:

  1. Pause o DAG na IU do Airflow.
  2. Faça o upload de um arquivo DAG atualizado.
  3. Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.

    Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.

  4. É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.

  5. (Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:

    1. Abra a guia Registros do seu ambiente no console do Google Cloud.

    2. Acesse Registros do Composer > Infraestrutura > Sincronização do Cloud Storage e inspecione registros para cada worker no seu ambiente. Procure o item de registro Syncing dags directory mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um item Finished syncing que o segue, os DAGs serão sincronizados com sucesso nesse worker.

  6. Cancele a pausa do DAG.

Como excluir um DAG

Nesta seção, você verá como excluir um DAG.

Como excluir um DAG no ambiente

Para excluir um DAG, remova o arquivo .py do Python referente ao DAG da pasta /dags do ambiente no bucket do ambiente.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta /dags no bucket do ambiente.

  3. Selecione o arquivo DAG, clique em Excluir e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente;
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_FILE pelo arquivo Python .py para o DAG.

Exemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Como remover um DAG da IU do Airflow

Para remover os metadados de um DAG da interface da Web do Airflow, siga estas etapas:

IU do Airflow

  1. Acesse a interface do Airflow para seu ambiente.
  2. Para o DAG, clique em Excluir DAG.

gcloud

Execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;
  • DAG_NAME é o nome do DAG a ser excluído.

A seguir