Cloud Composer 3 | Cloud Composer 2 | Cloud Composer 1
En esta página, se explica cómo crear una arquitectura de envío basada en eventos activando DAGs de Cloud Composer en respuesta a cambios en temas de Pub/Sub. Los ejemplos de este instructivo demuestran el manejo del ciclo completo de administración de Pub/Sub, incluida la administración de suscripciones, como parte del proceso del DAG. Es adecuado para algunos de los casos de uso comunes cuando necesitas activar DAGs, pero no quieres configurar permisos de acceso adicionales.
Por ejemplo, los mensajes enviados a través de Pub/Sub se pueden usar como solución si no deseas proporcionar acceso directo a un entorno de Cloud Composer por motivos de seguridad. Puedes configurar una Cloud Run Function que cree mensajes de Pub/Sub y los publique en un tema de Pub/Sub. Luego, puedes crear un DAG que extraiga mensajes de Pub/Sub y, luego, los controle.
En este ejemplo específico, crearás una Cloud Run Function y, luego, implementarás dos DAG. El primer DAG extrae mensajes de Pub/Sub y activa el segundo DAG según el contenido del mensaje de Pub/Sub.
En este instructivo, se supone que estás familiarizado con Python y la consola de Google Cloud .
Objetivos
Costos
En este instructivo, se usan los siguientes componentes facturables de Google Cloud:
- Cloud Composer (consulta también los costos adicionales)
- Pub/Sub
- Cloud Run Functions
Cuando finalices este instructivo, podrás borrar los recursos creados para evitar que se te siga facturando. Consulta la sección Limpieza para obtener más detalles.
Antes de comenzar
Para este instructivo, necesitas un Google Cloud proyecto. Configura el proyecto de la siguiente manera:
En la Google Cloud consola, selecciona o crea un proyecto:
Asegúrate de tener habilitada la facturación para tu proyecto. Obtén información para verificar si la facturación está habilitada en un proyecto.
Asegúrate de que el usuario del proyecto Google Cloud tenga los siguientes roles para crear los recursos necesarios:
- Usuario de cuenta de servicio (
roles/iam.serviceAccountUser
) - Editor de Pub/Sub (
roles/pubsub.editor
) - Administrador de objetos de almacenamiento y entorno
(
roles/composer.environmentAndStorageObjectAdmin
) - Administrador de Cloud Run Functions (
roles/cloudfunctions.admin
) - Visor de registros (
roles/logging.viewer
)
- Usuario de cuenta de servicio (
Asegúrate de que la cuenta de servicio que ejecuta tu función de Cloud Run tenga permisos suficientes en tu proyecto para acceder a Pub/Sub. De forma predeterminada, las funciones de Cloud Run usan la cuenta de servicio predeterminada de App Engine. Esta cuenta de servicio tiene el rol de Editor, que tiene permisos suficientes para este instructivo.
Habilita las API para tu proyecto.
Console
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs.
gcloud
Enable the Cloud Composer, Cloud Run functions, and Pub/Sub APIs:
gcloud services enable composer.googleapis.comcloudfunctions.googleapis.com pubsub.googleapis.com
Terraform
Habilita la API de Cloud Composer en tu proyecto agregando las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:
resource "google_project_service" "composer_api" {
project = "<PROJECT_ID>"
service = "composer.googleapis.com"
// Disabling Cloud Composer API might irreversibly break all other
// environments in your project.
// This parameter prevents automatic disabling
// of the API when the resource is destroyed.
// We recommend to disable the API only after all environments are deleted.
disable_on_destroy = false
// this flag is introduced in 5.39.0 version of Terraform. If set to true it will
//prevent you from disabling composer_api through Terraform if any environment was
//there in the last 30 days
check_if_service_has_usage_on_destroy = true
}
resource "google_project_service" "pubsub_api" {
project = "<PROJECT_ID>"
service = "pubsub.googleapis.com"
disable_on_destroy = false
}
resource "google_project_service" "functions_api" {
project = "<PROJECT_ID>"
service = "cloudfunctions.googleapis.com"
disable_on_destroy = false
}
Reemplaza <PROJECT_ID>
por el ID del proyecto de tu proyecto. Por ejemplo, example-project
.
Crea tu entorno de Cloud Composer
Crea un entorno de Cloud Composer 2.
Como parte de este procedimiento, otorgas el rol de Extensión del agente de servicio de la API de Cloud Composer v2 (roles/composer.ServiceAgentV2Ext
) a la cuenta del agente de servicio de Composer. Cloud Composer usa esta cuenta para realizar operaciones en tu proyecto Google Cloud .
Crea un tema de Pub/Sub
En este ejemplo, se activa un DAG en respuesta a un mensaje enviado a un tema de Pub/Sub. Crea un tema de Pub/Sub para usar en este ejemplo:
Console
En la consola de Google Cloud , ve a la página Temas de Pub/Sub.
Haz clic en Crear tema.
En el campo ID de tema, ingresa
dag-topic-trigger
como ID para tu tema.Deja las demás opciones con sus valores predeterminados.
Haz clic en Crear tema.
gcloud
Para crear un tema, ejecuta el comando gcloud pubsub topics create en Google Cloud CLI:
gcloud pubsub topics create dag-topic-trigger
Terraform
Agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform:
resource "google_pubsub_topic" "trigger" {
project = "<PROJECT_ID>"
name = "dag-topic-trigger"
message_retention_duration = "86600s"
}
Reemplaza <PROJECT_ID>
por el ID del proyecto de tu proyecto. Por ejemplo, example-project
.
Sube tus DAGs
Sube DAGs a tu entorno:
- Guarda el siguiente archivo DAG en tu computadora local.
- Reemplaza
<PROJECT_ID>
por el ID del proyecto de tu proyecto. Por ejemplo,example-project
. - Sube el archivo de DAG editado a tu entorno.
El código de muestra contiene dos DAG: trigger_dag
y target_dag
.
El DAG trigger_dag
se suscribe a un tema de Pub/Sub, extrae mensajes de Pub/Sub y activa otro DAG especificado en el ID del DAG de los datos del mensaje de Pub/Sub. En este ejemplo, trigger_dag
activa el DAG target_dag
, que genera mensajes en los registros de tareas.
El DAG trigger_dag
contiene las siguientes tareas:
subscribe_task
: Suscríbete a un tema de Pub/Sub.pull_messages_operator
: Lee los datos de un mensaje de Pub/Sub conPubSubPullOperator
.trigger_target_dag
: Activa otro DAG (en este ejemplo,target_dag
) según los datos de los mensajes extraídos del tema de Pub/Sub.
El DAG target_dag
contiene solo una tarea: output_to_logs
. Esta tarea imprime mensajes en el registro de tareas con un retraso de un segundo.
Implementa una Cloud Run Function que publique mensajes en un tema de Pub/Sub
En esta sección, implementarás una función de Cloud Run que publica mensajes en un tema de Pub/Sub.
Crea una función de Cloud Run y especifica su configuración
Console
En la consola de Google Cloud , ve a la página de Cloud Run functions.
Haz clic en Crear función.
En el campo Entorno, selecciona 1ª gen.
En el campo Nombre de la función, ingresa el nombre de tu función:
pubsub-publisher
.En el campo Activador, selecciona HTTP.
En la sección Autenticación, selecciona Permitir invocaciones no autenticadas. Esta opción otorga a los usuarios no autenticados la capacidad de invocar una función de HTTP.
Haz clic en Guardar.
Haz clic en Siguiente para avanzar al paso Código.
Terraform
Considera usar la consola de Google Cloud para este paso, ya que no hay una forma directa de administrar el código fuente de la función desde Terraform.
En este ejemplo, se muestra cómo subir una función de Cloud Run desde un archivo ZIP local creando un bucket de Cloud Storage, almacenando el archivo en este bucket y, luego, usando el archivo del bucket como fuente para la función de Cloud Run. Si usas este enfoque, Terraform no actualizará automáticamente el código fuente de tu función, incluso si creas un archivo nuevo. Para volver a subir el código de la función, puedes cambiar el nombre del archivo.
- Descarga los archivos
pubsub_publisher.py
yrequirements.txt
. - En el archivo
pubsub_publisher.py
, reemplaza<PROJECT_ID>
por el ID del proyecto de tu proyecto. Por ejemplo,example-project
. - Crea un archivo ZIP llamado
pubsub_function.zip
con el archivopbusub_publisner.py
y el archivorequirements.txt
. - Guarda el archivo ZIP en un directorio en el que se almacene tu secuencia de comandos de Terraform.
- Agrega las siguientes definiciones de recursos a tu secuencia de comandos de Terraform y reemplaza
<PROJECT_ID>
por el ID de tu proyecto.
resource "google_storage_bucket" "cloud_function_bucket" {
project = <PROJECT_ID>
name = "<PROJECT_ID>-cloud-function-source-code"
location = "US"
force_destroy = true
uniform_bucket_level_access = true
}
resource "google_storage_bucket_object" "cloud_function_source" {
name = "pubsub_function.zip"
bucket = google_storage_bucket.cloud_function_bucket.name
source = "./pubsub_function.zip"
}
resource "google_cloudfunctions_function" "pubsub_function" {
project = <PROJECT_ID>
name = "pubsub-publisher"
runtime = "python310"
region = "us-central1"
available_memory_mb = 128
source_archive_bucket = google_storage_bucket.cloud_function_bucket.name
source_archive_object = "pubsub_function.zip"
timeout = 60
entry_point = "pubsub_publisher"
trigger_http = true
}
Especifica los parámetros de código de la función de Cloud Run
Console
En el paso Código, en el campo Entorno de ejecución, selecciona el entorno de ejecución de lenguaje que usa tu función. En este ejemplo, selecciona Python 3.10.
En el campo Punto de entrada, ingresa
pubsub_publisher
. Este es el código que se ejecuta cuando se ejecuta tu función de Cloud Run. El valor de esta marca debe ser un nombre de función o un nombre de clase completamente calificado que exista en tu código fuente.
Terraform
Omitir este paso Los parámetros de la función de Cloud Run ya están definidos en el recurso google_cloudfunctions_function
.
Sube el código de tu función de Cloud Run
Console
En el campo Código fuente, selecciona la opción adecuada para el modo en que proporcionarás el código fuente de la función. En este instructivo, agrega el código de tu función con el editor intercalado de Cloud Run Functions. Como alternativa, puedes subir un archivo ZIP o usar Cloud Source Repositories.
- Coloca el siguiente ejemplo de código en el archivo main.py.
- Reemplaza
<PROJECT_ID>
por el ID del proyecto de tu proyecto. Por ejemplo,example-project
.
Terraform
Omitir este paso Los parámetros de la función de Cloud Run ya están definidos en el recurso google_cloudfunctions_function
.
Especifica las dependencias de tu función de Cloud Run
Console
Especifica las dependencias de la función en el archivo de metadatos requirements.txt:
Cuando implementas tu función, las funciones de Cloud Run descargan e instalan
las dependencias declaradas en el archivo requirements.txt, una línea por paquete.
Este archivo debe estar en el mismo directorio que el archivo main.py que contiene el código de tu función. Para obtener más detalles, consulta Archivos de requisitos en la documentación de pip
.
Terraform
Omitir este paso Las dependencias de las funciones de Cloud Run se definen en el archivo requirements.txt
del archivo pubsub_function.zip
.
Implementa tu Cloud Run Function
Console
Haz clic en Implementar. Cuando la implementación finaliza correctamente, la función aparece con una marca de verificación verde en la página Cloud Run Functions en laGoogle Cloud consola.
Asegúrate de que la cuenta de servicio que ejecuta tu Cloud Run Function tenga permisos suficientes en tu proyecto para acceder a Pub/Sub.
Terraform
Inicializa Terraform mediante este comando:
terraform init
Revisa la configuración y verifica que los recursos que creará o actualizará Terraform coincidan con tus expectativas:
terraform plan
Para verificar si tu configuración es válida, ejecuta el siguiente comando:
terraform validate
Para aplicar la configuración de Terraform, ejecuta el siguiente comando y, luego, escribe yes cuando se te solicite:
terraform apply
Espera hasta que Terraform muestre el mensaje “¡Aplicación completa!”.
En la consola de Google Cloud , navega a tus recursos en la IU para asegurarte de que Terraform los haya creado o actualizado.
Prueba tu función de Cloud Run
Para verificar que tu función publique un mensaje en un tema de Pub/Sub y que los DAG de ejemplo funcionen según lo previsto, haz lo siguiente:
Verifica que los DAG estén activos:
En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAGs.
Verifica los valores de la columna Estado para los DAGs llamados
trigger_dag
ytarget_dag
. Ambos DAG deben estar en el estadoActive
.
Envía un mensaje de prueba de Pub/Sub. Puedes hacerlo en Cloud Shell:
En la consola de Google Cloud , ve a la página Funciones.
Haz clic en el nombre de tu función,
pubsub-publisher
.Ve a la pestaña Pruebas.
En la sección Configurar evento de activación, ingresa el siguiente par clave-valor de JSON:
{"message": "target_dag"}
. No modifiques el par clave-valor, ya que este mensaje activará el DAG de prueba más adelante.En la sección Comando de prueba, haz clic en Probar en Cloud Shell.
En la terminal de Cloud Shell, espera hasta que aparezca un comando automáticamente. Presiona
Enter
para ejecutar este comando.Si aparece el mensaje Autoriza Cloud Shell, haz clic en Autorizar.
Comprueba que el contenido del mensaje corresponda al mensaje de Pub/Sub. En este ejemplo, el mensaje de salida debe comenzar con
Message b'target_dag' with message_length 10 published to
como respuesta de tu función.
Verifica que se haya activado
target_dag
:Espera al menos un minuto para que se complete una nueva ejecución del DAG de
trigger_dag
.En la consola de Google Cloud , ve a la página Entornos.
En la lista de entornos, haz clic en el nombre de tu entorno. Se abrirá la página Detalles del entorno.
Ve a la pestaña DAGs.
Haz clic en
trigger_dag
para ir a la página Detalles del DAG. En la pestaña Ejecuciones, se muestra una lista de las ejecuciones del DAGtrigger_dag
.Este DAG se ejecuta cada minuto y procesa todos los mensajes de Pub/Sub que se envían desde la función. Si no se enviaron mensajes, la tarea
trigger_target
se marcará comoSkipped
en los registros de ejecución del DAG. Si se activaron DAGs, la tareatrigger_target
se marca comoSuccess
.Revisa varias ejecuciones recientes del DAG para ubicar una en la que las tres tareas (
subscribe_task
,pull_messages_operator
ytrigger_target
) tengan el estadoSuccess
.Vuelve a la pestaña DAGs y verifica que la columna Ejecuciones exitosas del DAG
target_dag
muestre una ejecución exitosa.
Resumen
En este instructivo, aprendiste a usar Cloud Run functions para publicar mensajes en un tema de Pub/Sub y a implementar un DAG que se suscribe a un tema de Pub/Sub, extrae mensajes de Pub/Sub y activa otro DAG especificado en el ID del DAG de los datos del mensaje.
También existen otras formas de crear y administrar suscripciones de Pub/Sub y de activar DAGs que no se abordan en este instructivo. Por ejemplo, puedes usar Cloud Run Functions para activar DAG de Airflow cuando se produce un evento especificado. Consulta nuestros instructivos para probar las otras Google Cloud funciones por tu cuenta.
Limpia
Para evitar que se apliquen cargos a tu Google Cloud cuenta por los recursos usados en este instructivo, borra el proyecto que contiene los recursos o conserva el proyecto y borra los recursos individuales.
Borra el proyecto
Delete a Google Cloud project:
gcloud projects delete PROJECT_ID
Borra los recursos individuales
Si planeas explorar varios instructivos y guías de inicio rápido, la reutilización de proyectos puede ayudarte a evitar exceder los límites de las cuotas del proyecto.
Console
- Borra el entorno de Cloud Composer. También borrarás el bucket del entorno durante este procedimiento.
- Borra el tema de Pub/Sub,
dag-topic-trigger
. Borra la función de Cloud Run.
En la Google Cloud consola, ve a Cloud Run functions.
Haz clic en la casilla de verificación de la función que deseas borrar,
pubsub-publisher
.Haz clic en Borrar y, luego, sigue las instrucciones.
Terraform
- Asegúrate de que tu secuencia de comandos de Terraform no contenga entradas para los recursos que tu proyecto aún necesita. Por ejemplo, es posible que desees mantener habilitadas algunas APIs y los permisos de IAM aún asignados (si agregaste esas definiciones a tu secuencia de comandos de Terraform).
- Ejecuta
terraform destroy
. - Borra manualmente el bucket del entorno. Cloud Composer no lo borra automáticamente. Puedes hacerlo desde la consola de Google Cloud o Google Cloud CLI.
¿Qué sigue?
- Prueba de DAG
- Prueba funciones de HTTP
- Implementa una Cloud Run Function
- Prueba otras Google Cloud funciones. Revisa nuestros instructivos.