Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar as funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.
O Apache Airflow foi concebido para executar DAGs de forma regular, mas também pode acionar DAGs em resposta a eventos. Uma forma de o fazer é usar as funções do Cloud Run para acionar DAGs do Cloud Composer quando ocorre um evento especificado.
O exemplo neste guia executa um DAG sempre que ocorre uma alteração num contentor do Cloud Storage. As alterações a qualquer objeto num contentor acionam uma função. Esta função faz um pedido à API REST do Airflow do seu ambiente do Cloud Composer. O Airflow processa este pedido e executa um DAG. O DAG produz informações sobre a alteração.
Antes de começar
Verifique a configuração de rede do seu ambiente
Esta solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conetividade das funções do Cloud Run ao servidor Web do Airflow nestas configurações.
No Cloud Composer 2, pode usar outra abordagem: Acione DAGs com funções do Cloud Run e mensagens do Pub/Sub
Ative APIs para o seu projeto
Consola
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Ative a API REST do Airflow
Consoante a sua versão do Airflow:
- Para o Airflow 2, a API REST estável já está ativada por predefinição. Se o seu ambiente tiver a API estável desativada, ative a API REST estável.
- Para o Airflow 1, ative a API REST experimental.
Permita chamadas à API Airflow REST através do controlo de acesso do servidor Web
As funções do Cloud Run podem contactar a API REST Airflow através do endereço IPv4 ou IPv6.
Se não tiver a certeza do que será o intervalo de IPs de chamadas, use uma opção de configuração predefinida no controlo de acesso do servidor Web, que é All IP addresses have access (default)
para não bloquear acidentalmente as suas funções do Cloud Run.
Crie um contentor do Cloud Storage
Este exemplo aciona um DAG em resposta a alterações num contentor do Cloud Storage. Crie um novo contentor para usar neste exemplo.
Obtenha o URL do servidor Web do Airflow
Este exemplo faz pedidos da API REST ao ponto final do servidor Web do Airflow.
Use a parte do URL da interface Web do Airflow antes de .appspot.com
no código da função do Google Cloud.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Clique no nome do seu ambiente.
Na página Detalhes do ambiente, aceda ao separador Configuração do ambiente.
O URL do servidor Web do Airflow está listado no item IU Web do Airflow.
gcloud
Execute o seguinte comando:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.
Obtenha o client_id do proxy IAM
Para fazer um pedido ao ponto final da API REST do Airflow, a função requer o ID de cliente do proxy de gestão de identidades e acessos que protege o servidor Web do Airflow.
O Cloud Composer não fornece estas informações diretamente. Em alternativa, faça um pedido não autenticado ao servidor Web do Airflow e capture o ID do cliente a partir do URL de redirecionamento:
cURL
curl -v AIRFLOW_URL 2>&1 >/dev/null | grep -o "client_id\=[A-Za-z0-9-]*\.apps\.googleusercontent\.com"
Substitua AIRFLOW_URL
pelo URL da interface Web do Airflow.
Na saída, pesquise a string que se segue a client_id
. Por exemplo:
client_id=836436932391-16q2c5f5dcsfnel77va9bvf4j280t35c.apps.googleusercontent.com
Python
Guarde o seguinte código num ficheiro denominado get_client_id.py
. Preencha os valores de project_id
, location
e composer_environment
e, em seguida, execute o código no Cloud Shell ou no seu ambiente local.
Carregue um DAG para o seu ambiente
Carregue um DAG para o seu ambiente. O exemplo de DAG seguinte produz a configuração de execução do DAG recebida. Aciona este DAG a partir de uma função, que cria mais tarde neste guia.
Implemente uma função do Cloud que acione o DAG
Pode implementar uma função do Cloud com o seu idioma preferido suportado pelas funções do Cloud Run ou pelo Cloud Run. Este tutorial demonstra uma função do Google Cloud implementada em Python e Java.
Especifique parâmetros de configuração da função do Google Cloud
Acionador. Para este exemplo, selecione um acionador que funcione quando um novo objeto é criado num contentor ou um objeto existente é substituído.
Tipo de acionador. Cloud Storage.
Tipo de evento. Finalizar / criar.
Segmento. Selecione um contentor que tem de acionar esta função.
Tentar novamente em caso de falha. Recomendamos que desative esta opção para os fins deste exemplo. Se usar a sua própria função num ambiente de produção, ative esta opção para resolver erros transitórios.
Conta de serviço de tempo de execução na secção Definições de tempo de execução, compilação, ligações e segurança. Use uma das seguintes opções, consoante as suas preferências:
Selecione Conta de serviço predefinida do Compute Engine. Com as autorizações do IAM predefinidas, esta conta pode executar funções que acedem a ambientes do Cloud Composer.
Crie uma conta de serviço personalizada com a função Utilizador do Composer e especifique-a como uma conta de serviço de tempo de execução para esta função. Esta opção segue o princípio do privilégio mínimo.
Tempo de execução e ponto de entrada, no passo Código. Quando adicionar código para este exemplo, selecione o tempo de execução Python 3.7 ou posterior e especifique
trigger_dag
como ponto de entrada.
Adicione requisitos
Especifique as dependências no ficheiro requirements.txt
:
Coloque o seguinte código no ficheiro main.py
e faça as seguintes substituições:
Substitua o valor da variável
client_id
pelo valorclient_id
que obteve anteriormente.Substitua o valor da variável
webserver_id
pelo ID do projeto de inquilino, que faz parte do URL da interface Web do Airflow antes de.appspot.com
. Obteve o URL da interface Web do Airflow anteriormente.Especifique a versão da API REST Airflow que usa:
- Se usar a API REST do Airflow estável, defina a variável
USE_EXPERIMENTAL_API
comoFalse
. - Se usar a API REST do Airflow experimental, não tem de fazer nenhuma alteração. A variável
USE_EXPERIMENTAL_API
já está definida comoTrue
.
- Se usar a API REST do Airflow estável, defina a variável
Teste a sua função
Para verificar se a função e o DAG funcionam conforme esperado:
- Aguarde até que a função seja implementada.
- Carregue um ficheiro para o seu contentor do Cloud Storage. Em alternativa, pode acionar a função manualmente selecionando a ação Testar a função para a mesma na Google Cloud consola.
- Verifique a página DAG na interface Web do Airflow. O DAG deve ter uma execução de DAG ativa ou já concluída.
- Na IU do Airflow, verifique os registos de tarefas desta execução. Deverá ver que a tarefa
print_gcs_info
produz os dados recebidos da função nos registos:
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
O que se segue?
- Aceda à IU do Airflow
- Aceda à API REST do Airflow
- Escreva DAGs
- Escreva funções do Cloud Run
- Acionadores do Cloud Storage