Adicione e atualize DAGs

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como gerir DAGs no seu ambiente do Cloud Composer.

O Cloud Composer usa um contentor do Cloud Storage para armazenar DAGs do seu ambiente do Cloud Composer. O seu ambiente sincroniza os DAGs deste contentor com os componentes do Airflow, como os trabalhadores e os programadores do Airflow.

Antes de começar

  • Uma vez que o Apache Airflow não oferece um isolamento DAG forte, recomendamos que mantenha ambientes de produção e de teste separados para evitar interferências DAG. Para mais informações, consulte o artigo Testar DAGs.
  • Certifique-se de que a sua conta tem autorizações suficientes para gerir DAGs.
  • As alterações ao DAG são propagadas para o Airflow no prazo de 3 a 5 minutos. Pode ver o estado da tarefa na interface Web do Airflow.

Aceda ao contentor do seu ambiente

Para aceder ao contentor associado ao seu ambiente:

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta /dags no contentor do seu ambiente.

gcloud

A CLI gcloud tem comandos separados para adicionar e eliminar DAGs no contentor do seu ambiente.

Se quiser interagir com o contentor do seu ambiente, também pode usar a Google Cloud CLI. Para obter o endereço do contentor do seu ambiente, execute o seguinte comando da CLI gcloud:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format="get(config.dagGcsPrefix)"

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.

Exemplo:

gcloud beta composer environments describe example-environment \
    --location us-central1 \
    --format="get(config.dagGcsPrefix)"

API

Construa um pedido de API environments.get. No recurso Environment, no recurso EnvironmentConfig, no recurso dagGcsPrefix, encontra-se a morada do contentor do seu ambiente.

Exemplo:

GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment

Python

Use a biblioteca google-auth para obter credenciais e use a biblioteca requests para chamar a API REST.

import google.auth
import google.auth.transport.requests

# Authenticate with Google Cloud.
# See: https://cloud.google.com/docs/authentication/getting-started
credentials, _ = google.auth.default(
    scopes=["https://www.googleapis.com/auth/cloud-platform"]
)
authed_session = google.auth.transport.requests.AuthorizedSession(credentials)

# project_id = 'YOUR_PROJECT_ID'
# location = 'us-central1'
# composer_environment = 'YOUR_COMPOSER_ENVIRONMENT_NAME'

environment_url = (
    "https://composer.googleapis.com/v1beta1/projects/{}/locations/{}"
    "/environments/{}"
).format(project_id, location, composer_environment)
response = authed_session.request("GET", environment_url)
environment_data = response.json()

# Print the bucket name from the response body.
print(environment_data["config"]["dagGcsPrefix"])

Adicione ou atualize um DAG

Para adicionar ou atualizar um DAG, mova o ficheiro .py Python do DAG para a pasta /dags no contentor do ambiente.

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta /dags no contentor do seu ambiente.

  3. Clique em Carregar ficheiros. Em seguida, selecione o ficheiro .py Python para o DAG através da caixa de diálogo do navegador e confirme.

gcloud

gcloud composer environments storage dags import \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    --source="LOCAL_FILE_TO_UPLOAD"

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.
  • LOCAL_FILE_TO_UPLOAD é o ficheiro .py Python para o DAG.

Exemplo:

gcloud composer environments storage dags import \
    --environment example-environment \
    --location us-central1 \
    --source="example_dag.py"

Atualize um DAG que tenha execuções de DAG ativas

Se atualizar um DAG que tenha execuções de DAG ativas:

  • Todas as tarefas em execução terminam com o ficheiro DAG original.
  • Todas as tarefas agendadas, mas que não estão atualmente em execução, usam o ficheiro DAG atualizado.
  • Todas as tarefas que já não estão presentes no ficheiro DAG atualizado são marcadas como removidas.

Atualize DAGs que são executados com frequência

Depois de carregar um ficheiro DAG, o Airflow demora algum tempo a carregar este ficheiro e a atualizar o DAG. Se o seu DAG for executado com frequência, é recomendável garantir que o DAG usa a versão atualizada do ficheiro DAG. Para isso:

  1. Pausar o DAG na IU do Airflow.

  2. Carregue um ficheiro DAG atualizado.

  3. Aguarde até ver as atualizações na IU do Airflow. Isto significa que o DAG foi analisado corretamente pelo programador e atualizado na base de dados do Airflow.

    Se a IU do Airflow apresentar os DAGs atualizados, isto não garante que os trabalhadores do Airflow tenham a versão atualizada do ficheiro DAG. Isto acontece porque os ficheiros DAG são sincronizados de forma independente para os programadores e as trabalhadoras.

  4. Recomendamos que aumente o tempo de espera para garantir que o ficheiro DAG está sincronizado com todos os trabalhadores no seu ambiente. A sincronização ocorre várias vezes por minuto. Num ambiente saudável, aguardar cerca de 20 a 30 segundos é suficiente para que todos os trabalhadores sejam sincronizados.

  5. (Opcional) Se quiser ter a certeza de que todos os trabalhadores têm a nova versão do ficheiro DAG, inspecione os registos de cada trabalhador individual. Para isso:

    1. Abra o separador Registos do seu ambiente na Google Cloud consola.

    2. Aceda a Registos do compositor > Infraestrutura > Sincronização do Cloud Storage e inspecione os registos de cada trabalhador no seu ambiente. Procure o item de registo Syncing dags directory mais recente que tenha uma data/hora após o carregamento do novo ficheiro DAG. Se vir um item Finished syncing que o segue, significa que os DAGs foram sincronizados com êxito neste trabalhador.

  6. Despause o DAG.

Elimine um DAG no seu ambiente

Para eliminar um DAG, remova o ficheiro Python .py do DAG da pasta /dags do ambiente no contentor do ambiente.

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta /dags no contentor do seu ambiente.

  3. Selecione o ficheiro DAG, clique em Eliminar e confirme a operação.

gcloud

gcloud composer environments storage dags delete \
    --environment ENVIRONMENT_NAME \
    --location LOCATION \
    DAG_FILE

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.
  • DAG_FILE com o ficheiro .py do Python para o DAG.

Exemplo:

gcloud composer environments storage dags delete \
    --environment example-environment \
    --location us-central1 \
    example_dag.py

Remova um DAG da IU do Airflow

Para remover os metadados de um DAG da interface Web do Airflow:

IU do Airflow

  1. Aceda à IU do Airflow para o seu ambiente.
  2. Para o DAG, clique em Eliminar DAG.

gcloud

Nas versões do Airflow anteriores à 1.14.0, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    delete_dag -- DAG_NAME

No Airflow 2, Airflow 1.14.0 e versões posteriores, execute o seguinte comando na CLI gcloud:

  gcloud composer environments run ENVIRONMENT_NAME \
    --location LOCATION \
    dags delete -- DAG_NAME

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.
  • DAG_NAME é o nome do DAG a eliminar.

O que se segue?