Activa los DAG con Cloud Functions y mensajes de Pub/Sub

Cloud Composer 1 | Cloud Composer 2

En esta página, se explica cómo crear una arquitectura push basada en eventos mediante la activación de DAG de Cloud Composer en respuesta a los cambios de tema de Pub/Sub. En los ejemplos de este instructivo, se demuestra el control del ciclo completo de administración de Pub/Sub, incluida la administración de suscripciones, como parte del proceso de DAG. Es adecuado para algunos de los casos de uso comunes en los que necesitas activar los DAG, pero no deseas configurar permisos de acceso adicionales.

Por ejemplo, los mensajes enviados a través de Pub/Sub se pueden usar como una solución si no deseas proporcionar acceso directo a un entorno de Cloud Composer por motivos de seguridad. Puedes configurar una Cloud Function que cree mensajes de Pub/Sub y los publique en un tema de Pub/Sub. Luego, puedes crear un DAG que extraiga mensajes de Pub/Sub y, luego, los maneje.

En este ejemplo específico, se crea una Cloud Function y se implementan dos DAG. El primer DAG extrae mensajes de Pub/Sub y activa el segundo DAG según el contenido del mensaje de Pub/Sub.

En este instructivo, se supone que estás familiarizado con Python y la consola de Google Cloud.

Objetivos

Costos

En este instructivo, se usan los siguientes componentes facturables de Google Cloud:

Una vez que finalices este instructivo, puedes borrar los recursos que creaste para evitar que se te sigan facturando. Consulta la sección Limpieza para obtener más detalles.

Antes de comenzar

Para este instructivo, necesitas un proyecto de Google Cloud. Configura el proyecto de la siguiente manera:

  1. En la consola de Google Cloud, selecciona o crea un proyecto:

    Ir al Selector de proyectos

  2. Asegúrate de tener habilitada la facturación para tu proyecto. Descubre cómo verificar si la facturación está habilitada en un proyecto.

  3. Asegúrate de que el usuario de tu proyecto de Google Cloud tenga los siguientes roles para crear los recursos necesarios:

    • Usuario de cuenta de servicio (roles/iam.serviceAccountUser)
    • Editor de Pub/Sub (roles/pubsub.editor)
    • Administrador de objetos de almacenamiento y entorno (roles/composer.environmentAndStorageObjectAdmin)
    • Administrador de Cloud Functions (roles/cloudfunctions.admin)
    • Visor de registros (roles/logging.viewer)
  4. Asegúrate de que la cuenta de servicio que ejecuta tu Cloud Function tenga los permisos suficientes en tu proyecto para acceder a Pub/Sub. Según la configuración predeterminada, Cloud Functions usa la cuenta de servicio predeterminada de App Engine. Esta cuenta de servicio tiene la función de Editor, que tiene permisos suficientes para este instructivo.

Habilita las API para tu proyecto.

Console

Habilita las API de Cloud Composer, Cloud Functions, and Pub/Sub.

Habilita las API

gcloud

Habilita las APIs de Cloud Composer, Cloud Functions, and Pub/Sub:

gcloud services enable composer.googleapis.com cloudfunctions.googleapis.com pubsub.googleapis.com

Terraform

Para habilitar la API de Cloud Composer en tu proyecto, agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:

resource "google_project_service" "composer_api" {
  project = "<PROJECT_ID>"
  service = "composer.googleapis.com"
  // Disabling Cloud Composer API might irreversibly break all other
  // environments in your project.
  // This parameter prevents automatic disabling
  // of the API when the resource is destroyed.
  // We recommend to disable the API only after all environments are deleted.
  disable_on_destroy = false
}

resource "google_project_service" "pubsub_api" {
  project = "<PROJECT_ID>"
  service = "pubsub.googleapis.com"
  disable_on_destroy = false
}

resource "google_project_service" "functions_api" {
  project = "<PROJECT_ID>"
  service = "cloudfunctions.googleapis.com"
  disable_on_destroy = false
}

Reemplaza <PROJECT_ID> por el ID de tu proyecto. Por ejemplo, example-project.

Crea tu entorno de Cloud Composer

Crea un entorno de Cloud Composer 2.

