Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Nesta página, descrevemos como gerenciar DAGs no Cloud Composer de nuvem.
O Cloud Composer usa um bucket do Cloud Storage para armazenar DAGs do seu ambiente do Cloud Composer. Seu ambiente será sincronizado DAGs deste bucket para componentes do Airflow, como workers do Airflow e programadores.
Antes de começar
- Os DAGs não são fortemente isolados pelo Apache Airflow. Portanto, recomendamos que você tenha ambientes de produção e de teste separados para evitar a interferência de DAGs. Para mais informações, consulte Como testar DAGs.
- Verifique se sua conta tem permissões suficientes para gerenciar DAGs.
- As mudanças no DAG são propagadas para o Airflow em 3 a 5 minutos. É possível conferir o status da tarefa na interface da Web do Airflow.
Acessar o bucket do seu ambiente
Para acessar o bucket associado ao seu ambiente:
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
no bucket do seu ambiente.
gcloud
A CLI gcloud tem comandos separados para adicionar e excluir DAGs no bucket do ambiente.
Se você quiser interagir com o bucket do seu ambiente, também poderá usar a CLI do Google Cloud. Para conferir o endereço do bucket do seu ambiente, execute o seguinte comando da CLI gcloud:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format="get(config.dagGcsPrefix)"
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;
Exemplo:
gcloud beta composer environments describe example-environment \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
API
Crie uma solicitação de API environments.get
. No recurso Ambiente, no recurso EnvironmentConfig e no recurso dagGcsPrefix
, o endereço é o bucket do seu ambiente.
Exemplo:
GET https://composer.googleapis.com/v1/projects/example-project/
locations/us-central1/environments/example-environment
Python
Use a biblioteca google-auth para receber credenciais e usar a biblioteca requests para chame a API REST.
Adicionar ou atualizar um DAG
Para adicionar ou atualizar um DAG, mova o arquivo .py
do Python do DAG para
a pasta /dags
no bucket do ambiente.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do ambiente Na coluna DAGs folder, clique no link DAGs. A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
no bucket do seu ambiente.Clique em Fazer o upload dos arquivos. Em seguida, selecione o arquivo
.py
do Python para o DAG usando a caixa de diálogo do navegador e confirme.
gcloud
gcloud composer environments storage dags import \
--environment ENVIRONMENT_NAME \
--location LOCATION \
--source="LOCAL_FILE_TO_UPLOAD"
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;LOCAL_FILE_TO_UPLOAD
é o arquivo.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags import \
--environment example-environment \
--location us-central1 \
--source="example_dag.py"
Atualizar um DAG com execuções ativas de DAGs
Se você atualizar um DAG que tenha execuções ativas de DAGs:
- Todas as tarefas em execução são concluídas usando o arquivo DAG original.
- Todas as tarefas programadas, mas que não estão em execução, usam o arquivo DAG atualizado.
- Todas as tarefas que não estão mais presentes no arquivo DAG atualizado são marcadas como removida.
Atualizar DAGs executados com frequência
Depois de fazer upload de um arquivo DAG, o Airflow leva algum tempo para carregar esse arquivo e atualizar o DAG. Se o DAG for executado com frequência, talvez seja necessário garantir que o DAG use a versão atualizada do arquivo DAG. Para fazer isso, siga estas etapas:
Pause o DAG na IU do Airflow.
Faça o upload de um arquivo DAG atualizado.
Aguarde até ver as atualizações na IU do Airflow. Isso significa que o DAG foi analisado corretamente pelo programador e atualizado no banco de dados do Airflow.
Se a IU do Airflow exibir os DAGs atualizados, isso não garante que os workers do Airflow tenham a versão atualizada do arquivo. Isso acontece porque os arquivos DAG são sincronizados de forma independente para programadores e workers.
É possível estender o tempo de espera para garantir que o arquivo DAG seja sincronizado com todos os workers no ambiente. A sincronização acontece várias vezes por minuto. Em um ambiente saudável, aguardar cerca de 20 a 30 segundos é o suficiente para que todos os workers sejam sincronizados.
(Opcional) Se você quiser ter certeza de que todos os workers têm a nova versão do arquivo DAG, inspecione os registros de cada worker individual. Para fazer isso, siga estas etapas:
Abra a guia Registros do ambiente no console do Google Cloud.
Acesse Registros do Composer > Infraestrutura >. Sincronização do Cloud Storage e inspeção de registros para cada worker em em seu ambiente. Procure o item de registro
Syncing dags directory
mais recente que tem um carimbo de data/hora após o upload do novo arquivo DAG. Se você vir um itemFinished syncing
que o segue, os DAGs serão sincronizado neste worker.
Cancele a pausa do DAG.
Excluir um DAG no ambiente
Para excluir um DAG, remova o arquivo Python .py
dele do
pasta /dags
do ambiente no bucket dele.
Console
No console do Google Cloud, acesse a página Ambientes.
Na lista de ambientes, encontre uma linha com o nome do seu ambiente e, na coluna Pasta de DAGs, clique no link DAGs. O A página Detalhes do bucket é aberta. Ele mostra o conteúdo da pasta
/dags
no bucket do seu ambiente.Selecione o arquivo DAG, clique em Excluir e confirme a operação.
gcloud
gcloud composer environments storage dags delete \
--environment ENVIRONMENT_NAME \
--location LOCATION \
DAG_FILE
Substitua:
ENVIRONMENT_NAME
pelo nome do ambiente;LOCATION
pela região em que o ambiente está localizado;DAG_FILE
com o arquivo.py
do Python para o DAG.
Exemplo:
gcloud composer environments storage dags delete \
--environment example-environment \
--location us-central1 \
example_dag.py
Remover um DAG da interface do Airflow
Para remover os metadados de um DAG da interface da Web do Airflow:
IU do Airflow
- Acesse a interface do Airflow para seu ambiente.
- No DAG, clique em Excluir DAG.
gcloud
Nas versões do Airflow 1 anteriores à 1.14.0, execute o seguinte comando em CLI gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
delete_dag -- DAG_NAME
No Airflow 2, Airflow 1.14.0 e versões mais recentes, execute o seguinte comando na CLI do gcloud:
gcloud composer environments run ENVIRONMENT_NAME \
--location LOCATION \
dags delete -- DAG_NAME
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;DAG_NAME
é o nome do DAG a ser excluído.