Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Nesta página, descrevemos como gerenciar DAGs no Cloud Composer de nuvem.
O Cloud Composer usa um bucket do Cloud Storage para armazenar DAGs do ambiente do Cloud Composer. O ambiente sincroniza DAGs desse bucket com componentes do Airflow, como workers e programadores.
Antes de começar
- Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
- Verifique se a conta tem permissões suficientes para gerenciar DAGs.
- As alterações no DAG são propagadas para o Airflow dentro de 3 a 5 minutos. Você pode conferir as tarefas na interface da Web do Airflow.
Acessar o bucket do seu ambiente
Para acessar o bucket associado ao ambiente:
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
no bucket do seu ambiente.
gcloud
A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do ambiente.
Se você quiser interagir com o bucket do seu ambiente, também poderá usar a CLI do Google Cloud. Para conferir o endereço do bucket do seu ambiente, execute o seguinte comando da CLI gcloud:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;
Exemplo:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Crie uma solicitação de API environments.get
. No recurso Ambiente, no recurso EnvironmentConfig e no recurso dagGcsPrefix
, o endereço é o bucket do seu ambiente.
Exemplo:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Use a biblioteca google-auth para acessar as credenciais e a biblioteca requests para chamar a API REST.
Adicionar ou atualizar um DAG
Para adicionar ou atualizar um DAG, mova o arquivo .py
do Python do DAG para
a pasta /dags
no bucket do ambiente.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
no bucket do seu ambiente.Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo Python
.py
para o DAG usando a caixa de diálogo do navegador e confirme.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;LOCAL_FILE_TO_UPLOAD
é o arquivo.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Atualizar um DAG com execuções ativas de DAGs
Se você atualizar um DAG que tenha execuções ativas de DAGs:
- Todas as tarefas em execução são concluídas usando o arquivo DAG original.
- Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
- Todas as tarefas que não estão mais no arquivo DAG atualizado são marcadas como removidas.
Atualizar DAGs executados com frequência
Depois de fazer upload de um arquivo DAG, o Airflow leva algum tempo para carregar esse arquivo e atualizar o DAG. Se o DAG for executado com frequência, convém garantir que ele use a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:
Pause o DAG na IU do Airflow.
Faça o upload de um arquivo DAG atualizado.
Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.
Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.
É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.
(Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:
Abra a guia Registros do ambiente no console do Google Cloud.
Acesse Registros do Composer > Infraestrutura >. Sincronização do Cloud Storage e inspeção de registros para cada worker em em seu ambiente. Procure o registro
Syncing dags directory
mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você encontrar um itemFinished syncing
que o segue, os DAGs serão sincronizados com sucesso nesse worker.
Cancele a pausa do DAG.
Excluir um DAG no ambiente
Para excluir um DAG, remova o arquivo .py
do Python relativo ao DAG da pasta /dags
do ambiente no bucket do ambiente.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
em no bucket do seu ambiente.Selecione o arquivo DAG, clique em Excluir e confirme a operação.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;DAG_FILE
com o arquivo.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Remover um DAG da interface do Airflow
Para remover os metadados de um DAG da interface da Web do Airflow:
IU do Airflow
- Acesse a interface do Airflow do seu ambiente.
- Para o DAG, clique em Excluir DAG.
gcloud
Execute o seguinte comando na CLI gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;DAG_NAME
é o nome do DAG a ser excluído.