Como parte de este procedimiento, otorgas el rol Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto de Google Cloud.

Crear un tema de Pub/Sub

En este ejemplo, se activa un DAG en respuesta a un mensaje enviado a un tema de Pub/Sub. Crea un tema de Pub/Sub para usarlo en este ejemplo:

Console

  1. En la consola de Google Cloud, ve a la página Temas de Pub/Sub.

    Ir a Temas de Pub/Sub

  2. Haz clic en Crear tema.

  3. En el campo ID del tema, ingresa dag-topic-trigger como ID del tema.

  4. Deja las demás opciones en su configuración predeterminada.

  5. Haz clic en Crear tema.

gcloud

Para crear un tema, ejecuta el comando gcloud pubsub topics create en Google Cloud CLI:

gcloud pubsub topics create dag-topic-trigger

Terraform

Agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:

resource "google_pubsub_topic" "trigger" {
  project                    = "<PROJECT_ID>"
  name                       = "dag-topic-trigger"
  message_retention_duration = "86600s"
}

Reemplaza <PROJECT_ID> por el ID de tu proyecto. Por ejemplo, example-project.

Sube tus DAG

Sube los DAG a tu entorno:

  1. Guarda el siguiente archivo DAG en tu computadora local.
  2. Reemplaza <PROJECT_ID> por el ID de tu proyecto. Por ejemplo, example-project.
  3. Sube el archivo DAG editado a tu entorno.
from __future__ import annotations

from datetime import datetime
import time

from airflow import DAG
from airflow import XComArg
from airflow.operators.python import PythonOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.pubsub import (
    PubSubCreateSubscriptionOperator,
    PubSubPullOperator,
)

PROJECT_ID = "<PROJECT_ID>"
TOPIC_ID = "dag-topic-trigger"
SUBSCRIPTION = "trigger_dag_subscription"

def handle_messages(pulled_messages, context):
    dag_ids = list()
    for idx, m in enumerate(pulled_messages):
        data = m.message.data.decode("utf-8")
        print(f"message {idx} data is {data}")
        dag_ids.append(data)
    return dag_ids

# This DAG will run minutely and handle pub/sub messages by triggering target DAG
with DAG(
    "trigger_dag",
    start_date=datetime(2021, 1, 1),
    schedule_interval="* * * * *",
    max_active_runs=1,
    catchup=False,
) as trigger_dag:
    # If subscription exists, we will use it. If not - create new one
    subscribe_task = PubSubCreateSubscriptionOperator(
        task_id="subscribe_task",
        project_id=PROJECT_ID,
        topic=TOPIC_ID,
        subscription=SUBSCRIPTION,
    )

    subscription = subscribe_task.output

    # Proceed maximum 50 messages in callback function handle_messages
    # Here we acknowledge messages automatically. You can use PubSubHook.acknowledge to acknowledge in downstream tasks
    # https://airflow.apache.org/docs/apache-airflow-providers-google/stable/_api/airflow/providers/google/cloud/hooks/pubsub/index.html#airflow.providers.google.cloud.hooks.pubsub.PubSubHook.acknowledge
    pull_messages_operator = PubSubPullOperator(
        task_id="pull_messages_operator",
        project_id=PROJECT_ID,
        ack_messages=True,
        messages_callback=handle_messages,
        subscription=subscription,
        max_messages=50,
    )

    # Here we use Dynamic Task Mapping to trigger DAGs according to messages content
    # https://airflow.apache.org/docs/apache-airflow/2.3.0/concepts/dynamic-task-mapping.html
    trigger_target_dag = TriggerDagRunOperator.partial(task_id="trigger_target").expand(
        trigger_dag_id=XComArg(pull_messages_operator)
    )

    (subscribe_task >> pull_messages_operator >> trigger_target_dag)

def _some_heavy_task():
    print("Do some operation...")
    time.sleep(1)
    print("Done!")

# Simple target DAG
with DAG(
    "target_dag",
    start_date=datetime(2022, 1, 1),
    # Not scheduled, trigger only
    schedule_interval=None,
    catchup=False,
) as target_dag:
    some_heavy_task = PythonOperator(
        task_id="some_heavy_task", python_callable=_some_heavy_task
    )

    (some_heavy_task)

