Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como gerir DAGs no seu ambiente do Cloud Composer.
O Cloud Composer usa um contentor do Cloud Storage para armazenar DAGs do seu ambiente do Cloud Composer. O seu ambiente sincroniza os DAGs deste contentor com os componentes do Airflow, como os trabalhadores e os programadores do Airflow.
Antes de começar
- Uma vez que o Apache Airflow não oferece um isolamento DAG forte, recomendamos que mantenha ambientes de produção e de teste separados para evitar interferências DAG. Para mais informações, consulte o artigo Testar DAGs.
- Certifique-se de que a sua conta tem autorizações suficientes para gerir DAGs.
- As alterações ao DAG são propagadas para o Airflow no prazo de 3 a 5 minutos. Pode ver o estado da tarefa na interface Web do Airflow.
Aceda ao contentor do seu ambiente
Para aceder ao contentor associado ao seu ambiente:
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.
gcloud
A CLI gcloud tem comandos separados para adicionar e eliminar DAGs no contentor do seu ambiente.
Se quiser interagir com o contentor do seu ambiente, também pode usar a Google Cloud CLI. Para obter o endereço do contentor do seu ambiente, execute o seguinte comando da CLI gcloud:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.
Exemplo:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Construa um pedido de API environments.get
. No recurso Environment, no recurso EnvironmentConfig, no recurso dagGcsPrefix
, encontra-se a morada do contentor do seu ambiente.
Exemplo:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Use a biblioteca google-auth para obter credenciais e use a biblioteca requests para chamar a API REST.
Adicione ou atualize um DAG
Para adicionar ou atualizar um DAG, mova o ficheiro .py
Python do DAG para a pasta /dags
no contentor do ambiente.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.Clique em Carregar ficheiros. Em seguida, selecione o ficheiro
.py
Python para o DAG através da caixa de diálogo do navegador e confirme.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.LOCAL_FILE_TO_UPLOAD
é o ficheiro.py
Python para o DAG.
Exemplo:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Atualize um DAG que tenha execuções de DAG ativas
Se atualizar um DAG que tenha execuções de DAG ativas:
- Todas as tarefas em execução terminam com o ficheiro DAG original.
- Todas as tarefas agendadas, mas que não estão atualmente em execução, usam o ficheiro DAG atualizado.
- Todas as tarefas que já não estão presentes no ficheiro DAG atualizado são marcadas como removidas.
Atualize DAGs que são executados com frequência
Depois de carregar um ficheiro DAG, o Airflow demora algum tempo a carregar este ficheiro e a atualizar o DAG. Se o seu DAG for executado com frequência, é recomendável garantir que o DAG usa a versão atualizada do ficheiro DAG. Para isso:
Pausar o DAG na IU do Airflow.
Carregue um ficheiro DAG atualizado.
Aguarde até ver as atualizações na IU do Airflow. Isto significa que o DAG foi analisado corretamente pelo programador e atualizado na base de dados do Airflow.
Se a IU do Airflow apresentar os DAGs atualizados, isto não garante que os trabalhadores do Airflow tenham a versão atualizada do ficheiro DAG. Isto acontece porque os ficheiros DAG são sincronizados de forma independente para os programadores e as trabalhadoras.
Recomendamos que aumente o tempo de espera para garantir que o ficheiro DAG está sincronizado com todos os trabalhadores no seu ambiente. A sincronização ocorre várias vezes por minuto. Num ambiente saudável, aguardar cerca de 20 a 30 segundos é suficiente para que todos os trabalhadores sejam sincronizados.
(Opcional) Se quiser ter a certeza de que todos os trabalhadores têm a nova versão do ficheiro DAG, inspecione os registos de cada trabalhador individual. Para isso:
Abra o separador Registos do seu ambiente na Google Cloud consola.
Aceda a Registos do compositor > Infraestrutura > Sincronização do Cloud Storage e inspecione os registos de cada trabalhador no seu ambiente. Procure o item de registo
Syncing dags directory
mais recente que tenha uma data/hora após o carregamento do novo ficheiro DAG. Se vir um itemFinished syncing
que o segue, significa que os DAGs foram sincronizados com êxito neste trabalhador.
Despause o DAG.
Volte a analisar um DAG
Uma vez que os DAGs são armazenados no contentor do ambiente, cada DAG é sincronizado primeiro com o processador de DAGs e, em seguida, o processador de DAGs analisa-o com um pequeno atraso. Se analisar novamente um DAG manualmente, por exemplo, através da IU do Airflow, o processador de DAGs analisa novamente a versão atual do DAG disponível para si, que pode não ser a versão mais recente do DAG que carregou para o contentor do ambiente.
Recomendamos que use a reanálise a pedido apenas se encontrar tempos de análise longos. Por exemplo, isto pode acontecer se o seu ambiente tiver um grande número de ficheiros ou se um intervalo de análise DAG longo estiver configurado nas opções de configuração do Airflow.
Elimine um DAG no seu ambiente
Para eliminar um DAG, remova o ficheiro Python .py
do DAG da pasta /dags
do ambiente no contentor do ambiente.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta DAGs, clique no link DAGs. É apresentada a página Detalhes do contentor. Mostra o conteúdo da pasta
/dags
no contentor do seu ambiente.Selecione o ficheiro DAG, clique em Eliminar e confirme a operação.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.DAG_FILE
com o ficheiro.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Remova um DAG da IU do Airflow
Para remover os metadados de um DAG da interface Web do Airflow:
IU do Airflow
- Aceda à IU do Airflow para o seu ambiente.
- Para o DAG, clique em Eliminar DAG.
gcloud
Execute o seguinte comando na CLI gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.DAG_NAME
é o nome do DAG a eliminar.
O que se segue?
- Escreva DAGs
- Teste, sincronize e implemente os seus DAGs a partir do GitHub
- Resolução de problemas de DAGs