Activar DAGs con Cloud Functions y mensajes de Pub/Sub

Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1

En esta página se explica cómo crear una arquitectura push basada en eventos activando los DAGs de Cloud Composer en respuesta a los cambios en los temas de Pub/Sub. Los ejemplos de este tutorial muestran cómo gestionar el ciclo completo de Pub/Sub, incluida la gestión de suscripciones, como parte del proceso de DAG. Es adecuada para algunos de los casos de uso habituales en los que necesitas activar DAGs, pero no quieres configurar permisos de acceso adicionales.

Por ejemplo, los mensajes enviados a través de Pub/Sub se pueden usar como solución si no quieres proporcionar acceso directo a un entorno de Cloud Composer por motivos de seguridad. Puedes configurar una función de Cloud Run que cree mensajes de Pub/Sub y los publique en un tema de Pub/Sub. A continuación, puedes crear un DAG que extraiga mensajes de Pub/Sub y los gestione.

En este ejemplo concreto, crearás una función de Cloud Run y desplegarás dos DAGs. El primer DAG extrae mensajes de Pub/Sub y activa el segundo DAG según el contenido del mensaje de Pub/Sub.

En este tutorial se da por hecho que tienes conocimientos de Python y de la Google Cloud consola.

Objetivos

Costes

En este tutorial se usan los siguientes componentes facturables de Google Cloud:

Cuando hayas terminado este tutorial, puedes eliminar los recursos que hayas creado para evitar que se te siga facturando. Consulta Limpiar para obtener más información.

Antes de empezar