El código de muestra contiene dos DAG: trigger_dag y target_dag.

El DAG trigger_dag se suscribe a un tema de Pub/Sub, extrae mensajes de Pub/Sub y activa otro DAG especificado en el ID de DAG de los datos del mensaje de Pub/Sub. En este ejemplo, trigger_dag activa el DAG target_dag, que envía mensajes a los registros de tareas.

El DAG trigger_dag contiene las siguientes tareas:

  • subscribe_task: Suscríbete a un tema de Pub/Sub.
  • pull_messages_operator: lee datos de mensajes de Pub/Sub con PubSubPullOperator.
  • trigger_target_dag: Activa otro DAG (en este ejemplo, target_dag) según los datos en los mensajes extraídos del tema de Pub/Sub.

El DAG target_dag contiene solo una tarea: output_to_logs. Esta tarea imprime mensajes en el registro de tareas con un segundo de retraso.

Implementar una Cloud Function que publique mensajes en un tema de Pub/Sub

En esta sección, implementarás una Cloud Function que publica mensajes en un tema de Pub/Sub.

Crea una Cloud Function y especifica su configuración

Console

  1. En la consola de Google Cloud, ve a la página Cloud Functions.

    Ir a Cloud Functions

  2. Haz clic en Crear función.

  3. En el campo Entorno, selecciona 1st gen.

  4. En el campo Nombre de la función, ingresa el nombre de tu función: pubsub-publisher.

  5. En el campo Activador, selecciona HTTP.

  6. En la sección Autenticación, selecciona Permitir invocaciones no autenticadas. Esta opción otorga a los usuarios no autenticados la capacidad de invocar una función de HTTP.

  7. Haz clic en Guardar.

  8. Haz clic en Siguiente para avanzar al paso Código.

Terraform

Considera usar la consola de Google Cloud para este paso, ya que no hay una manera sencilla de administrar el código fuente de la función desde Terraform.

En este ejemplo, se muestra cómo puedes subir una Cloud Function desde un archivo ZIP local. Para ello, debes crear un bucket de Cloud Storage, almacenar el archivo en este bucket y, luego, usar el archivo del bucket como fuente para la Cloud Function. Si usas este enfoque, Terraform no actualiza automáticamente el código fuente de la función, incluso si creas un archivo nuevo. Para volver a subir el código de la función, puedes cambiar el nombre del archivo.

  1. Descarga los archivos pubsub_publisher.py y requirements.txt.
  2. En el archivo pubsub_publisher.py, reemplaza <PROJECT_ID> por el ID del proyecto. Por ejemplo, example-project.
  3. Crea un archivo ZIP llamado pubsub_function.zip con los archivos pbusub_publisner.py y requirements.txt.
  4. Guarda el archivo ZIP en un directorio en el que esté almacenada tu secuencia de comandos de Terraform.
  5. Agrega las siguientes definiciones de recursos a la secuencia de comandos de Terraform y reemplaza <PROJECT_ID> por el ID de tu proyecto.
resource "google_storage_bucket" "cloud_function_bucket" {
  project        = <PROJECT_ID>
  name           = "<PROJECT_ID>-cloud-function-source-code"
  location       = "US"
  force_destroy  = true
  uniform_bucket_level_access = true
}

resource "google_storage_bucket_object" "cloud_function_source" {
  name   = "pubsub_function.zip"
  bucket = google_storage_bucket.cloud_function_bucket.name
  source = "./pubsub_function.zip"
}

resource "google_cloudfunctions_function" "pubsub_function" {
  project = <PROJECT_ID>
  name    = "pubsub-publisher"
  runtime = "python310"
  region  = "us-central1"

  available_memory_mb   = 128
  source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
  source_archive_object = "pubsub_function.zip"
  timeout               = 60
  entry_point           = "pubsub_publisher"
  trigger_http          = true
}

Especifica los parámetros de código de Cloud Function

Console

  1. En el paso Código, en el campo Entorno de ejecución, selecciona el lenguaje de ejecución que usa tu función. En este ejemplo, selecciona Python 3.10.

  2. En el campo Punto de entrada, ingresa pubsub_publisher. Este es el código que se ejecuta cuando se ejecuta tu Cloud Function. El valor de esta marca debe ser un nombre de función o un nombre de clase completamente calificado que exista en tu código fuente.

