Acione DAGs com o Cloud Functions

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

Esta página descreve como usar as funções do Cloud Run para acionar DAGs do Cloud Composer em resposta a eventos.

O Apache Airflow foi concebido para executar DAGs de forma regular, mas também pode acionar DAGs em resposta a eventos. Uma forma de o fazer é usar as funções do Cloud Run para acionar DAGs do Cloud Composer quando ocorre um evento especificado.

O exemplo neste guia executa um DAG sempre que ocorre uma alteração num contentor do Cloud Storage. As alterações a qualquer objeto num contentor acionam uma função. Esta função faz um pedido à API REST do Airflow do seu ambiente do Cloud Composer. O Airflow processa este pedido e executa um DAG. O DAG produz informações sobre a alteração.

Antes de começar

Verifique a configuração de rede do seu ambiente

Esta solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conetividade das funções do Cloud Run ao servidor Web do Airflow nestas configurações.

No Cloud Composer 2, pode usar outra abordagem: Acione DAGs com funções do Cloud Run e mensagens do Pub/Sub

Ative APIs para o seu projeto

Consola

Enable the Cloud Composer and Cloud Run functions APIs.

Enable the APIs

gcloud

Enable the Cloud Composer and Cloud Run functions APIs:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Ative a API REST do Airflow

Para o Airflow 2, a API REST estável já está ativada por predefinição. Se o seu ambiente tiver a API estável desativada, ative a API REST estável.

Permita chamadas à API Airflow REST através do controlo de acesso do servidor Web

As funções do Cloud Run podem contactar a API REST Airflow através do endereço IPv4 ou IPv6.

Se não tiver a certeza do que será o intervalo de IPs de chamadas, use uma opção de configuração predefinida no controlo de acesso do servidor Web, que é All IP addresses have access (default)para não bloquear acidentalmente as suas funções do Cloud Run.

Crie um contentor do Cloud Storage

Este exemplo aciona um DAG em resposta a alterações num contentor do Cloud Storage. Crie um novo contentor para usar neste exemplo.

Obtenha o URL do servidor Web do Airflow

Este exemplo faz pedidos da API REST ao ponto final do servidor Web do Airflow. Usa o URL do servidor Web do Airflow no código da função do Google Cloud.

Consola

  1. Na Google Cloud consola, aceda à página Ambientes.

    Aceder a Ambientes

  2. Clique no nome do seu ambiente.

  3. Na página Detalhes do ambiente, aceda ao separador Configuração do ambiente.

  4. O URL do servidor Web do Airflow está listado no item IU Web do Airflow.

gcloud

Execute o seguinte comando:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Substituir:

  • ENVIRONMENT_NAME com o nome do ambiente.
  • LOCATION com a região onde o ambiente está localizado.

Carregue um DAG para o seu ambiente

Carregue um DAG para o seu ambiente. O exemplo de DAG seguinte produz a configuração de execução do DAG recebida. Aciona este DAG a partir de uma função, que cria mais tarde neste guia.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

Implemente uma função do Cloud que acione o DAG

Pode implementar uma função do Cloud com o seu idioma preferido suportado pelas funções do Cloud Run ou pelo Cloud Run. Este tutorial demonstra uma função do Google Cloud implementada em Python e Java.

Especifique parâmetros de configuração da função do Google Cloud

  • Acionador. Para este exemplo, selecione um acionador que funcione quando um novo objeto é criado num contentor ou um objeto existente é substituído.

    • Tipo de acionador. Cloud Storage.

    • Tipo de evento. Finalizar / criar.

    • Segmento. Selecione um contentor que tem de acionar esta função.

    • Tentar novamente em caso de falha. Recomendamos que desative esta opção para os fins deste exemplo. Se usar a sua própria função num ambiente de produção, ative esta opção para resolver erros transitórios.

  • Conta de serviço de tempo de execução na secção Definições de tempo de execução, compilação, ligações e segurança. Use uma das seguintes opções, consoante as suas preferências:

    • Selecione Conta de serviço predefinida do Compute Engine. Com as autorizações do IAM predefinidas, esta conta pode executar funções que acedem a ambientes do Cloud Composer.

    • Crie uma conta de serviço personalizada com a função Utilizador do Composer e especifique-a como uma conta de serviço de tempo de execução para esta função. Esta opção segue o princípio do privilégio mínimo.

  • Tempo de execução e ponto de entrada, no passo Código:

    • (Python) Quando adicionar código para este exemplo, selecione o tempo de execução do Python 3.7 ou posterior e especifique trigger_dag_gcf como o ponto de entrada.

    • (Java) Quando adicionar código para este exemplo, selecione o tempo de execução Java 17 e especifique com.example.Example como o ponto de entrada.

Adicione requisitos

Python

Especifique as dependências no ficheiro requirements.txt:

google-auth==2.38.0
requests==2.32.2

Java

Adicione as seguintes dependências à secção dependencies no ficheiro pom.xml gerado pela IU do Google Cloud Functions.

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Adicione o código para acionar DAGs através da API REST do Airflow. Crie um ficheiro com o nome composer2_airflow_rest_api.py e coloque o código para fazer chamadas à API REST do Airflow neste ficheiro.

Não altere nenhuma variável. A função do Cloud importa este ficheiro do ficheiro main.py.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

Coloque o seguinte código no ficheiro main.py. Substitua o valor da variável web_server_url pelo endereço do servidor Web do Airflow que obteve anteriormente.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)

Java

Coloque o seguinte código no ficheiro Example.java. Substitua o valor da variável webServerUrl pelo endereço do servidor Web do Airflow que obteve anteriormente.


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

Teste a sua função

Para verificar se a função e o DAG funcionam conforme esperado:

  1. Aguarde até que a função seja implementada.
  2. Carregue um ficheiro para o seu contentor do Cloud Storage. Em alternativa, pode acionar a função manualmente selecionando a ação Testar a função para a mesma na Google Cloud consola.
  3. Verifique a página DAG na interface Web do Airflow. O DAG deve ter uma execução de DAG ativa ou já concluída.
  4. Na IU do Airflow, verifique os registos de tarefas desta execução. Deverá ver que a tarefa print_gcs_info produz os dados recebidos da função nos registos:

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

Java

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0

O que se segue?