Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Nesta página, descrevemos como usar funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.
O Apache Airflow foi projetado para executar DAGs em uma programação regular, mas também é possível acionar DAGs em resposta a eventos. Uma maneira de fazer isso é usar Funções do Cloud Run para acionar DAGs do Cloud Composer quando um evento especificado ocorre.
No exemplo deste guia, um DAG é executado sempre que ocorre uma mudança em um bucket do Cloud Storage. Alterações em qualquer objeto em um bucket acionam uma função. Essa função faz uma solicitação à API REST do Airflow do ambiente do Cloud Composer. O Airflow processa essa solicitação e executa um DAG. O DAG mostra informações sobre a alteração.
Antes de começar
Verificar a configuração de rede do ambiente
Esta solução não funciona nas configurações de IP privado e VPC Service Controls porque não é possível configurar a conectividade das funções do Cloud Run para o servidor da Web do Airflow nessas configurações.
No Cloud Composer 2, é possível usar outra abordagem: Acionar DAGs usando funções do Cloud Run e mensagens do Pub/Sub
Ativar as APIs do projeto
Console
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Ativar a API REST do Airflow
Dependendo da sua versão do Airflow, faça o seguinte:
- Para o Airflow 2, a API REST estável já está ativada por padrão. Se a API estável estiver desativada no ambiente, ative a API REST estável.
- Para o Airflow 1, ative a API REST experimental.
Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web
As funções do Cloud Run podem acessar a API REST do Airflow usando IPv4 ou endereço IPv6.
Se você não tiver certeza de qual será o intervalo de IP de chamada, use um padrão
opção de configuração no controle de acesso do servidor da Web, que é All IP addresses have access (default)
para não bloquear acidentalmente as funções do Cloud Run.
Criar um bucket do Cloud Storage
Este exemplo aciona um DAG em resposta a alterações em um bucket do Cloud Storage. Crie um novo bucket para usar neste exemplo.
Ver o URL do servidor da Web do Airflow
Este exemplo faz solicitações da API REST para o endpoint do servidor da Web do Airflow.
Use a parte do URL da interface da Web do Airflow antes de .appspot.com
no
código da função do Cloud.
Console
No console do Google Cloud, acesse a página Ambientes.
Clique no nome do seu ambiente.
Na página Detalhes do ambiente, acesse a guia Configuração do ambiente.
O URL do servidor da Web do Airflow está listado no item da IU da Web do Airflow.
gcloud
Execute este comando:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
Acessar o client_id do proxy do IAM
Para fazer uma solicitação ao endpoint de API REST do Airflow, a função exige o ID do cliente do proxy do Identity and Access Management que protege o servidor da Web do Airflow.
O Cloud Composer não fornece essas informações diretamente. Em vez disso, faça uma solicitação não autenticada no servidor da Web do Airflow e capture o ID do cliente do URL de redirecionamento:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
Substitua AIRFLOW_URL
pelo URL da interface da Web do Airflow.
Na saída, procure a string depois de client_id
. Exemplo:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
Salve o código a seguir em um arquivo chamado get_client_id.py
. Preencha os
valores de project_id
, location
e composer_environment
. Em seguida, execute
o código no Cloud Shell ou no ambiente local.
Fazer upload de um DAG para o ambiente
Faça o upload de um DAG para seu ambiente. O exemplo a seguir mostra a configuração de execução do DAG recebida. Você acionará esse DAG a partir de uma função que vai criar neste guia depois.
Implantar uma Função do Cloud que aciona o DAG
É possível implantar uma função do Cloud usando a linguagem preferida que é compatível com as funções do Cloud Run ou o Cloud Run. Este tutorial demonstra uma Função do Cloud implementada em Python e Java.
Especificar os parâmetros de configuração da função do Cloud
Gatilho. Neste exemplo, selecione um gatilho que funcione quando um novo objeto for criado em um bucket ou um objeto existente for substituído.
Tipo de gatilho. Cloud Storage.
Tipo de evento. Finalizar/Criar.
Bucket. Selecione um bucket que precisa acionar essa função.
Tentar novamente em caso de falha. Recomendamos desativar essa opção para os fins deste exemplo. Se você usar sua própria função em um ambiente de produção, ative essa opção para processar erros temporários.
Conta de serviço do ambiente de execução, na Seção Ambiente de execução, build, conexões e configurações de segurança. Use uma destas opções: as seguintes opções, dependendo das suas preferências:
Selecione Conta de serviço padrão do Compute Engine. Com as permissões padrão do IAM, essa conta pode executar funções que acessam os ambientes do Cloud Composer.
Crie uma conta de serviço personalizada que tenha o papel de Usuário do Composer e especifique-a como uma conta de serviço do ambiente de execução para essa função. Essa opção segue o princípio do privilégio mínimo.
Ambiente de execução e ponto de entrada, na etapa Código. Ao adicionar código para isso, exemplo, selecione o ambiente de execução Python 3.7 ou posterior e especifique
trigger_dag
como o ponto de entrada.
Adicionar requisitos
Especifique as dependências no arquivo requirements.txt
:
Coloque o código a seguir no arquivo main.py
e faça as seguintes
substituições:
Substitua o valor da variável
client_id
pelo valorclient_id
recebido anteriormente.Substitua o valor da variável
webserver_id
pelo ID do projeto de locatário, que faz parte do URL da interface da Web do Airflow antes de.appspot.com
. Você já recebeu o URL da interface da Web do Airflow.Especifique a versão da API REST do Airflow usada:
- Se você usa a API REST estável do Airflow, defina a variável
USE_EXPERIMENTAL_API
comoFalse
. - Se você usa a API REST experimental do Airflow, não é necessário fazer alterações. A variável
USE_EXPERIMENTAL_API
já está definida comoTrue
.
- Se você usa a API REST estável do Airflow, defina a variável
Testar a função
Para verificar se a função e o DAG funcionam conforme o esperado:
- Aguarde até que a função seja implantada.
- Faça upload de um arquivo para o bucket do Cloud Storage. Como alternativa, você pode acionar a função manualmente selecionando o botão Testar a função no console do Google Cloud.
- Verifique a página do DAG na interface da Web do Airflow. O DAG precisa ter uma execução ativa ou já concluída.
- Na IU do Airflow, verifique os registros de tarefas desta execução. Você verá que a tarefa
print_gcs_info
gera os dados recebidos da função para os registros:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
A seguir
- Acessar a IU do Airflow
- Acessar a API REST do Airflow
- Gravar DAGs
- Gravar funções do Cloud Run
- Gatilhos do Cloud Storage