Desarrolla receptores de eventos

En esta página, se muestra cómo crear un servicio de receptor de eventos que se puede implementar en Cloud Run for Anthos en Google Cloud.

Tu servicio de Cloud Run for Anthos en Google Cloud puede recibir eventos de cualquier fuente que use el CloudEvents estándar. Los eventos para Cloud Run for Anthos admiten fuentes de eventos nativas y personalizadas.

Las fuentes nativas de eventos incluyen:

Los activadores de fuentes de eventos personalizados pueden filtrar solicitudes HTTP de cualquier fuente según el valor del encabezado HTTP Ce-Type:.

Usa una biblioteca del SDK de CloudEvents

Puedes desarrollar servicios de receptor de eventos con la biblioteca del SDK de CloudEvents, disponible para los siguientes lenguajes:

Formato de eventos

Los servicios reciben eventos de agentes de eventos como solicitudes HTTP POST formateadas según la especificación de vinculación de protocolo HTTP 1.0 de CloudEvents. Cada solicitud HTTP contiene encabezados y cuerpo específicos del evento, y se envían a la ruta raíz (/) del servicio.

Un encabezado HTTP de evento tiene los siguientes atributos agregados a un encabezado HTTP estándar:

Header Descripción Ejemplo
ce-id Identificador único para el evento. 1096434104173400
ce-source Identifica la fuente del evento. //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion La versión de especificación de CloudEvent que se usó para este evento. 1.0
ce-type El tipo de datos del evento. google.cloud.pubsub.topic.v1.messagePublished
ce-time (opcional) Hora de generación del evento, en formato RFC 3339. 2020-12-20T13:37:33.647Z

El cuerpo HTTP para todos los eventos está en el repositorio de CloudEvents. El repositorio contiene búferes de protocolo para eventos junto con el esquema JSON y ejemplos.

Encabezados HTTP del evento

Los siguientes encabezados HTTP se agregan a los eventos de cada fuente:

Cloud Audit Logs

Encabezado HTTP Valor Descripción
ce-id projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 (ejemplo) ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //storage.googleapis.com/projects/YOUR-PROJECT Una string que contiene tu proyecto.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.events.cloud.audit.v1 La constante de tipo CloudEvent.
ce-time 2020-10-08T17:57:33.647Z (ejemplo) La hora a la que se envió el mensaje.
ce-dataschema type.googleapis.com/google.logging.v2.LogEntry El esquema para los datos HTTP POST
ce-subject storage.googleapis.com/projects/_/buckets/single-region-bucket Metadatos sobre la ubicación del depósito de GCS.

Cloud Storage

Encabezado HTTP Valor Descripción
ce-id 1096434104173400 (ejemplo) ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //storage.googleapis.com/projects/_/buckets/BUCKET-NAME Una string que contiene tu depósito de Cloud Storage.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.cloud.storage.object.v1.finalized
google.cloud.storage.object.v1.archived
google.cloud.storage.object.v1.deleted
google.cloud.storage.object.v1.metadataUpdated
La constante de tipo CloudEvent.
ce-time 2020-10-08T17:57:33.647Z (ejemplo) La hora a la que se envió el mensaje.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto El esquema para los datos HTTP POST
ce-subject objects/OBJECT_NAME Metadatos sobre la ubicación del depósito de GCS.

Cloud Scheduler

Encabezado HTTP Valor Descripción
ce-id 1096434104173400 (ejemplo) ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //cloudscheduler.googleapis.com/JOB-NAME Una string que contiene el nombre del trabajo de Cloud Scheduler.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.cloud.scheduler.job.v1.executed La constante de tipo CloudEvent.
ce-time 2020-10-08T17:57:33.647Z (ejemplo) La hora a la que se envió el mensaje.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto El esquema para los datos HTTP POST
ce-subject N/A Metadatos sobre el trabajo de Cloud Scheduler

Pub/Sub

Encabezado HTTP Valor Descripción
ce-id 1096434104173400 (ejemplo) ID único de este mensaje, asignado por el servidor cuando se genera el evento.
ce-source //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC Una string que contiene tu proyecto y tu tema.
ce-specversion 1.0 La versión de la especificación que se usa para este evento.
ce-type google.events.cloud.pubsub.v1 La constante de tipo CloudEvent.
ce-time 2020-10-08T17:57:33.647Z (ejemplo) La hora a la que se envió el mensaje.

Cuerpo HTTP del evento

El cuerpo HTTP de la solicitud es similar a lo siguiente:

Cloud Audit Logs

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh" // base64 encoded "scheduler custom data"
}

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9", // base64 encoded '{"Hello": "world"}'
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

Respuesta del receptor de eventos

Tu servicio de receptor debe enviar una respuesta HTTP 2xx para indicar que la recepción se realizó correctamente en el agente. El agente trata todas las demás respuestas HTTP como falla de entrega y reenvía el evento.

Ejemplo de código fuente del receptor

Python

@app.route('/', methods=['POST'])
def index():
    # Create CloudEvent from HTTP headers and body
    try:
        event = from_http(request.headers, request.get_data())

    except cloud_exceptions.MissingRequiredFields as e:
        print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
        return "Failed to find all required cloudevent fields. ", 400

    except cloud_exceptions.InvalidStructuredJSON as e:
        print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
        return "Could not deserialize the payload as JSON. ", 400

    except cloud_exceptions.InvalidRequiredFields as e:
        print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
        return "Request contained invalid required cloudevent fields. ", 400

    envelope = event.data

    if not envelope:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = envelope['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {event['id']}"
    print(resp)
    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample run-events-pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

func main() {
	http.HandleFunc("/", HelloEventsPubSub)
	// Determine port for HTTP service.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}
	// Start HTTP server.
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

¿Qué sigue?