Para este tutorial, necesitas un Google Cloud proyecto. Configura el proyecto de la siguiente manera:

  1. En la Google Cloud consola, selecciona o crea un proyecto:

    Ir al selector de proyectos

  2. Comprueba que la facturación esté habilitada en tu proyecto. Consulta cómo comprobar si la facturación está habilitada en un proyecto.

  3. Asegúrate de que el usuario de tu proyecto Google Cloud tenga los siguientes roles para crear los recursos necesarios:

    • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)
    • Editor de Pub/Sub (roles/pubsub.editor)
    • Administrador de objetos de entorno y almacenamiento (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador de funciones de Cloud Run (roles/cloudfunctions.admin)
    • Visualizador de registros (roles/logging.viewer)
  4. Asegúrate de que la cuenta de servicio que ejecuta tu función de Cloud Run tenga los permisos suficientes en tu proyecto para acceder a Pub/Sub. De forma predeterminada, las funciones de Cloud Run usan la cuenta de servicio predeterminada de App Engine. Esta cuenta de servicio tiene el rol Editor, que tiene permisos suficientes para este tutorial.

Habilitar APIs en tu proyecto

Consola

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.

Enable the APIs

gcloud

Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Habilita la API Cloud Composer en tu proyecto añadiendo las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
  check_if_service_has_usage_on_destroy = true
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Sustituye <PROJECT_ID> por el ID de proyecto de tu proyecto. Por ejemplo, example-project.

Crear un entorno de Cloud Composer

Crea un entorno de Cloud Composer 2.

Como parte de este procedimiento, asignas el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto Google Cloud .

Crear un tema de Pub/Sub

En este ejemplo, se activa un DAG en respuesta a un mensaje enviado a un tema de Pub/Sub. Crea un tema de Pub/Sub para usarlo en este ejemplo:

Consola

  1. En la Google Cloud consola, ve a la página Temas de Pub/Sub.

    Ir a Temas de Pub/Sub

  2. Haz clic en Crear tema.

  3. En el campo ID de tema, introduce dag-topic-trigger como ID del tema.

  4. Deja el resto de las opciones con sus valores predeterminados.

  5. Haz clic en Crear tema.

gcloud

Para crear un tema, ejecuta el comando gcloud pubsub topics create en la CLI de Google Cloud:

gcloud pubsub topics create dag-topic-trigger

Terraform

Añade las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Sustituye <PROJECT_ID> por el ID de proyecto de tu proyecto. Por ejemplo, example-project.

Subir tus DAGs

Sube los DAGs a tu entorno:

  1. Guarda el siguiente archivo DAG en tu ordenador local.
  2. Sustituye <PROJECT_ID> por el ID de proyecto de tu proyecto. Por ejemplo, example-project.
  3. Sube el archivo DAG editado a tu entorno.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"


def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids


# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)


def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")


# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

El código de ejemplo contiene dos DAGs: trigger_dag y target_dag.

El DAG trigger_dag se suscribe a un tema de Pub/Sub, extrae mensajes de Pub/Sub y activa otro DAG especificado en el ID de DAG de los datos del mensaje de Pub/Sub. En este ejemplo, trigger_dag activa el DAG target_dag, que genera mensajes en los registros de tareas.

El DAG trigger_dag contiene las siguientes tareas:

  • subscribe_task: suscríbete a un tema de Pub/Sub.
  • pull_messages_operator: lee los datos de un mensaje de Pub/Sub con PubSubPullOperator.
  • trigger_target_dag: activa otro DAG (en este ejemplo, target_dag) según los datos de los mensajes extraídos del tema de Pub/Sub.

El DAG target_dag contiene solo una tarea: output_to_logs. Esta tarea imprime mensajes en el registro de tareas con un segundo de retraso.

Desplegar una función de Cloud Run que publique mensajes en un tema de Pub/Sub

En esta sección, desplegarás una función de Cloud Run que publica mensajes en un tema de Pub/Sub.

Crear una función de Cloud Run y especificar su configuración

Consola

  1. En la Google Cloud consola, ve a la página Funciones de Cloud Run.

    Ir a Cloud Run Functions

  2. Haz clic en Crear función.

  3. En el campo Environment (Entorno), selecciona 1st gen (1.ª gen.).

  4. En el campo Nombre de la función, introduce el nombre de la función: pubsub-publisher.

  5. En el campo Tipo de activador, selecciona HTTP.

  6. En la sección Autenticación, selecciona Permitir las invocaciones sin autenticar. Esta opción permite a los usuarios no autenticados invocar una función HTTP.

  7. Haz clic en Guardar.

  8. Haz clic en Siguiente para ir al paso Código.

Terraform

Te recomendamos que uses la Google Cloud consola para este paso, ya que no hay una forma sencilla de gestionar el código fuente de la función desde Terraform.

En este ejemplo se muestra cómo puedes subir una función de Cloud Run desde un archivo zip local creando un segmento de Cloud Storage, almacenando el archivo en este segmento y, a continuación, usando el archivo del segmento como origen de la función de Cloud Run. Si usas este método, Terraform no actualizará automáticamente el código fuente de tu función, aunque crees un archivo. Para volver a subir el código de la función, puedes cambiar el nombre del archivo.

  1. Descarga los archivos pubsub_publisher.py y requirements.txt.
  2. En el archivo pubsub_publisher.py, sustituye <PROJECT_ID> por el ID de proyecto de tu proyecto. Por ejemplo, example-project.
  3. Crea un archivo ZIP llamado pubsub_function.zip con el archivo pbusub_publisner.py y el archivo requirements.txt.
  4. Guarda el archivo ZIP en un directorio en el que se encuentre tu secuencia de comandos de Terraform.
  5. Añade las siguientes definiciones de recursos a tu secuencia de comandos de Terraform y sustituye <PROJECT_ID> por el ID del proyecto.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Especificar parámetros de código de función de Cloud Run

Consola

  1. En el paso Código, en el campo Tiempo de ejecución, selecciona el tiempo de ejecución del lenguaje que usa tu función. En este ejemplo, selecciona Python 3.10.

  2. En el campo Punto de entrada, introduce pubsub_publisher. Este es el código que se ejecuta cuando se ejecuta tu función de Cloud Run. El valor de esta marca debe ser el nombre de una función o el nombre completo de una clase que exista en el código fuente.

Terraform

Saltar este paso Los parámetros de la función de Cloud Run ya se han definido en el recurso google_cloudfunctions_function.

Subir el código de la función de Cloud Run

Consola

En el campo Código fuente, selecciona la opción adecuada para proporcionar el código fuente de la función. En este tutorial, añade el código de tu función mediante el editor insertado de Cloud Run Functions. También puedes subir un archivo ZIP o usar Cloud Source Repositories.

  1. Coloca el siguiente ejemplo de código en el archivo main.py.
  2. Sustituye <PROJECT_ID> por el ID de proyecto de tu proyecto. Por ejemplo, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"


def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Saltar este paso Los parámetros de la función de Cloud Run ya se han definido en el recurso google_cloudfunctions_function.

Especificar las dependencias de la función de Cloud Run

Consola

Especifica las dependencias de la función en el archivo de metadatos requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.38.0
google-cloud-pubsub==2.28.0

Cuando despliegas tu función, Cloud Run Functions descarga e instala las dependencias declaradas en el archivo requirements.txt, una por paquete. Este archivo debe estar en el mismo directorio que el archivo main.py que contiene el código de tu función. Para obtener más información, consulta la sección Archivos de requisitos de la documentación de pip.

Terraform

Saltar este paso Las dependencias de las funciones de Cloud Run se definen en el archivo requirements.txt del archivo pubsub_function.zip.

Desplegar una función de Cloud Run

Consola

Haz clic en Desplegar. Cuando el despliegue finalice correctamente, la función aparecerá con una marca de verificación verde en la página Funciones de Cloud Run de la consolaGoogle Cloud .

Asegúrate de que la cuenta de servicio que ejecuta tu función de Cloud Run tiene suficientes permisos en tu proyecto para acceder a Pub/Sub.

Terraform

  1. Inicializa Terraform:

    terraform init
    
  2. Revisa la configuración y comprueba que los recursos que Terraform va a crear o actualizar se ajustan a tus expectativas:

    terraform plan
    
  3. Para comprobar si la configuración es válida, ejecuta el siguiente comando:

    terraform validate
    
  4. Aplica la configuración de Terraform ejecutando el siguiente comando e introduciendo "yes" en la petición:

    terraform apply
    

Espera hasta que Terraform muestre el mensaje "Apply complete!".

En la Google Cloud consola, ve a tus recursos en la interfaz de usuario para comprobar que Terraform los ha creado o actualizado.

Probar la función de Cloud Run

Para comprobar que tu función publica un mensaje en un tema de Pub/Sub y que los DAG de ejemplo funcionan correctamente, haz lo siguiente:

  1. Comprueba que los DAGs estén activos:

    1. En la Google Cloud consola, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAGs.

    4. Comprueba los valores de la columna Estado de los DAGs llamados trigger_dag y target_dag. Ambos DAGs deben estar en el estado Active.

  2. Envía un mensaje de Pub/Sub de prueba. Puedes hacerlo en Cloud Shell:

    1. En la Google Cloud consola, ve a la página Funciones.

      Ir a Cloud Run Functions

    2. Haz clic en el nombre de la función pubsub-publisher.

    3. Vaya a la pestaña Pruebas.

    4. En la sección Configurar evento de activación, introduce el siguiente par clave-valor de JSON: {"message": "target_dag"}. No modifiques el par clave-valor, ya que este mensaje activará el DAG de prueba más adelante.

    5. En la sección Test Command (Comando de prueba), haz clic en Test in Cloud Shell (Probar en Cloud Shell).

    6. En Terminal de Cloud Shell, espera hasta que aparezca un comando automáticamente. Ejecuta este comando pulsando Enter.

    7. Si aparece el mensaje Autorizar Cloud Shell, haz clic en Autorizar.

    8. Comprueba que el contenido del mensaje se corresponda con el mensaje de Pub/Sub. En este ejemplo, el mensaje de salida debe empezar por Message b'target_dag' with message_length 10 published to como respuesta de tu función.

  3. Comprueba que se haya activado target_dag:

    1. Espera al menos un minuto para que se complete una nueva ejecución de DAG de trigger_dag.

    2. En la Google Cloud consola, ve a la página Entornos.

      Ir a Entornos

    3. En la lista de entornos, haz clic en el nombre del entorno. Se abrirá la página Detalles del entorno.

    4. Ve a la pestaña DAGs.

    5. Haz clic en trigger_dag para ir a la página Detalles del DAG. En la pestaña Ejecuciones, se muestra una lista de ejecuciones de DAG del DAG trigger_dag.

      Este DAG se ejecuta cada minuto y procesa todos los mensajes de Pub/Sub enviados desde la función. Si no se ha enviado ningún mensaje, la tarea trigger_target se marca como Skipped en los registros de ejecución del DAG. Si se han activado los DAGs, la tarea trigger_target se marca como Success.

    6. Revisa varias ejecuciones de DAG recientes para localizar una en la que las tres tareas (subscribe_task, pull_messages_operator y trigger_target) tengan el estado Success.

    7. Vuelve a la pestaña DAGs y comprueba que la columna Successful runs (Ejecuciones correctas) del DAG target_dag muestra una ejecución correcta.

Resumen

En este tutorial, has aprendido a usar funciones de Cloud Run para publicar mensajes en un tema de Pub/Sub y a desplegar un DAG que se suscribe a un tema de Pub/Sub, extrae mensajes de Pub/Sub y activa otro DAG especificado en el ID de DAG de los datos del mensaje.

También hay otras formas de crear y gestionar suscripciones de Pub/Sub y activar DAGs que no se incluyen en este tutorial. Por ejemplo, puedes usar funciones de Cloud Run para activar DAGs de Airflow cuando se produzca un evento específico. Consulta nuestros tutoriales para probar las otras Google Cloud funciones.

Limpieza

Para evitar que se apliquen cargos en tu cuenta de Google Cloud por los recursos utilizados en este tutorial, elimina el proyecto que contiene los recursos o conserva el proyecto y elimina los recursos.

Eliminar el proyecto

    Delete a Google Cloud project:

    gcloud projects delete PROJECT_ID

Eliminar recursos concretos

Si tienes previsto consultar varios tutoriales y guías de inicio rápido, reutilizar los proyectos puede ayudarte a no superar los límites de cuota de proyectos.

Consola

  1. Elimina el entorno de Cloud Composer. También eliminarás el contenedor del entorno durante este procedimiento.
  2. Elimina el tema de Pub/Sub, dag-topic-trigger.
  3. Elimina la función de Cloud Run.

    1. En la Google Cloud consola, ve a Cloud Run Functions.

      Ir a Cloud Run Functions

    2. Marca la casilla de la función que quieras eliminar. pubsub-publisher

    3. Haz clic en Eliminar y sigue las instrucciones.

Terraform

  1. Asegúrate de que tu secuencia de comandos de Terraform no contenga entradas de recursos que tu proyecto siga necesitando. Por ejemplo, puede que quieras mantener algunas APIs habilitadas y permisos de gestión de identidades y accesos asignados (si has añadido esas definiciones a tu script de Terraform).
  2. Ejecuta terraform destroy.
  3. Elimina manualmente el segmento del entorno. Cloud Composer no lo elimina automáticamente. Puedes hacerlo desde la Google Cloud consola o la CLI de Google Cloud.

Siguientes pasos