Acione os DAGs do Cloud Composer com o Cloud Functions e a API REST do Airflow

Cloud Composer 1 | Cloud Composer 2

Nesta página, descrevemos como usar o Cloud Functions para acionar os DAGs do Cloud Composer em resposta a eventos.

O Apache Airflow foi projetado para executar DAGs regularmente, mas também é possível acioná-los em resposta a eventos. Uma forma de fazer isso é usando o Cloud Functions para acionar DAGs do Cloud Composer quando ocorre um evento especificado.

No exemplo deste guia, um DAG é executado sempre que ocorre uma mudança em um bucket do Cloud Storage. Alterações em qualquer objeto em um bucket acionam uma função. Essa função faz uma solicitação à API REST do Airflow do ambiente do Cloud Composer. O Airflow processa essa solicitação e executa um DAG. O DAG mostra informações sobre a alteração.

Antes de começar

Verifique a configuração de rede do ambiente

Esta solução não funciona em configurações de IP privado e VPC Service Controls, porque não é possível configurar a conectividade do Cloud Functions com o servidor da Web do Airflow nessas configurações.

No Cloud Composer 2, é possível usar outra abordagem: acionar DAGs usando mensagens do Cloud Functions e do Pub/Sub

Ativar as APIs do projeto

Console

Ative as APIs Cloud Composer and Cloud Functions.

Ative as APIs

gcloud

Ative as APIs Cloud Composer and Cloud Functions:

gcloud services enable cloudfunctions.googleapis.com composer.googleapis.com

Ativar a API REST do Airflow

Para o Airflow 2, a API REST estável já está ativada por padrão. Se a API estável estiver desativada no ambiente, ative a API REST estável.

Permitir chamadas de API para a API REST do Airflow usando o controle de acesso do servidor da Web

O Cloud Functions pode entrar em contato com a API REST do Airflow usando endereços IPv4 ou IPv6.

Se você não tiver certeza de qual será o intervalo de IP de chamada, use uma opção de configuração padrão no controle de acesso do servidor da Web, que é All IP addresses have access (default), para não bloquear acidentalmente suas Cloud Functions.

Criar um bucket do Cloud Storage

Este exemplo aciona um DAG em resposta a alterações em um bucket do Cloud Storage. Crie um novo bucket para usar neste exemplo.

Ver o URL do servidor da Web do Airflow

Este exemplo faz solicitações da API REST para o endpoint do servidor da Web do Airflow. Use o URL do servidor da Web do Airflow no código da função do Cloud.

Console

  1. No console do Google Cloud, acesse a página Ambientes.

    Acessar "Ambientes"

  2. Clique no nome do seu ambiente.

  3. Na página Detalhes do ambiente, acesse a guia Configuração do ambiente.

  4. O URL do servidor da Web do Airflow está listado no item da IU da Web do Airflow.

gcloud

Execute este comando:

gcloud composer environments describe ENVIRONMENT_NAME \
    --location LOCATION \
    --format='value(config.airflowUri)'

Substitua:

  • ENVIRONMENT_NAME pelo nome do ambiente
  • LOCATION pela região em que o ambiente está localizado;

Fazer upload de um DAG para o ambiente

Faça o upload de um DAG para seu ambiente. O exemplo a seguir mostra a configuração de execução do DAG recebida. Você acionará esse DAG a partir de uma função que vai criar neste guia depois.

import datetime

import airflow
from airflow.operators.bash import BashOperator


