Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como gerir DAGs no seu ambiente do Cloud Composer.
O Cloud Composer usa um contentor do Cloud Storage para armazenar DAGs do seu ambiente do Cloud Composer. O seu ambiente sincroniza os DAGs deste contentor com os componentes do Airflow, como os trabalhadores e os programadores do Airflow.
Antes de começar
- Uma vez que o Apache Airflow não oferece um isolamento DAG forte, recomendamos que mantenha ambientes de produção e de teste separados para evitar interferências DAG. Para mais informações, consulte o artigo Testar DAGs.
- Certifique-se de que a sua conta tem autorizações suficientes para gerir DAGs.
- As alterações ao DAG são propagadas para o Airflow no prazo de 3 a 5 minutos. Pode ver o estado da tarefa na interface Web do Airflow.
Aceda ao contentor do seu ambiente
Para aceder ao contentor associado ao seu ambiente:
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.
gcloud
A CLI gcloud tem comandos separados para adicionar e eliminar DAGs no contentor do seu ambiente.
Se quiser interagir com o contentor do seu ambiente, também pode usar a Google Cloud CLI. Para obter o endereço do contentor do seu ambiente, execute o seguinte comando da CLI gcloud:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.
Exemplo:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Construa um pedido de API environments.get
. No recurso Environment, no recurso EnvironmentConfig, no recurso dagGcsPrefix
, encontra-se a morada do contentor do seu ambiente.
Exemplo:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Use a biblioteca google-auth para obter credenciais e use a biblioteca requests para chamar a API REST.
Adicione ou atualize um DAG
Para adicionar ou atualizar um DAG, mova o ficheiro .py
Python do DAG para a pasta /dags
no contentor do ambiente.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.Clique em Carregar ficheiros. Em seguida, selecione o ficheiro
.py
Python para o DAG através da caixa de diálogo do navegador e confirme.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.LOCAL_FILE_TO_UPLOAD
é o ficheiro.py
Python para o DAG.
Exemplo:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Atualize um DAG que tenha execuções de DAG ativas
Se atualizar um DAG que tenha execuções de DAG ativas:
- Todas as tarefas em execução terminam com o ficheiro DAG original.
- Todas as tarefas agendadas, mas que não estão atualmente em execução, usam o ficheiro DAG atualizado.
- Todas as tarefas que já não estão presentes no ficheiro DAG atualizado são marcadas como removidas.
Atualize DAGs que são executados com frequência
Depois de carregar um ficheiro DAG, o Airflow demora algum tempo a carregar este ficheiro e a atualizar o DAG. Se o seu DAG for executado com frequência, é recomendável garantir que o DAG usa a versão atualizada do ficheiro DAG. Para isso:
Pausar o DAG na IU do Airflow.
Carregue um ficheiro DAG atualizado.
Aguarde até ver as atualizações na IU do Airflow. Isto significa que o DAG foi analisado corretamente pelo programador e atualizado na base de dados do Airflow.
Se a IU do Airflow apresentar os DAGs atualizados, isto não garante que os trabalhadores do Airflow tenham a versão atualizada do ficheiro DAG. Isto acontece porque os ficheiros DAG são sincronizados de forma independente para os programadores e as trabalhadoras.
Recomendamos que aumente o tempo de espera para garantir que o ficheiro DAG está sincronizado com todos os trabalhadores no seu ambiente. A sincronização ocorre várias vezes por minuto. Num ambiente saudável, aguardar cerca de 20 a 30 segundos é suficiente para que todos os trabalhadores sejam sincronizados.
(Opcional) Se quiser ter a certeza de que todos os trabalhadores têm a nova versão do ficheiro DAG, inspecione os registos de cada trabalhador individual. Para isso:
Abra o separador Registos do seu ambiente na Google Cloud consola.
Aceda a Registos do compositor > Infraestrutura > Sincronização do Cloud Storage e inspecione os registos de cada trabalhador no seu ambiente. Procure o item de registo
Syncing dags directory
mais recente que tenha uma data/hora após o carregamento do novo ficheiro DAG. Se vir um itemFinished syncing
que o segue, significa que os DAGs foram sincronizados com êxito neste trabalhador.
Despause o DAG.
Elimine um DAG no seu ambiente
Para eliminar um DAG, remova o ficheiro Python .py
do DAG da pasta /dags
do ambiente no contentor do ambiente.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.Selecione o ficheiro DAG, clique em Eliminar e confirme a operação.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.DAG_FILE
com o ficheiro.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Remova um DAG da IU do Airflow
Para remover os metadados de um DAG da interface Web do Airflow:
IU do Airflow
- Aceda à IU do Airflow para o seu ambiente.
- Para o DAG, clique em Eliminar DAG.
gcloud
Nas versões do Airflow anteriores à 1.14.0, execute o seguinte comando na CLI gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
No Airflow 2, Airflow 1.14.0 e versões posteriores, execute o seguinte comando na CLI gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.DAG_NAME
é o nome do DAG a eliminar.
O que se segue?
- Escreva DAGs
- Teste, sincronize e implemente os seus DAGs a partir do GitHub
- Resolução de problemas de DAGs