Sviluppo dei servizi di ricezione di eventi

Questa pagina mostra come creare ed eseguire il deployment di un servizio di ricezione di eventi. Il servizio di destinazione riceve richieste HTTP contenenti l'evento nel formato CloudEvents.

I fornitori di eventi (sorgenti) possono fornire i seguenti tipi di eventi:

Risposta del destinatario di eventi

Il servizio ricevitore deve inviare una risposta HTTP 2xx per segnalare al router la corretta ricezione dell'evento. Il router considera tutte le altre risposte HTTP come errori di recapito e invia di nuovo l'evento.

Repository open source

La struttura del corpo HTTP per tutti gli eventi è disponibile nel repository GitHub di CloudEvents.

Il repository contiene quanto segue per aiutarti a comprendere e utilizzare i dati CloudEvents nel tuo linguaggio di programmazione:

  • Definizioni Protocol Buffers di Google per i payload di dati CloudEvents
  • Schemi JSON generati
  • Un catalogo di schemi JSON pubblico

Sono inclusi anche i link alle librerie client.

Usa una libreria CloudEvents SDK

Puoi sviluppare servizi di ricezione di eventi utilizzando la libreria CloudEvents SDK, disponibile per i seguenti linguaggi:

Queste librerie sono open source e semplificano la trasformazione della richiesta HTTP in un oggetto CloudEvents idiomatico per il linguaggio utilizzato.

Esempio di codice sorgente del ricevitore

Cloud Audit Logs

Il codice campione mostra come leggere gli eventi di Cloud Storage utilizzando Cloud Audit Logs in un servizio di cui è stato eseguito il deployment in Cloud Run.

Python

@app.route("/", methods=["POST"])
def index():
    """Handle a POSTed CloudEvent and report the Cloud Storage bucket it refers to.

    The CloudEvent is rebuilt from the request headers and body, and its
    "subject" attribute is echoed both to stdout and in the HTTP response.

    Returns:
        A ``(body, 200)`` tuple whose body names the subject of the event.
    """
    cloud_event = from_http(request.headers, request.data)

    # The subject carries the bucket name, for example:
    # "storage.googleapis.com/projects/_/buckets/my-bucket"
    subject = cloud_event.get("subject")

    message = f"Detected change in Cloud Storage bucket: {subject}"
    print(message)
    return (message, 200)

Java

import io.cloudevents.CloudEvent;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/** Receives CloudEvents over HTTP and reports the subject of each event. */
@RestController
public class EventController {

  /**
   * Handles a JSON POST to "/" by rebuilding the CloudEvent from the request and echoing its
   * subject.
   *
   * @param body raw request body, used as the CloudEvent data
   * @param headers request headers, from which the CloudEvent attributes are read
   * @return 200 with a message naming the event subject, or 400 with the exception message when
   *     the CloudEvent cannot be built from the request
   */
  @RequestMapping(value = "/", method = RequestMethod.POST, consumes = "application/json")
  public ResponseEntity<String> receiveMessage(
      @RequestBody String body, @RequestHeader HttpHeaders headers) {
    CloudEvent event;
    try {
      event =
          CloudEventHttpUtils.fromHttp(headers)
              .withData(
                  headers.getContentType().toString(),
                  // Encode explicitly as UTF-8: the no-arg getBytes() uses the platform default
                  // charset, which would make the event data depend on the host configuration.
                  body.getBytes(java.nio.charset.StandardCharsets.UTF_8))
              .build();
    } catch (CloudEventRWException e) {
      return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
    }

    String ceSubject = event.getSubject();
    String msg = "Detected change in Cloud Storage bucket: " + ceSubject;
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());

// Reports which Cloud Storage bucket changed, based on the `ce-subject`
// request header. Requests without that header are rejected with 400.
app.post('/', (req, res) => {
  const subject = req.header('ce-subject');

  if (!subject) {
    return res
      .status(400)
      .send('Bad Request: missing required header: ce-subject');
  }

  const message = `Detected change in Cloud Storage bucket: ${subject}`;
  console.log(message);
  return res.status(200).send(message);
});

module.exports = app;

Go


// Processes CloudEvents containing Cloud Audit Logs for Cloud Storage
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"

	cloudevent "github.com/cloudevents/sdk-go/v2"
)

// HelloEventsStorage handles an HTTP POST carrying a CloudEvent and writes the
// event's subject back in the response. Any other HTTP method is answered with
// 405, and a request that cannot be parsed as a CloudEvent with 400.
func HelloEventsStorage(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Expected HTTP POST request with CloudEvent payload", http.StatusMethodNotAllowed)
		return
	}

	ce, parseErr := cloudevent.NewEventFromHTTPRequest(r)
	if parseErr != nil {
		log.Printf("cloudevent.NewEventFromHTTPRequest: %v", parseErr)
		http.Error(w, "Failed to create CloudEvent from request.", http.StatusBadRequest)
		return
	}

	// Write the message and its trailing newline in a single formatted call.
	fmt.Fprintf(w, "Detected change in Cloud Storage bucket: %s\n", ce.Subject())
}

C#


using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

/// <summary>
/// Configures a minimal ASP.NET Core service that accepts POST requests on "/"
/// and reports the value of the <c>ce-subject</c> request header.
/// </summary>
public class Startup
{
    /// <summary>Registers services; this sample needs none.</summary>
    public void ConfigureServices(IServiceCollection services)
    {
    }

    /// <summary>
    /// Builds the request pipeline: routing plus a single POST endpoint on "/".
    /// The endpoint answers 400 when the <c>ce-subject</c> header is missing or
    /// empty, and otherwise writes a message naming the subject.
    /// </summary>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                logger.LogInformation("Handling HTTP POST");

