本页面介绍了如何创建和部署事件接收器服务。目标服务接收包含 CloudEvents 格式事件的 HTTP 请求。
事件提供方(来源)可以提供以下事件类型:
事件接收器响应
您的接收器服务应该发送 HTTP 2xx
响应,以向路由器发出成功接收事件的信号。路由器将其他所有 HTTP 响应视为传送失败,并会重新发送事件。
开源代码库
所有事件的 HTTP 正文结构都位于 CloudEvents GitHub 代码库中。
该代码库包含以下内容,帮助您了解和使用采用您的编程语言的 CloudEvents 数据:
- 用于 CloudEvents 数据载荷的 Google Protocol Buffers
- 生成的 JSON 架构
- 公开 JSON 架构目录
还包含指向客户端库的链接。
使用 CloudEvents SDK 库
您可以使用以下语言的 CloudEvents SDK 库开发事件接收器服务:
这些库是开源库,可让您更轻松地将 HTTP 请求转换为语言惯用的 CloudEvent 对象。
接收器源代码示例
Cloud Audit Logs
该示例代码演示了如何在部署到 Cloud Run 的服务中使用 Cloud Audit Logs 读取 Cloud Storage 事件。
Python
@app.route("/", methods=["POST"])
def index():
# Create a CloudEvent object from the incoming request
event = from_http(request.headers, request.data)
# Gets the GCS bucket name from the CloudEvent
# Example: "storage.googleapis.com/projects/_/buckets/my-bucket"
bucket = event.get("subject")
print(f"Detected change in Cloud Storage bucket: {bucket}")
return (f"Detected change in Cloud Storage bucket: {bucket}", 200)
Java
import io.cloudevents.CloudEvent;
import io.cloudevents.rw.CloudEventRWException;
import io.cloudevents.spring.http.CloudEventHttpUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class EventController {
@RequestMapping(value = "/", method = RequestMethod.POST, consumes = "application/json")
public ResponseEntity<String> receiveMessage(
@RequestBody String body, @RequestHeader HttpHeaders headers) {
CloudEvent event;
try {
event =
CloudEventHttpUtils.fromHttp(headers)
.withData(headers.getContentType().toString(), body.getBytes())
.build();
} catch (CloudEventRWException e) {
return new ResponseEntity<>(e.getMessage(), HttpStatus.BAD_REQUEST);
}
String ceSubject = event.getSubject();
String msg = "Detected change in Cloud Storage bucket: " + ceSubject;
System.out.println(msg);
return new ResponseEntity<>(msg, HttpStatus.OK);
}
}
Node.js
const express = require('express');
const app = express();
app.use(express.json());
app.post('/', (req, res) => {
if (!req.header('ce-subject')) {
return res
.status(400)
.send('Bad Request: missing required header: ce-subject');
}
console.log(
`Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
);
return res
.status(200)
.send(
`Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
);
});
module.exports = app;
Go
// Processes CloudEvents containing Cloud Audit Logs for Cloud Storage
package main
import (
"fmt"
"log"
"net/http"
"os"
cloudevent "github.com/cloudevents/sdk-go/v2"
)
// HelloEventsStorage receives and processes a Cloud Audit Log event with Cloud Storage data.
func HelloEventsStorage(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Expected HTTP POST request with CloudEvent payload", http.StatusMethodNotAllowed)
return
}
event, err := cloudevent.NewEventFromHTTPRequest(r)
if err != nil {
log.Printf("cloudevent.NewEventFromHTTPRequest: %v", err)
http.Error(w, "Failed to create CloudEvent from request.", http.StatusBadRequest)
return
}
s := fmt.Sprintf("Detected change in Cloud Storage bucket: %s", event.Subject())
fmt.Fprintln(w, s)
}
C#
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
logger.LogInformation("Service is starting...");
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapPost("/", async context =>
{
logger.LogInformation("Handling HTTP POST");
var ceSubject = context.Request.Headers["ce-subject"];
logger.LogInformation($"ce-subject: {ceSubject}");
if (string.IsNullOrEmpty(ceSubject))
{
context.Response.StatusCode = 400;
await context.Response.WriteAsync("Bad Request: expected header Ce-Subject");
return;
}
await context.Response.WriteAsync($"GCS CloudEvent type: {ceSubject}");
});
});
}
}
Pub/Sub
该示例代码演示了如何读取部署到 Cloud Run 的服务中的 Pub/Sub 事件。
Python
@app.route("/", methods=["POST"])
def index():
data = request.get_json()
if not data:
msg = "no Pub/Sub message received"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
if not isinstance(data, dict) or "message" not in data:
msg = "invalid Pub/Sub message format"
print(f"error: {msg}")
return f"Bad Request: {msg}", 400
pubsub_message = data["message"]
name = "World"
if isinstance(pubsub_message, dict) and "data" in pubsub_message:
name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()
resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
print(resp)
return (resp, 200)
Java
import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@RestController
public class EventController {
@RequestMapping(value = "/", method = RequestMethod.POST)
public ResponseEntity<String> receiveMessage(
@RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
// Get PubSub message from request body.
PubSubBody.PubSubMessage message = body.getMessage();
if (message == null) {
String msg = "No Pub/Sub message received.";
System.out.println(msg);
return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
}
String data = message.getData();
if (data == null || data.isEmpty()) {
String msg = "Invalid Pub/Sub message format.";
System.out.println(msg);
return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
}
String name =
!StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
String ceId = headers.getOrDefault("ce-id", "");
String msg = String.format("Hello, %s! ID: %s", name, ceId);
System.out.println(msg);
return new ResponseEntity<>(msg, HttpStatus.OK);
}
}
Node.js
const express = require('express');
const {
toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();
app.use(express.json());
app.post('/', (req, res) => {
if (!req.body) {
const errorMessage = 'no Pub/Sub message received';
res.status(400).send(`Bad Request: ${errorMessage}`);
console.log(`Bad Request: ${errorMessage}`);
return;
}
if (!req.body.message) {
const errorMessage = 'invalid Pub/Sub message format';
res.status(400).send(`Bad Request: ${errorMessage}`);
console.log(`Bad Request: ${errorMessage}`);
return;
}
// Cast to MessagePublishedEvent for IDE autocompletion
const pubSubMessage = toMessagePublishedData(req.body);
const name =
pubSubMessage.message && pubSubMessage.message.data
? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
: 'World';
const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
console.log(result);
res.send(result);
});
module.exports = app;
Go
// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main
import (
"encoding/json"
"fmt"
"log"
"net/http"
"os"
)
// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
Message struct {
Data []byte `json:"data,omitempty"`
ID string `json:"id"`
} `json:"message"`
Subscription string `json:"subscription"`
}
// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
var e PubSubMessage
if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
return
}
name := string(e.Message.Data)
if name == "" {
name = "World"
}
s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
log.Printf(s)
fmt.Fprintln(w, s)
}
C#
using CloudNative.CloudEvents;
using CloudNative.CloudEvents.AspNetCore;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
public class Startup
{
public void ConfigureServices(IServiceCollection services)
{
}
public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
{
if (env.IsDevelopment())
{
app.UseDeveloperExceptionPage();
}
logger.LogInformation("Service is starting...");
app.UseRouting();
app.UseEndpoints(endpoints =>
{
endpoints.MapPost("/", async context =>
{
var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
var cloudEvent = await context.Request.ToCloudEventAsync(formatter);
logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));
var messagePublishedData = (MessagePublishedData) cloudEvent.Data;
var pubSubMessage = messagePublishedData.Message;
if (pubSubMessage == null)
{
context.Response.StatusCode = 400;
await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
return;
}
var data = pubSubMessage.Data;
logger.LogInformation($"Data: {data.ToBase64()}");
var name = data.ToStringUtf8();
logger.LogInformation($"Extracted name: {name}");
var id = context.Request.Headers["ce-id"];
await context.Response.WriteAsync($"Hello {name}! ID: {id}");
});
});
}
private string GetEventLog(CloudEvent cloudEvent)
{
return $"ID: {cloudEvent.Id}\n"
+ $"Source: {cloudEvent.Source}\n"
+ $"Type: {cloudEvent.Type}\n"
+ $"Subject: {cloudEvent.Subject}\n"
+ $"DataSchema: {cloudEvent.DataSchema}\n"
+ $"DataContentType: {cloudEvent.DataContentType}\n"
+ $"Time: {cloudEvent.Time?.UtcDateTime:yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
+ $"SpecVersion: {cloudEvent.SpecVersion}\n"
+ $"Data: {cloudEvent.Data}";
}
}