Ereignisempfänger entwickeln

Auf dieser Seite wird gezeigt, wie Sie einen Ereignisempfänger erstellen, der in Cloud Run for Anthos in Google Cloud bereitgestellt werden kann.

Ihr Cloud Run for Anthos in Google Cloud-Dienst kann Ereignisse von jeder Quelle empfangen, die den CloudEvents-Standard verwendet. Ereignisse für Cloud Run for Anthos unterstützen native und benutzerdefinierte Ereignisquellen.

Beispiele für native Ereignisquellen:

Trigger für benutzerdefinierte Ereignisquellen können HTTP-Anfragen aus einer beliebigen Quelle basierend auf dem Wert des Ce-Type:-HTTP-Headers filtern.

CloudEvents SDK-Bibliothek verwenden

Sie können Event-Receiver-Dienste mit der CloudEvents SDK-Bibliothek entwickeln, die in den folgenden Sprachen verfügbar ist:

Ereignisformat

Dienste empfangen Ereignisse von Ereignis-Brokern als HTTP-POST-Anfragen, die gemäß der HTTP-Protokollbindungsspezifikation von CloudEvents Version 1.0 formatiert wurden. Alle HTTP-Anfragen enthalten ereignisspezifische Header und Text und werden an den Root-Pfad (/) des Dienstes gesendet.

Dem HTTP-Header eines Ereignisses werden die folgenden Attribute zu einem Standard-HTTP-Header hinzugefügt:

Header Beschreibung Beispiel
ce-id Eindeutige Kennung für das Ereignis. 1096434104173400
ce-source Gibt die Quelle des Ereignisses an. //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion Die für dieses Ereignis verwendete Version der CloudEvent-Spezifikation. 1.0
ce-type Der Typ der Ereignisdaten. google.cloud.pubsub.topic.v1.messagePublished
ce-time (optional) Zeitpunkt der Ereigniserstellung im Format RFC 3339. 2020-12-20T13:37:33.647Z

Der HTTP-Text für alle Ereignisse befindet sich im CloudEvents-Repository. Das Repository enthält Protokollpuffer für Ereignisse zusammen mit dem JSON-Schema und Beispielen.

Ereignis-HTTP-Header

Die folgenden HTTP-Header werden den Ereignissen aus jeder Quelle hinzugefügt:

Cloud-Audit-Logs

HTTP-Header Wert Beschreibung
ce-id (Beispiel) projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 Eindeutige ID dieser Nachricht, die vom Server zugewiesen wird, wenn das Ereignis generiert wird.
ce-source //storage.googleapis.com/projects/YOUR-PROJECT Ein String, der Ihr Projekt enthält.
ce-specversion 1.0 Die für dieses Ereignis verwendete Spezifikationsversion.
ce-type google.events.cloud.audit.v1 Die CloudEvent-Typkonstante.
ce-time (Beispiel) 2020-10-08T17:57:33.647Z Der Zeitpunkt, zu dem die Nachricht gesendet wurde.
ce-dataschema type.googleapis.com/google.logging.v2.LogEntry Das Schema für die HTTP-POST-Daten.
ce-subject storage.googleapis.com/projects/_/buckets/single-region-bucket Metadaten zum Speicherort des GCS-Buckets.

Cloud Storage

HTTP-Header Wert Beschreibung
ce-id (Beispiel) 1096434104173400 Eindeutige ID dieser Nachricht, die vom Server zugewiesen wird, wenn das Ereignis generiert wird.
ce-source //storage.googleapis.com/projects/_/buckets/BUCKET-NAME Ein String mit Ihrem Cloud Storage-Bucket.
ce-specversion 1.0 Die für dieses Ereignis verwendete Spezifikationsversion.
ce-type google.cloud.storage.object.v1.finalized
google.cloud.storage.object.v1.archived
google.cloud.storage.object.v1.deleted
google.cloud.storage.object.v1.metadataUpdated
Die CloudEvent-Typkonstante.
ce-time (Beispiel) 2020-10-08T17:57:33.647Z Der Zeitpunkt, zu dem die Nachricht gesendet wurde.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto Das Schema für die HTTP-POST-Daten.
ce-subject objects/OBJECT_NAME Metadaten zum Speicherort des GCS-Buckets.

Cloud Scheduler

HTTP-Header Wert Beschreibung
ce-id (Beispiel) 1096434104173400 Eindeutige ID dieser Nachricht, die vom Server zugewiesen wird, wenn das Ereignis generiert wird.
ce-source //cloudscheduler.googleapis.com/JOB-NAME Ein String mit dem Namen Ihres Cloud Scheduler-Jobs.
ce-specversion 1.0 Die für dieses Ereignis verwendete Spezifikationsversion.
ce-type google.cloud.scheduler.job.v1.executed Die CloudEvent-Typkonstante.
ce-time (Beispiel) 2020-10-08T17:57:33.647Z Der Zeitpunkt, zu dem die Nachricht gesendet wurde.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto Das Schema für die HTTP-POST-Daten.
ce-subject Metadaten zum Cloud Scheduler-Job

Pub/Sub

HTTP-Header Wert Beschreibung
ce-id (Beispiel) 1096434104173400 Eindeutige ID dieser Nachricht, die vom Server zugewiesen wird, wenn das Ereignis generiert wird.
ce-source //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC Ein String mit Ihrem Projekt und Thema.
ce-specversion 1.0 Die für dieses Ereignis verwendete Spezifikationsversion.
ce-type google.events.cloud.pubsub.v1 Die CloudEvent-Typkonstante.
ce-time (Beispiel) 2020-10-08T17:57:33.647Z Der Zeitpunkt, zu dem die Nachricht gesendet wurde.

HTTP-Ereignistext

Der HTTP-Text der Anfrage sieht in etwa so aus:

Cloud-Audit-Logs

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh" // base64 encoded "scheduler custom data"
}

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9", // base64 encoded '{"Hello": "world"}'
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

Antwort des Ereignisempfängers

Der Empfängerdienst sollte eine HTTP-2xx-Antwort senden, um einen erfolgreichen Ereignisempfang an den Broker zu signalisieren. Der Broker behandelt alle anderen HTTP-Antworten als Zustellungsfehler und sendet das Ereignis noch einmal.

Quellcode des Beispielempfängers

Python

@app.route('/', methods=['POST'])
def index():
    # Create CloudEvent from HTTP headers and body
    try:
        event = from_http(request.headers, request.get_data())

    except cloud_exceptions.MissingRequiredFields as e:
        print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
        return "Failed to find all required cloudevent fields. ", 400

    except cloud_exceptions.InvalidStructuredJSON as e:
        print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
        return "Could not deserialize the payload as JSON. ", 400

    except cloud_exceptions.InvalidRequiredFields as e:
        print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
        return "Request contained invalid required cloudevent fields. ", 400

    envelope = event.data

    if not envelope:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = envelope['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {event['id']}"
    print(resp)
    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample run-events-pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

func main() {
	http.HandleFunc("/", HelloEventsPubSub)
	// Determine port for HTTP service.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}
	// Start HTTP server.
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

Nächste Schritte