Terraform

Omite este paso. Los parámetros de Cloud Function ya están definidos en el recurso google_cloudfunctions_function.

Sube el código de tu Cloud Function

Console

En el campo Código fuente, selecciona la opción adecuada para proporcionar el código fuente de la función. En este instructivo, agrega el código de tu función con el editor intercalado de Cloud Functions. Como alternativa, puedes subir un archivo ZIP o usar Cloud Source Repositories.

  1. Ingresa el siguiente ejemplo de código en el archivo main.py.
  2. Reemplaza <PROJECT_ID> por el ID de tu proyecto. Por ejemplo, example-project.
from google.cloud import pubsub_v1

project = "<PROJECT_ID>"
topic = "dag-topic-trigger"

def pubsub_publisher(request):
    """Publish message from HTTP request to Pub/Sub topic.
    Args:
        request (flask.Request): HTTP request object.
    Returns:
        The response text with message published into Pub/Sub topic
        Response object using
        `make_response <http://flask.pocoo.org/docs/1.0/api/#flask.Flask.make_response>`.
    """
    request_json = request.get_json()
    print(request_json)
    if request.args and "message" in request.args:
        data_str = request.args.get("message")
    elif request_json and "message" in request_json:
        data_str = request_json["message"]
    else:
        return "Message content not found! Use 'message' key to specify"

    publisher = pubsub_v1.PublisherClient()
    # The `topic_path` method creates a fully qualified identifier
    # in the form `projects/{project_id}/topics/{topic_id}`
    topic_path = publisher.topic_path(project, topic)

    # The required data format is a bytestring
    data = data_str.encode("utf-8")
    # When you publish a message, the client returns a future.
    message_length = len(data_str)
    future = publisher.publish(topic_path, data, message_length=str(message_length))
    print(future.result())

    return f"Message {data} with message_length {message_length} published to {topic_path}."

Terraform

Omite este paso. Los parámetros de Cloud Function ya están definidos en el recurso google_cloudfunctions_function.

Especifica las dependencias de tu Cloud Function

Console

Especifica las dependencias de la función en el archivo de metadatos requirements.txt:

requests-toolbelt==1.0.0
google-auth==2.19.1
google-cloud-pubsub==2.17.0

Cuando implementas tu función, Cloud Functions descarga e instala dependencias declaradas en el archivo requirements.txt, una línea por paquete. Este archivo debe estar en el mismo directorio que el archivo main.py que contiene el código de tu función. Para obtener más información, consulta Archivos de requisitos en la documentación de pip.

Terraform

Omite este paso. Las dependencias de Cloud Function se definen en el archivo requirements.txt del archivo pubsub_function.zip.

Implementa tu Cloud Function

Console

Haz clic en Implementar. Cuando la implementación finaliza correctamente, la función aparece con una marca de verificación verde en la página de Cloud Functions en la consola de Google Cloud.

Asegúrate de que la cuenta de servicio que ejecuta tu Cloud Function tenga permisos suficientes en tu proyecto para acceder a Pub/Sub.

Terraform

  1. Inicializa Terraform mediante este comando:

    terraform init
    
  2. Revisa la configuración y verifica que los recursos que Terraform creará o actualizará coincidan con tus expectativas:

    terraform plan
    
  3. Para verificar si la configuración es válida, ejecuta el siguiente comando:

    terraform validate
    
  4. Para aplicar la configuración de Terraform, ejecuta el siguiente comando y, luego, ingresa “sí” en el mensaje:

    terraform apply
    

Espera hasta que Terraform muestre el mensaje “Apply complete!”.

En la consola de Google Cloud, navega a tus recursos en la IU para asegurarte de que Terraform los haya creado o actualizado.

Prueba tu función de Cloud Functions

