Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
Esta página descreve como usar as funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.
O Apache Airflow foi concebido para executar DAGs de forma regular, mas também pode acionar DAGs em resposta a eventos. Uma forma de o fazer é usar as funções do Cloud Run para acionar DAGs do Cloud Composer quando ocorre um evento especificado.
O exemplo neste guia executa um DAG sempre que ocorre uma alteração num contentor do Cloud Storage. As alterações a qualquer objeto num contentor acionam uma função. Esta função faz um pedido à API REST do Airflow do seu ambiente do Cloud Composer. O Airflow processa este pedido e executa um DAG. O DAG produz informações sobre a alteração.
Antes de começar
Verifique a configuração de rede do seu ambiente
Esta solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conetividade das funções do Cloud Run ao servidor Web do Airflow nestas configurações.
No Cloud Composer 2, pode usar outra abordagem: Acione DAGs com funções do Cloud Run e mensagens do Pub/Sub
Ative APIs para o seu projeto
Consola
Enable the Cloud Composer and Cloud Run functions APIs.
gcloud
Enable the Cloud Composer and Cloud Run functions APIs:
gcloud services enable cloudfunctions.googleapis.comcomposer.googleapis.com
Ative a API REST do Airflow
Para o Airflow 2, a API REST estável já está ativada por predefinição. Se o seu ambiente tiver a API estável desativada, ative a API REST estável.
Permita chamadas à API Airflow REST através do controlo de acesso do servidor Web
As funções do Cloud Run podem contactar a API REST Airflow através do endereço IPv4 ou IPv6.
Se não tiver a certeza do que será o intervalo de IPs de chamadas, use uma opção de configuração predefinida no controlo de acesso do servidor Web, que é All IP addresses have access (default)
para não bloquear acidentalmente as suas funções do Cloud Run.
Crie um contentor do Cloud Storage
Este exemplo aciona um DAG em resposta a alterações num contentor do Cloud Storage. Crie um novo contentor para usar neste exemplo.
Obtenha o URL do servidor Web do Airflow
Este exemplo faz pedidos da API REST ao ponto final do servidor Web do Airflow. Usa o URL do servidor Web do Airflow no código da função do Google Cloud.
Consola
Na Google Cloud consola, aceda à página Ambientes.
Clique no nome do seu ambiente.
Na página Detalhes do ambiente, aceda ao separador Configuração do ambiente.
O URL do servidor Web do Airflow está listado no item IU Web do Airflow.
gcloud
Execute o seguinte comando:
gcloud composer environments describe ENVIRONMENT_NAME \
--location LOCATION \
--format='value(config.airflowUri)'
Substituir:
ENVIRONMENT_NAME
com o nome do ambiente.LOCATION
com a região onde o ambiente está localizado.
Carregue um DAG para o seu ambiente
Carregue um DAG para o seu ambiente. O exemplo de DAG seguinte produz a configuração de execução do DAG recebida. Aciona este DAG a partir de uma função, que cria mais tarde neste guia.
Implemente uma função do Cloud que acione o DAG
Pode implementar uma função do Cloud com o seu idioma preferido suportado pelas funções do Cloud Run ou pelo Cloud Run. Este tutorial demonstra uma função do Google Cloud implementada em Python e Java.
Especifique parâmetros de configuração da função do Google Cloud
Acionador. Para este exemplo, selecione um acionador que funcione quando um novo objeto é criado num contentor ou um objeto existente é substituído.
Tipo de acionador. Cloud Storage.
Tipo de evento. Finalizar / criar.
Segmento. Selecione um contentor que tem de acionar esta função.
Tentar novamente em caso de falha. Recomendamos que desative esta opção para os fins deste exemplo. Se usar a sua própria função num ambiente de produção, ative esta opção para resolver erros transitórios.
Conta de serviço de tempo de execução na secção Definições de tempo de execução, compilação, ligações e segurança. Use uma das seguintes opções, consoante as suas preferências:
Selecione Conta de serviço predefinida do Compute Engine. Com as autorizações do IAM predefinidas, esta conta pode executar funções que acedem a ambientes do Cloud Composer.
Crie uma conta de serviço personalizada com a função Utilizador do Composer e especifique-a como uma conta de serviço de tempo de execução para esta função. Esta opção segue o princípio do privilégio mínimo.
Tempo de execução e ponto de entrada, no passo Código:
(Python) Quando adicionar código para este exemplo, selecione o tempo de execução do Python 3.7 ou posterior e especifique
trigger_dag_gcf
como o ponto de entrada.(Java) Quando adicionar código para este exemplo, selecione o tempo de execução Java 17 e especifique
com.example.Example
como o ponto de entrada.
Adicione requisitos
Python
Especifique as dependências no ficheiro requirements.txt
:
Java
Adicione as seguintes dependências à secção dependencies
no ficheiro pom.xml
gerado pela IU do Google Cloud Functions.
<dependency>
<groupId>com.google.apis</groupId>
<artifactId>google-api-services-docs</artifactId>
<version>v1-rev20210707-1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.api-client</groupId>
<artifactId>google-api-client</artifactId>
<version>1.32.1</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-credentials</artifactId>
<version>1.14.0</version>
</dependency>
<dependency>
<groupId>com.google.auth</groupId>
<artifactId>google-auth-library-oauth2-http</artifactId>
<version>1.14.0</version>
</dependency>
Python
Adicione o código para acionar DAGs através da API REST do Airflow. Crie um ficheiro com o nome
composer2_airflow_rest_api.py
e coloque o código para fazer chamadas à API REST do Airflow
neste ficheiro.
Não altere nenhuma variável. A função do Cloud importa este ficheiro do ficheiro main.py
.
Coloque o seguinte código no ficheiro main.py
. Substitua o valor da variável web_server_url
pelo endereço do servidor Web do Airflow que obteve anteriormente.
# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""
from typing import Any
import composer2_airflow_rest_api
def trigger_dag_gcf(data, context=None):
"""
Trigger a DAG and pass event data.
Args:
data: A dictionary containing the data for the event. Its format depends
on the event.
context: The context object for the event.
For more information about the arguments, see:
https://cloud.google.com/functions/docs/writing/background#function_parameters
"""
# TODO(developer): replace with your values
# Replace web_server_url with the Airflow web server address. To obtain this
# URL, run the following command for your environment:
# gcloud composer environments describe example-environment \
# --location=your-composer-region \
# --format="value(config.airflowUri)"
web_server_url = (
"https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
)
# Replace with the ID of the DAG that you want to run.
dag_id = 'composer_sample_trigger_response_dag'
composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)
Java
Coloque o seguinte código no ficheiro Example.java
. Substitua o valor da variável webServerUrl
pelo endereço do servidor Web do Airflow que obteve anteriormente.
// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package com.example;
import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;
/**
* Cloud Function that triggers an Airflow DAG in response to an event (in
* this case a Cloud Storage event).
*/
public class Example implements BackgroundFunction<GcsEvent> {
private static final Logger logger = Logger.getLogger(Example.class.getName());
// TODO(developer): replace with your values
// Replace webServerUrl with the Airflow web server address. To obtain this
// URL, run the following command for your environment:
// gcloud composer environments describe example-environment \
// --location=your-composer-region \
// --format="value(config.airflowUri)"
@Override
public void accept(GcsEvent event, Context context) throws Exception {
String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
String dagName = "composer_sample_trigger_response_dag";
String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);
logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
dagName, event.name));
logger.info(String.format("Triggering DAG via the following URL: %s", url));
GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
.createScoped("https://www.googleapis.com/auth/cloud-platform");
HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
HttpRequestFactory requestFactory =
new NetHttpTransport().createRequestFactory(credentialsAdapter);
Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
Map<String, String> conf = new HashMap<String, String>();
conf.put("bucket", event.bucket);
conf.put("name", event.name);
conf.put("generation", event.generation);
conf.put("operation", context.eventType());
json.put("conf", conf);
HttpContent content = new JsonHttpContent(new GsonFactory(), json);
HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
request.getHeaders().setContentType("application/json");
HttpResponse response;
try {
response = request.execute();
int statusCode = response.getStatusCode();
logger.info("Response code: " + statusCode);
logger.info(response.parseAsString());
} catch (HttpResponseException e) {
// https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
logger.info("Received HTTP exception");
logger.info(e.getLocalizedMessage());
logger.info("- 400 error: wrong arguments passed to Airflow API");
logger.info("- 401 error: check if service account has Composer User role");
logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
logger.info("- 404 error: check Web Server URL");
} catch (Exception e) {
logger.info("Received exception");
logger.info(e.getLocalizedMessage());
}
}
/** Details of the storage event. */
public static class GcsEvent {
/** Bucket name. */
String bucket;
/** Object name. */
String name;
/** Object version. */
String generation;
}
}
Teste a sua função
Para verificar se a função e o DAG funcionam conforme esperado:
- Aguarde até que a função seja implementada.
- Carregue um ficheiro para o seu contentor do Cloud Storage. Em alternativa, pode acionar a função manualmente selecionando a ação Testar a função para a mesma na Google Cloud consola.
- Verifique a página DAG na interface Web do Airflow. O DAG deve ter uma execução de DAG ativa ou já concluída.
- Na IU do Airflow, verifique os registos de tarefas desta execução. Deverá ver que a tarefa
print_gcs_info
produz os dados recebidos da função nos registos:
Python
[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
{bucket: example-storage-for-gcf-triggers, contentType: text/plain,
crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
return code 0h
Java
[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0
O que se segue?
- Aceda à IU do Airflow
- Aceda à API REST do Airflow
- Escreva DAGs
- Escreva funções do Cloud Run
- Acionadores do Cloud Storage