                var ceSubject = context.Request.Headers["ce-subject"];
                logger.LogInformation($"ce-subject: {ceSubject}");

                if (string.IsNullOrEmpty(ceSubject))
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad Request: expected header Ce-Subject");
                    return;
                }

                // The value written is the event subject, not its type, so label it
                // the same way the other language samples on this page do.
                await context.Response.WriteAsync($"Detected change in Cloud Storage bucket: {ceSubject}");
            });
        });
    }
}

Pub/Sub

Il codice campione mostra come leggere gli eventi Pub/Sub in un servizio di cui è stato eseguito il deployment in Cloud Run.

Python

@app.route("/", methods=["POST"])
def index():
    """Greet the sender of a Pub/Sub push message.

    The JSON body must be a dict containing a "message" key. When that message
    is a dict with a "data" entry, the entry is base64-decoded, read as UTF-8
    and stripped to obtain the name; otherwise the name defaults to "World".

    Returns:
        ``(greeting, 200)`` on success, or a "Bad Request: ..." body with
        status 400 when the body is empty or lacks a "message" key.
    """
    envelope = request.get_json()

    # Work out which validation failed, if any, then reject in one place.
    error = None
    if not envelope:
        error = "no Pub/Sub message received"
    elif not isinstance(envelope, dict) or "message" not in envelope:
        error = "invalid Pub/Sub message format"

    if error is not None:
        print(f"error: {error}")
        return f"Bad Request: {error}", 400

    message = envelope["message"]
    has_payload = isinstance(message, dict) and "data" in message
    if has_payload:
        name = base64.b64decode(message["data"]).decode("utf-8").strip()
    else:
        name = "World"

    greeting = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(greeting)

    return (greeting, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

/** Receives Pub/Sub push messages over HTTP and replies with a greeting. */
@RestController
public class EventController {

  /**
   * Handles a POST to "/" carrying a Pub/Sub message and greets the name found in its data.
   *
   * @param body request body deserialized into a {@link PubSubBody}
   * @param headers request headers; the {@code ce-id} entry is echoed in the reply (empty string
   *     when absent)
   * @return 200 with the greeting, or 400 when the message is missing, its data is null or empty,
   *     or its data is not valid Base64
   */
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    // data is known to be non-empty here, so no default name is needed.
    String name;
    try {
      // Decode explicitly as UTF-8: new String(byte[]) uses the platform default charset.
      name = new String(Base64.getDecoder().decode(data), java.nio.charset.StandardCharsets.UTF_8);
    } catch (IllegalArgumentException e) {
      // Base64.Decoder.decode rejects malformed input; answer 400 rather than let the exception
      // propagate out of the handler.
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());

// Greets the name carried in a Pub/Sub push message. The body must contain a
// `message` property; its base64 `data`, when present, is decoded and trimmed
// to obtain the name, which otherwise defaults to "World".
app.post('/', (req, res) => {
  // Determine which validation failed, if any, then reject in one place.
  let errorMessage;
  if (!req.body) {
    errorMessage = 'no Pub/Sub message received';
  } else if (!req.body.message) {
    errorMessage = 'invalid Pub/Sub message format';
  }

  if (errorMessage) {
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }

  // Cast to MessagePublishedData for IDE autocompletion
  const published = toMessagePublishedData(req.body);
  const encoded = published.message && published.message.data;

  let name = 'World';
  if (encoded) {
    name = Buffer.from(encoded, 'base64').toString().trim();
  }

  const greeting = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(greeting);
  res.send(greeting);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	// Message is read from the "message" object of the JSON body.
	Message struct {
		// Data is read from the "data" JSON field. Because the field is a
		// []byte, encoding/json expects a base64-encoded string there and
		// stores the decoded bytes.
		Data []byte `json:"data,omitempty"`
		// ID is read from the "id" JSON field.
		ID   string `json:"id"`
	} `json:"message"`
	// Subscription is read from the "subscription" JSON field.
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
//
// The request body is decoded as JSON into a PubSubMessage. The message data is
// used as the name to greet, defaulting to "World" when it is empty, and the
// greeting (including the Ce-Id request header) is both logged and written to
// the response. A body that cannot be decoded is answered with 400.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		// Log the decode error itself so the cause of the rejection is visible;
		// the status code alone carries no diagnostic information.
		log.Printf("Bad HTTP Request: %v", err)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, r.Header.Get("Ce-Id"))
	// s contains request-derived text, so it must not be used as a format
	// string: a '%' in the message data would otherwise be read as a verb.
	log.Print(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using CloudNative.CloudEvents.AspNetCore;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

/// <summary>
/// Configures a minimal ASP.NET Core service that accepts CloudEvents carrying
/// Pub/Sub messages on "/" and replies with a greeting.
/// </summary>
public class Startup
{
    /// <summary>Registers services; this sample needs none.</summary>
    public void ConfigureServices(IServiceCollection services)
    {
    }

    /// <summary>
    /// Builds the request pipeline: routing plus a single POST endpoint on "/".
    /// The endpoint parses the request into a CloudEvent, logs it, and greets the
    /// UTF-8 text found in the Pub/Sub message data. It answers 400 when the
    /// event carries no Pub/Sub message.
    /// </summary>
    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
                var cloudEvent = await context.Request.ToCloudEventAsync(formatter);
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                // Use a safe cast and null-conditional access so that an event whose
                // data is absent or of another type reaches the 400 response below
                // instead of throwing before the check.
                var messagePublishedData = cloudEvent.Data as MessagePublishedData;
                var pubSubMessage = messagePublishedData?.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    /// <summary>
    /// Formats the attributes and data of a CloudEvent as a multi-line string for logging.
    /// </summary>
    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.UtcDateTime:yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

Passaggi successivi