Para verificar que la función publique un mensaje en un tema de Pub/Sub y que los DAG de ejemplo funcionen según lo previsto, sigue estos pasos:

  1. Verifica que los DAG estén activos:

    1. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    2. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    3. Ve a la pestaña DAG.

    4. Verifica los valores en la columna Estado de los DAG llamados trigger_dag y target_dag. Ambos DAG deben tener el estado Active.

  2. Enviar un mensaje de prueba de Pub/Sub Puedes hacerlo en Cloud Shell:

    1. En la consola de Google Cloud, ve a la página Funciones.

      Ir a Cloud Functions

    2. Haz clic en el nombre de la función, pubsub-publisher.

    3. Ve a la pestaña Prueba.

    4. En la sección Configurar evento de activación, ingresa el siguiente par clave-valor JSON: {"message": "target_dag"}. No modifiques el par clave-valor, ya que este mensaje activa el DAG de prueba más tarde.

    5. En la sección Comando de prueba, haz clic en Probar en Cloud Shell.

    6. En la Terminal de Cloud Shell, espera hasta que aparezca automáticamente un comando. Presiona Enter para ejecutar este comando.

    7. Si aparece el mensaje Autoriza Cloud Shell, haz clic en Autorizar.

    8. Verifica que el contenido del mensaje corresponda al mensaje de Pub/Sub. En este ejemplo, el mensaje de salida debe comenzar con Message b'target_dag' with message_length 10 published to como respuesta de tu función.

  3. Verifica que se haya activado target_dag:

    1. Espera al menos un minuto para que se complete una nueva ejecución de DAG de trigger_dag.

    2. En la consola de Google Cloud, ve a la página Entornos.

      Ir a Entornos

    3. En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.

    4. Ve a la pestaña DAG.

    5. Haz clic en trigger_dag para ir a la página Detalles de DAG. En la pestaña Ejecuciones, se muestra una lista de ejecuciones de DAG para el DAG trigger_dag.

      Este DAG se ejecuta cada minuto y procesa todos los mensajes de Pub/Sub que se envían desde la función. Si no se enviaron mensajes, la tarea trigger_target se marca como Skipped en los registros de ejecución del DAG. Si se activaron los DAG, la tarea trigger_target se marca como Success.

    6. Revisa varias ejecuciones de DAG recientes para ubicar una ejecución de DAG en la que las tres tareas (subscribe_task, pull_messages_operator y trigger_target) tienen los estados Success.

    7. Regresa a la pestaña DAG y verifica que la columna Ejecuciones correctas del DAG target_dag muestre una ejecución exitosa.

Resumen

En este instructivo, aprendiste a usar Cloud Functions para publicar mensajes en un tema de Pub/Sub y, luego, implementar un DAG que se suscriba a un tema de Pub/Sub, extraer mensajes de Pub/Sub y activar otro DAG especificado en el ID de DAG de los datos del mensaje.

También existen formas alternativas de crear y administrar suscripciones de Pub/Sub y de activar DAG que están fuera del alcance de este instructivo. Por ejemplo, puedes usar Cloud Functions para activar los DAG de Airflow cuando se produzca un evento especificado. Consulta nuestros instructivos para probar las otras funciones de Google Cloud por tu cuenta.

Limpia

Para evitar que se apliquen cargos a tu cuenta de Google Cloud por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.

Borra el proyecto

    Borra un proyecto de Google Cloud:

    gcloud projects delete PROJECT_ID

Borra los recursos individuales

Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.

Console

  1. Borra el entorno de Cloud Composer. También borrarás el bucket del entorno durante este procedimiento.
  2. Borra el tema de Pub/Sub, dag-topic-trigger.
  3. Borrar la Cloud Function

    1. En la consola de Google Cloud, ve a Cloud Functions.

      Ir a Cloud Functions

    2. Haz clic en la casilla de verificación de la función que deseas borrar, pubsub-publisher.

    3. Haz clic en Borrar y, luego, sigue las instrucciones.

Terraform

  1. Asegúrate de que la secuencia de comandos de Terraform no contenga entradas para recursos que tu proyecto aún requiera. Por ejemplo, es posible que desees mantener algunas APIs habilitadas y los permisos de IAM aún asignados (si agregaste esas definiciones a tu secuencia de comandos de Terraform).
  2. Ejecuta terraform destroy.
  3. Borra de forma manual el bucket del entorno. Cloud Composer no lo borra automáticamente. Puedes hacerlo desde la consola de Google Cloud o Google Cloud CLI.

¿Qué sigue?