Mengembangkan penerima peristiwa

Halaman ini menunjukkan cara membuat dan men-deploy layanan penerima peristiwa. Layanan target menerima permintaan HTTP yang berisi peristiwa dalam format CloudEvents.

Penyedia (sumber) peristiwa dapat memberikan jenis peristiwa berikut:

Respons penerima peristiwa

Layanan penerima Anda harus mengirim respons 2xx HTTP untuk menandakan tanda terima peristiwa yang berhasil ke router. Router memperlakukan semua respons HTTP lainnya sebagai kegagalan pengiriman dan akan mengirim ulang peristiwa.

Repositori open source

Struktur isi HTTP untuk semua peristiwa tersedia di repositori GitHub CloudEvents.

Repositori ini berisi hal berikut untuk membantu Anda memahami dan menggunakan data CloudEvents dalam bahasa pemrograman:

  • Google Protocol Buffers untuk payload data CloudEvents
  • Skema JSON yang dihasilkan
  • Katalog skema JSON publik

Link ke library klien juga disertakan.

Menggunakan library CloudEvents SDK

Anda dapat mengembangkan layanan penerima peristiwa menggunakan library CloudEvents SDK, yang tersedia untuk bahasa berikut:

Library ini bersifat open source dan mempermudah transformasi permintaan HTTP Anda menjadi objek CloudEvents idiomatis bahasa.

Contoh kode sumber penerima

Cloud Audit Logs

Kode contoh menunjukkan cara membaca peristiwa Cloud Storage menggunakan Cloud Audit Logs dalam layanan yang di-deploy ke Cloud Run.

Python

@app.route("/", methods=["POST"])
def index():
    # Create a CloudEvent object from the incoming request
    event = from_http(request.headers, request.data)
    # Gets the GCS bucket name from the CloudEvent
    # Example: "storage.googleapis.com/projects/_/buckets/my-bucket"
    bucket = event.get("subject")

    print(f"Detected change in Cloud Storage bucket: {bucket}")
    return (f"Detected change in Cloud Storage bucket: {bucket}", 200)

Java

import io.cloudevents.CloudEvent;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {

  @RequestMapping(value = "/", method = RequestMethod.POST, consumes = "application/json")
  public ResponseEntity<String> receiveMessage(
      @RequestBody String body, @RequestHeader HttpHeaders headers) {
    CloudEvent event;
    try {
      event =
          CloudEventHttpUtils.fromHttp(headers)
              .withData(headers.getContentType().toString(), body.getBytes())
              .build();
    } catch (CloudEventRWException e) {
      return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
    }

    String ceSubject = event.getSubject();
    String msg = "Detected change in Cloud Storage bucket: " + ceSubject;
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.header('ce-subject')) {
    return res
      .status(400)
      .send('Bad Request: missing required header: ce-subject');
  }

  console.log(
    `Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
  );
  return res
    .status(200)
    .send(
      `Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
    );
});

module.exports = app;

Go


// Processes CloudEvents containing Cloud Audit Logs for Cloud Storage
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"

	cloudevent "github.com/cloudevents/sdk-go/v2"
)

// HelloEventsStorage receives and processes a Cloud Audit Log event with Cloud Storage data.
func HelloEventsStorage(w http.ResponseWriter, r *http.Request) {
	if r.Method != http.MethodPost {
		http.Error(w, "Expected HTTP POST request with CloudEvent payload", http.StatusMethodNotAllowed)
		return
	}

	event, err := cloudevent.NewEventFromHTTPRequest(r)
	if err != nil {
		log.Printf("cloudevent.NewEventFromHTTPRequest: %v", err)
		http.Error(w, "Failed to create CloudEvent from request.", http.StatusBadRequest)
		return
	}
	s := fmt.Sprintf("Detected change in Cloud Storage bucket: %s", event.Subject())
	fmt.Fprintln(w, s)
}

C#


using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                logger.LogInformation("Handling HTTP POST");

                var ceSubject = context.Request.Headers["ce-subject"];
                logger.LogInformation($"ce-subject: {ceSubject}");

                if (string.IsNullOrEmpty(ceSubject))
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad Request: expected header Ce-Subject");
                    return;
                }

                await context.Response.WriteAsync($"GCS CloudEvent type: {ceSubject}");
            });
        });
    }
}

Pub/Sub

Kode contoh menunjukkan cara membaca peristiwa Pub/Sub dalam layanan yang di-deploy ke Cloud Run.

Python

@app.route("/", methods=["POST"])
def index():
    data = request.get_json()
    if not data:
        msg = "no Pub/Sub message received"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    if not isinstance(data, dict) or "message" not in data:
        msg = "invalid Pub/Sub message format"
        print(f"error: {msg}")
        return f"Bad Request: {msg}", 400

    pubsub_message = data["message"]

    name = "World"
    if isinstance(pubsub_message, dict) and "data" in pubsub_message:
        name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()

    resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(resp)

    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  // Cast to MessagePublishedEvent for IDE autocompletion
  const pubSubMessage = toMessagePublishedData(req.body);
  const name =
    pubSubMessage.message && pubSubMessage.message.data
      ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
      : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using CloudNative.CloudEvents.AspNetCore;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
                var cloudEvent = await context.Request.ToCloudEventAsync(formatter);
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = (MessagePublishedData) cloudEvent.Data;
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.UtcDateTime:yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

Langkah berikutnya