Desarrolla receptores de eventos

En esta página, se muestra cómo crear un servicio de receptor de eventos implementable en Cloud Run for Anthos en Google Cloud.

El servicio de Cloud Run for Anthos puede recibir eventos de cualquier fuente que use el estándar de CloudEvents. Los eventos para Cloud Run for Anthos admiten fuentes de eventos nativas y personalizadas.

Las fuentes nativas de eventos incluyen los siguientes servicios:

Los activadores de fuentes de eventos personalizados pueden filtrar solicitudes HTTP de cualquier fuente según el valor del encabezado HTTP Ce-Type:.

Formato de CloudEvent

Los servicios reciben eventos de routers de eventos como solicitudes POST HTTP con formato de la especificación de vinculación del protocolo HTTP versión 1.0 de CloudEvents. Cada solicitud HTTP contiene encabezados y cuerpos específicos del evento, y se envían a la ruta raíz (/) mediante una solicitud POST al servicio.

Cada evento tiene los siguientes encabezados HTTP estándar de CloudEvent:

Encabezado Descripción Ejemplo
ce-id Identificador único para el evento. 1096434104173400
ce-source Identifica la fuente del evento. //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion La versión de la especificación de CloudEvent que se usa para este evento. 1.0
ce-type El tipo de datos del evento. google.cloud.pubsub.topic.v1.messagePublished
ce-time (opcional) Hora de generación del evento, en formato RFC 3339. 2020-12-20T13:37:33.647Z

El cuerpo HTTP para todos los eventos está en el repositorio de CloudEvents. El repositorio contiene búferes de protocolo para eventos junto con el esquema JSON y ejemplos.

Encabezados HTTP del evento

Los siguientes encabezados HTTP se agregan a los eventos de cada fuente:

Registros de auditoría de Cloud

Encabezado HTTP Valor Descripción
ce-id (ejemplo) projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //storage.googleapis.com/projects/YOUR-PROJECT Una string que contiene tu proyecto.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.events.cloud.audit.v1 La constante de tipo CloudEvent.
ce-time (ejemplo) 2020-10-08T17:57:33.647Z La hora a la que se envió el mensaje.
ce-dataschema type.googleapis.com/google.logging.v2.LogEntry El esquema para los datos POST HTTP.
ce-subject storage.googleapis.com/projects/_/buckets/single-region-bucket Metadatos sobre la ubicación del bucket de GCS.

Cloud Storage

Encabezado HTTP Valor Descripción
ce-id (ejemplo) 1096434104173400 ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //storage.googleapis.com/projects/_/buckets/BUCKET-NAME Una string que contiene el bucket de Cloud Storage.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.cloud.storage.object.v1.finalized
google.cloud.storage.object.v1.archived
google.cloud.storage.object.v1.deleted
google.cloud.storage.object.v1.metadataUpdated
La constante de tipo CloudEvent.
ce-time (ejemplo) 2020-10-08T17:57:33.647Z La hora a la que se envió el mensaje.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto El esquema para los datos POST HTTP.
ce-subject objects/OBJECT_NAME Metadatos sobre la ubicación del bucket de GCS.

Cloud Scheduler

Encabezado HTTP Valor Descripción
ce-id (ejemplo) 1096434104173400 ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //cloudscheduler.googleapis.com/JOB-NAME Una string que contiene el nombre del trabajo de Cloud Scheduler.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.cloud.scheduler.job.v1.executed La constante de tipo CloudEvent.
ce-time (ejemplo) 2020-10-08T17:57:33.647Z La hora a la que se envió el mensaje.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto El esquema para los datos POST HTTP.
ce-subject N/A Metadatos sobre el trabajo de Cloud Scheduler

Pub/Sub

Encabezado HTTP Valor Descripción
ce-id (ejemplo) 1096434104173400 ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC Una string que contiene el proyecto y el tema.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.events.cloud.pubsub.v1 La constante de tipo CloudEvent.
ce-time (ejemplo) 2020-10-08T17:57:33.647Z La hora a la que se envió el mensaje.

Cuerpo HTTP del evento

El cuerpo HTTP de la solicitud es similar a lo siguiente:

Registros de auditoría de Cloud

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh" // base64 encoded "scheduler custom data"
}

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9", // base64 encoded '{"Hello": "world"}'
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

Respuesta del receptor de eventos

El servicio del receptor debe enviar una respuesta HTTP 2xx para indicarle al router que el evento se entregó de forma correcta. El router trata todas las demás respuestas HTTP como error en la entrega y reenvía el evento.

Usa una biblioteca del SDK de CloudEvents

Puedes desarrollar servicios de receptor de eventos mediante la biblioteca del SDK de CloudEvents, que está disponible para los siguientes lenguajes:

Estas bibliotecas son de código abierto y facilitan la transformación de la solicitud HTTP en un objeto de CloudEvent de lenguaje idiomático.

Ejemplo de código fuente del receptor

El código de muestra indica cómo leer los eventos de Pub/Sub en un servicio implementado en Cloud Run.

Python

@app.route('/', methods=['POST'])
def index():
    data = request.get_json()
    if not data:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(data, dict) or 'message' not in data:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = data['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(resp)

    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  // Cast to MessagePublishedEvent for IDE autocompletion
  const pubSubMessage = toMessagePublishedData(req.body);
  const name =
    pubSubMessage.message && pubSubMessage.message.data
      ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
      : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

Próximos pasos