with airflow.DAG(
    "composer_sample_trigger_response_dag",
    start_date=datetime.datetime(2021, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
) as dag:
    # Print the dag_run's configuration, which includes information about the
    # Cloud Storage object change.
    print_gcs_info = BashOperator(
        task_id="print_gcs_info", bash_command="echo {{ dag_run.conf }}"
    )

Implantar uma Função do Cloud que aciona o DAG

É possível implantar uma função do Cloud usando a linguagem de sua preferência compatível com o Cloud Functions ou o Cloud Run. Neste tutorial, demonstramos uma função do Cloud implementada em Python e Java.

Especificar os parâmetros de configuração da função do Cloud

  • Gatilho. Neste exemplo, selecione um gatilho que funcione quando um novo objeto for criado em um bucket ou um objeto existente for substituído.

    • Tipo de gatilho. Cloud Storage.

    • Tipo de evento. Finalizar/Criar.

    • Bucket. Selecione um bucket que precisa acionar essa função.

    • Tentar novamente em caso de falha. Recomendamos desativar essa opção para os fins deste exemplo. Se você usar sua própria função em um ambiente de produção, ative essa opção para processar erros temporários.

  • Conta de serviço do ambiente de execução, na seção Configurações de ambiente de execução, build, conexões e segurança. Use uma das seguintes opções, dependendo das suas preferências:

    • Selecione Conta de serviço padrão do Compute Engine. Com as permissões padrão do IAM, essa conta pode executar funções que acessam os ambientes do Cloud Composer.

    • Crie uma conta de serviço personalizada que tenha o papel de Usuário do Composer e especifique-a como uma conta de serviço do ambiente de execução para essa função. Essa opção segue o princípio do privilégio mínimo.

  • Ambiente de execução e ponto de entrada, na etapa Código:

    • (Python) Ao adicionar o código para este exemplo, selecione o ambiente de execução Python 3.7 ou posterior e especifique trigger_dag_gcf como o ponto de entrada.

    • (Java) Ao adicionar o código para este exemplo, selecione o ambiente de execução do Java 17 e especifique com.example.Example como ponto de entrada.

Adicionar requisitos

Python

Especifique as dependências no arquivo requirements.txt:

google-auth==2.19.1
requests==2.31.0

Java

Adicione as seguintes dependências à seção dependencies no pom.xml gerado pela IU do Google Cloud Functions.

    <dependency>
      <groupId>com.google.apis</groupId>
      <artifactId>google-api-services-docs</artifactId>
      <version>v1-rev20210707-1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.api-client</groupId>
      <artifactId>google-api-client</artifactId>
      <version>1.32.1</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-credentials</artifactId>
      <version>1.14.0</version>
    </dependency>
    <dependency>
      <groupId>com.google.auth</groupId>
      <artifactId>google-auth-library-oauth2-http</artifactId>
      <version>1.14.0</version>
    </dependency>

Python

Adicione o código para acionar DAGs usando a API REST do Airflow. Crie um arquivo chamado composer2_airflow_rest_api.py e coloque nele o código para fazer chamadas da API REST do Airflow.

Não altere nenhuma variável. A função do Cloud importa esse arquivo do arquivo main.py.

from __future__ import annotations

from typing import Any

import google.auth
from google.auth.transport.requests import AuthorizedSession
import requests


# Following GCP best practices, these credentials should be
# constructed at start-up time and used throughout
# https://cloud.google.com/apis/docs/client-libraries-best-practices
AUTH_SCOPE = "https://www.googleapis.com/auth/cloud-platform"
CREDENTIALS, _ = google.auth.default(scopes=[AUTH_SCOPE])


def make_composer2_web_server_request(
    url: str, method: str = "GET", **kwargs: Any
) -> google.auth.transport.Response:
    """
    Make a request to Cloud Composer 2 environment's web server.
    Args:
      url: The URL to fetch.
      method: The request method to use ('GET', 'OPTIONS', 'HEAD', 'POST', 'PUT',
        'PATCH', 'DELETE')
      **kwargs: Any of the parameters defined for the request function:
                https://github.com/requests/requests/blob/master/requests/api.py
                  If no timeout is provided, it is set to 90 by default.
    """

    authed_session = AuthorizedSession(CREDENTIALS)

    # Set the default timeout, if missing
    if "timeout" not in kwargs:
        kwargs["timeout"] = 90

    return authed_session.request(method, url, **kwargs)


def trigger_dag(web_server_url: str, dag_id: str, data: dict) -> str:
    """
    Make a request to trigger a dag using the stable Airflow 2 REST API.
    https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html

    Args:
      web_server_url: The URL of the Airflow 2 web server.
      dag_id: The DAG ID.
      data: Additional configuration parameters for the DAG run (json).
    """

    endpoint = f"api/v1/dags/{dag_id}/dagRuns"
    request_url = f"{web_server_url}/{endpoint}"
    json_data = {"conf": data}

    response = make_composer2_web_server_request(
        request_url, method="POST", json=json_data
    )

    if response.status_code == 403:
        raise requests.HTTPError(
            "You do not have a permission to perform this operation. "
            "Check Airflow RBAC roles for your account."
            f"{response.headers} / {response.text}"
        )
    elif response.status_code != 200:
        response.raise_for_status()
    else:
        return response.text

Coloque o seguinte código no arquivo main.py. Substitua o valor da variável web_server_url pelo endereço do servidor da Web do Airflow que você recebeu anteriormente.

# Copyright 2021 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Trigger a DAG in a Cloud Composer 2 environment in response to an event,
using Cloud Functions.
"""

from typing import Any

import composer2_airflow_rest_api

def trigger_dag_gcf(data, context=None):
    """
    Trigger a DAG and pass event data.

    Args:
      data: A dictionary containing the data for the event. Its format depends
      on the event.
      context: The context object for the event.

    For more information about the arguments, see:
    https://cloud.google.com/functions/docs/writing/background#function_parameters
    """

    # TODO(developer): replace with your values
    # Replace web_server_url with the Airflow web server address. To obtain this
    # URL, run the following command for your environment:
    # gcloud composer environments describe example-environment \
    #  --location=your-composer-region \
    #  --format="value(config.airflowUri)"
    web_server_url = (
        "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com"
    )
    # Replace with the ID of the DAG that you want to run.
    dag_id = 'composer_sample_trigger_response_dag'

    composer2_airflow_rest_api.trigger_dag(web_server_url, dag_id, data)

Java

Coloque o seguinte código no arquivo Example.java. Substitua o valor da variável webServerUrl pelo endereço do servidor da Web do Airflow que você recebeu anteriormente.


// Copyright 2022 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package com.example;

import com.example.Example.GcsEvent;
import com.google.api.client.http.GenericUrl;
import com.google.api.client.http.HttpContent;
import com.google.api.client.http.HttpRequest;
import com.google.api.client.http.HttpRequestFactory;
import com.google.api.client.http.HttpResponse;
import com.google.api.client.http.HttpResponseException;
import com.google.api.client.http.javanet.NetHttpTransport;
import com.google.api.client.http.json.JsonHttpContent;
import com.google.api.client.json.gson.GsonFactory;
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.functions.BackgroundFunction;
import com.google.cloud.functions.Context;
import java.util.HashMap;
import java.util.Map;
import java.util.logging.Logger;

/**
 * Cloud Function that triggers an Airflow DAG in response to an event (in
 * this case a Cloud Storage event).
 */
public class Example implements BackgroundFunction<GcsEvent> {
  private static final Logger logger = Logger.getLogger(Example.class.getName());

  // TODO(developer): replace with your values
  // Replace webServerUrl with the Airflow web server address. To obtain this
  // URL, run the following command for your environment:
  // gcloud composer environments describe example-environment \
  //  --location=your-composer-region \
  //  --format="value(config.airflowUri)"
  @Override
  public void accept(GcsEvent event, Context context) throws Exception {
    String webServerUrl = "https://example-airflow-ui-url-dot-us-central1.composer.googleusercontent.com";
    String dagName = "composer_sample_trigger_response_dag";
    String url = String.format("%s/api/v1/dags/%s/dagRuns", webServerUrl, dagName);

    logger.info(String.format("Triggering DAG %s as a result of an event on the object %s.",
      dagName, event.name));
    logger.info(String.format("Triggering DAG via the following URL: %s", url));

    GoogleCredentials googleCredentials = GoogleCredentials.getApplicationDefault()
        .createScoped("https://www.googleapis.com/auth/cloud-platform");
    HttpCredentialsAdapter credentialsAdapter = new HttpCredentialsAdapter(googleCredentials);
    HttpRequestFactory requestFactory =
      new NetHttpTransport().createRequestFactory(credentialsAdapter);

    Map<String, Map<String, String>> json = new HashMap<String, Map<String, String>>();
    Map<String, String> conf = new HashMap<String, String>();
    conf.put("bucket", event.bucket);
    conf.put("name", event.name);
    conf.put("generation", event.generation);
    conf.put("operation", context.eventType());
    json.put("conf", conf);
    HttpContent content = new JsonHttpContent(new GsonFactory(), json);
    HttpRequest request = requestFactory.buildPostRequest(new GenericUrl(url), content);
    request.getHeaders().setContentType("application/json");
    HttpResponse response;
    try {
      response = request.execute();
      int statusCode = response.getStatusCode();
      logger.info("Response code: " + statusCode);
      logger.info(response.parseAsString());
    } catch (HttpResponseException e) {
      // https://cloud.google.com/java/docs/reference/google-http-client/latest/com.google.api.client.http.HttpResponseException
      logger.info("Received HTTP exception");
      logger.info(e.getLocalizedMessage());
      logger.info("- 400 error: wrong arguments passed to Airflow API");
      logger.info("- 401 error: check if service account has Composer User role");
      logger.info("- 403 error: check Airflow RBAC roles assigned to service account");
      logger.info("- 404 error: check Web Server URL");
    } catch (Exception e) {
      logger.info("Received exception");
      logger.info(e.getLocalizedMessage());
    }
  }

  /** Details of the storage event. */
  public static class GcsEvent {
    /** Bucket name. */
    String bucket;
    /** Object name. */
    String name;
    /** Object version. */
    String generation;
  }
}

Testar a função

Para verificar se a função e o DAG funcionam conforme o esperado:

  1. Aguarde até que a função seja implantada.
  2. Faça upload de um arquivo para o bucket do Cloud Storage. Como alternativa, é possível acionar a função manualmente selecionando a ação Testar a função no console do Google Cloud.
  3. Verifique a página do DAG na interface da Web do Airflow. O DAG precisa ter uma execução ativa ou já concluída.
  4. Na IU do Airflow, verifique os registros de tarefas desta execução. Você verá que a tarefa print_gcs_info gera os dados recebidos da função para os registros:

Python

[2021-04-04 18:25:44,778] {bash_operator.py:154} INFO - Output:
[2021-04-04 18:25:44,781] {bash_operator.py:158} INFO - Triggered from GCF:
    {bucket: example-storage-for-gcf-triggers, contentType: text/plain,
    crc32c: dldNmg==, etag: COW+26Sb5e8CEAE=, generation: 1617560727904101,
    ... }
[2021-04-04 18:25:44,781] {bash_operator.py:162} INFO - Command exited with
    return code 0h

Java

[2023-02-08, 08:00:09 UTC] {subprocess.py:86} INFO - Output:
[2023-02-08, 08:00:09 UTC] {subprocess.py:93} INFO - {bucket: example-storage-for-gcf-triggers, generation: 1675843189006715, name: file.txt, operation: google.storage.object.create}
[2023-02-08, 08:00:09 UTC] {subprocess.py:97} INFO - Command exited with return code 0

A seguir