Cloud Composer 1 | Cloud Composer 2 | Cloud Composer 3
Nesta página, descrevemos como usar funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.
O Apache Airflow foi criado para executar DAGs regularmente, mas também é possível acioná-los em resposta a eventos. Uma maneira de fazer isso é usar Funções do Cloud Run para acionar DAGs do Cloud Composer quando um evento especificado ocorre.
No exemplo deste guia, um DAG é executado sempre que ocorre uma mudança em um bucket do Cloud Storage. Alterações em qualquer objeto em um bucket acionam uma função. Essa função faz uma solicitação à API REST do Airflow do ambiente do Cloud Composer. O Airflow processa essa solicitação e executa um DAG. O DAG mostra informações sobre a alteração.
Antes de começar
Verificar a configuração de rede do ambiente
Essa solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conectividade das funções do Cloud Run para o servidor da Web do Airflow nessas configurações.
No Cloud Composer 2, é possível usar outra abordagem: Acione DAGs usando as funções do Cloud Run e as mensagens do Pub/Sub
Ativar as APIs do projeto
Console
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Ativar a API REST do Airflow
Para o Airflow 2, a API REST estável já está ativada por padrão. Se a API estável estiver desativada no ambiente, ative a API REST estável.
Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web
As funções do Cloud Run podem acessar a API REST do Airflow usando IPv4 ou endereço IPv6.
Se você não souber qual será o intervalo de IP da chamada, use uma opção de configuração padrão no controle de acesso do servidor da Web, que é All IP addresses have access (default)
, para não bloquear acidentalmente as funções do Cloud Run.
Criar um bucket do Cloud Storage
Este exemplo aciona um DAG em resposta a alterações em um bucket do Cloud Storage. Crie um novo bucket para usar neste exemplo.
Ver o URL do servidor da Web do Airflow
Este exemplo faz solicitações da API REST para o endpoint do servidor da Web do Airflow. Use o URL do servidor da Web do Airflow no código da função do Cloud.
Console
No console do Google Cloud, acesse a página Ambientes.
Clique no nome do seu ambiente.
Na página Detalhes do ambiente, acesse a guia Configuração do ambiente.
O URL do servidor da Web do Airflow está listado no item da IU da Web do Airflow.
gcloud
Execute este comando:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Substitua:
ENVIRONMENT_NAME
pelo nome do ambienteLOCATION
pela região em que o ambiente está localizado;
Fazer upload de um DAG para o ambiente
Faça o upload de um DAG para seu ambiente. O exemplo a seguir mostra a configuração de execução do DAG recebida. Você acionará esse DAG a partir de uma função que vai criar neste guia depois.
Implantar uma Função do Cloud que aciona o DAG
É possível implantar uma função do Cloud usando a linguagem de sua preferência aceita pelo funções do Cloud Run ou o Cloud Run. Este tutorial demonstra do função do Cloud implementada em Python e Java
Especificar os parâmetros de configuração da função do Cloud
Gatilho. Neste exemplo, selecione um gatilho que funcione quando um novo objeto for criado em um bucket ou um objeto existente for substituído.
Tipo de gatilho. Cloud Storage.
Tipo de evento. Finalizar/Criar.
Bucket. Selecione um bucket que precisa acionar essa função.
Tentar novamente em caso de falha. Recomendamos desativar essa opção para os fins deste exemplo. Se você usar sua própria função em um ambiente de produção, ative essa opção para processar erros temporários.
Conta de serviço do ambiente de execução, na seção Configurações de ambiente de execução, build, conexões e segurança. Use uma destas opções: as seguintes opções, dependendo das suas preferências:
Selecione Conta de serviço padrão do Compute Engine. Com as permissões padrão do IAM, essa conta pode executar funções que acessam os ambientes do Cloud Composer.
Crie uma conta de serviço personalizada que tenha o papel de Usuário do Composer e especifique-a como uma conta de serviço do ambiente de execução para essa função. Essa opção segue o princípio do privilégio mínimo.
Ambiente de execução e ponto de entrada na etapa Código:
(Python) Ao adicionar o código deste exemplo, selecione o ambiente de execução Python 3.7 ou posterior e especifique
trigger_dag_gcf
como o ponto de entrada.(Java) Ao adicionar o código deste exemplo, selecione o ambiente de execução do Java 17 e especifique
com.example.Example
como o ponto de entrada.
Adicionar requisitos
Python
Especifique as dependências no arquivo requirements.txt
:
Java
Adicione as dependências a seguir à seção dependencies
no pom.xml
gerado pela IU do Google Cloud Functions.
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-docs</artifactId>
<version>v1-rev20210707-1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.14.0</version>
</dependency>
Python
Adicione o código para acionar DAGs usando a API REST do Airflow. Crie um arquivo chamado
composer2_airflow_rest_api.py
e coloque nele o código para fazer chamadas da API REST do Airflow.
Não altere nenhuma variável. A função do Cloud importa esse arquivo do
arquivo main.py
.
Coloque o seguinte código no arquivo main.py
. Substitua o valor de
a variável web_server_url
pelo endereço do servidor da Web do Airflow que você
que você aprendeu antes.
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
Java
Coloque o seguinte código no arquivo Example.java
. Substitua o valor do parâmetro
A variável webServerUrl
pelo endereço do servidor da Web do Airflow que você
que você aprendeu antes.
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.example;
import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
/**
* Cloud Function that triggers an Airflow DAG in response to an event (in
* this case a Cloud Storage event).
*/
public class Example implements BackgroundFunction<GcsEvent> {
private static final Logger logger = Logger.getLogger(Example.class.getName());
// TODO(developer): replace with your values
// Replace webServerUrl with the Airflow web server address. To obtain this
// URL, run the following command for your environment:
// gcloud composer environments describe example-environment \
// --location=your-composer-region \
// --format="value(config.airflowUri)"
@Override
public void accept(GcsEvent event, Context context) throws Exception {
String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
String dagName = "composer_sample_trigger_response_dag";
String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);
logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
dagName, event.name));
logger.info(String.format("Triggering DAG via the following URL: %s", url));
GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
HttpRequestFactory requestFactory =
new NetHttpTransport().createRequestFactory(credentialsAdapter);
Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
Map<String, String> conf = new HashMap<String, String>();
conf.put("bucket", event.bucket);
conf.put("name", event.name);
conf.put("generation", event.generation);
conf.put("operation", context.eventType());
json.put("conf", conf);
HttpContent content = new JsonHttpContent(new GsonFactory(), json);
HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
request.getHeaders().setContentType("application/json");
HttpResponse response;
try {
response = request.execute();
int statusCode = response.getStatusCode();
logger.info("Response code: " + statusCode);
logger.info(response.parseAsString());
} catch (HttpResponseException e) {
// https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
logger.info("Received HTTP exception");
logger.info(e.getLocalizedMessage());
logger.info("- 400 error: wrong arguments passed to Airflow API");
logger.info("- 401 error: check if service account has Composer User role");
logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
logger.info("- 404 error: check Web Server URL");
} catch (Exception e) {
logger.info("Received exception");
logger.info(e.getLocalizedMessage());
}
}
/** Details of the storage event. */
public static class GcsEvent {
/** Bucket name. */
String bucket;
/** Object name. */
String name;
/** Object version. */
String generation;
}
}
Testar a função
Para verificar se a função e o DAG funcionam conforme o esperado:
- Aguarde até que a função seja implantada.
- Faça upload de um arquivo para o bucket do Cloud Storage. Como alternativa, é possível acionar a função manualmente selecionando a ação Testar a função para ela no console do Google Cloud.
- Verifique a página do DAG na interface da Web do Airflow. O DAG precisa ter uma execução ativa ou já concluída.
- Na IU do Airflow, verifique os registros de tarefas desta execução. Você verá que a tarefa
print_gcs_info
gera os dados recebidos da função para os registros:
Python
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Java
[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0
A seguir
- Acessar a IU do Airflow
- Acessar a API REST do Airflow
- Gravar DAGs
- Gravar funções do Cloud Run
- Gatilhos do Cloud Storage