Developing event receivers

This page shows you how to create an event receiver service that can be deployed to Cloud Run for Anthos on Google Cloud.

Your Cloud Run for Anthos on Google Cloud service can receive events from any source that uses the CloudEvents standard. Events for Cloud Run for Anthos supports native and custom event sources.

Native event sources include Cloud Audit Logs, Cloud Storage, Cloud Scheduler, and Pub/Sub.

Custom event source triggers can filter HTTP requests from any source based on the value of the Ce-Type HTTP header.
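To illustrate what a custom source might send, the following Python sketch assembles the headers and body of a binary-mode CloudEvents POST request using only the standard library. The function name build_custom_event, the event type com.example.order.created, and the source //example.com/orders are hypothetical values chosen for this example; a trigger filtering on that Ce-Type value would be expected to match such a request.

```python
import json
import uuid


def build_custom_event(event_type, source, data):
    """Return (headers, body) for a binary-mode CloudEvents HTTP POST.

    The returned pair can be passed to any HTTP client. Nothing is sent here.
    """
    headers = {
        "Ce-Id": str(uuid.uuid4()),       # unique per event
        "Ce-Source": source,
        "Ce-Specversion": "1.0",
        "Ce-Type": event_type,            # the value a custom trigger filters on
        "Content-Type": "application/json",
    }
    body = json.dumps(data).encode("utf-8")
    return headers, body


# Hypothetical event type and source, for illustration only.
headers, body = build_custom_event(
    "com.example.order.created", "//example.com/orders", {"orderId": 42}
)
print(headers["Ce-Type"])
```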

Using a CloudEvents SDK library

You can develop event receiver services using the CloudEvents SDK library, available for the following languages:

Event format

Services receive events from event brokers as HTTP POST requests formatted according to the CloudEvents version 1.0 HTTP protocol binding specification. Each HTTP request contains event-specific headers and a body, and is sent to the root path (/) of the service.

Each event request includes the following CloudEvents attributes as HTTP headers, in addition to the standard HTTP headers:

Header | Description | Example
ce-id | Unique identifier for the event. | 1096434104173400
ce-source | Identifies the source of the event. | //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion | The CloudEvents specification version used for this event. | 1.0
ce-type | The type of event data. | google.cloud.pubsub.topic.v1.messagePublished
ce-time (optional) | Event generation time, in RFC 3339 format. | 2020-12-20T13:37:33.647Z
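As a minimal sketch of how a receiver might read these attributes without an SDK, the following Python function collects the ce- headers into a dictionary and checks that the four attributes required by CloudEvents 1.0 (id, source, specversion, type) are present. The function name extract_ce_attributes is an assumption made for this example; the sample values come from the table above.

```python
REQUIRED_ATTRIBUTES = ("id", "source", "specversion", "type")


def extract_ce_attributes(headers):
    """Collect ce-* headers into a dict keyed by attribute name.

    HTTP header names are case-insensitive, so names are lowercased first.
    Raises ValueError if a required attribute is missing.
    """
    attributes = {}
    for name, value in headers.items():
        lowered = name.lower()
        if lowered.startswith("ce-"):
            attributes[lowered[3:]] = value
    missing = [a for a in REQUIRED_ATTRIBUTES if a not in attributes]
    if missing:
        raise ValueError("missing CloudEvents attributes: " + ", ".join(missing))
    return attributes


example_headers = {
    "Ce-Id": "1096434104173400",
    "Ce-Source": "//pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic",
    "Ce-Specversion": "1.0",
    "Ce-Type": "google.cloud.pubsub.topic.v1.messagePublished",
    "Ce-Time": "2020-12-20T13:37:33.647Z",
    "Content-Type": "application/json",
}
print(extract_ce_attributes(example_headers)["type"])
# prints google.cloud.pubsub.topic.v1.messagePublished
```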

The HTTP body formats for all events are defined in the Google CloudEvents repository (googleapis/google-cloudevents on GitHub). The repository contains Protocol Buffers definitions for events alongside the JSON schemas and examples.

Event HTTP headers

The following HTTP headers are added to events from each source:

Cloud Audit Logs

HTTP Header | Value | Description
ce-id | (example) projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 | Unique ID of this message, assigned by the server when the event is generated.
ce-source | //storage.googleapis.com/projects/YOUR-PROJECT | A string containing your project.
ce-specversion | 1.0 | The specification version used for this event.
ce-type | google.events.cloud.audit.v1 | The CloudEvent type constant.
ce-time | (example) 2020-10-08T17:57:33.647Z | The time at which the message was sent.
ce-dataschema | type.googleapis.com/google.logging.v2.LogEntry | The schema for the HTTP POST data.
ce-subject | storage.googleapis.com/projects/_/buckets/single-region-bucket | The resource that the audit log entry refers to; in this example, a Cloud Storage bucket.

Cloud Storage

HTTP Header | Value | Description
ce-id | (example) 1096434104173400 | Unique ID of this message, assigned by the server when the event is generated.
ce-source | //storage.googleapis.com/projects/_/buckets/BUCKET-NAME | A string containing your Cloud Storage bucket.
ce-specversion | 1.0 | The specification version used for this event.
ce-type | google.cloud.storage.object.v1.finalized, google.cloud.storage.object.v1.archived, google.cloud.storage.object.v1.deleted, or google.cloud.storage.object.v1.metadataUpdated | The CloudEvent type constant.
ce-time | (example) 2020-10-08T17:57:33.647Z | The time at which the message was sent.
ce-dataschema | https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto | The schema for the HTTP POST data.
ce-subject | objects/OBJECT-NAME | The Cloud Storage object that the event refers to.
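The Cloud Storage headers above carry enough information to identify the bucket, the object, and the kind of change without reading the body. The following Python sketch shows one way to pull those out; the function name describe_storage_event and the bucket and object names are hypothetical, and the sketch assumes the caller has already lowercased the header names.

```python
STORAGE_SOURCE_PREFIX = "//storage.googleapis.com/projects/_/buckets/"
STORAGE_TYPE_PREFIX = "google.cloud.storage.object.v1."
SUBJECT_PREFIX = "objects/"


def describe_storage_event(headers):
    """Return (bucket, object_name, action) for a Cloud Storage event.

    Assumes header names were already lowercased by the caller.
    """
    source = headers["ce-source"]
    subject = headers["ce-subject"]
    event_type = headers["ce-type"]
    if not (source.startswith(STORAGE_SOURCE_PREFIX)
            and subject.startswith(SUBJECT_PREFIX)
            and event_type.startswith(STORAGE_TYPE_PREFIX)):
        raise ValueError("headers do not describe a Cloud Storage event")
    return (
        source[len(STORAGE_SOURCE_PREFIX):],
        subject[len(SUBJECT_PREFIX):],
        event_type[len(STORAGE_TYPE_PREFIX):],
    )


# Hypothetical bucket and object names, for illustration only.
print(describe_storage_event({
    "ce-source": "//storage.googleapis.com/projects/_/buckets/my-bucket",
    "ce-subject": "objects/reports/summary.txt",
    "ce-type": "google.cloud.storage.object.v1.finalized",
}))
# prints ('my-bucket', 'reports/summary.txt', 'finalized')
```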

Cloud Scheduler

HTTP Header | Value | Description
ce-id | (example) 1096434104173400 | Unique ID of this message, assigned by the server when the event is generated.
ce-source | //cloudscheduler.googleapis.com/JOB-NAME | A string containing your Cloud Scheduler job name.
ce-specversion | 1.0 | The specification version used for this event.
ce-type | google.cloud.scheduler.job.v1.executed | The CloudEvent type constant.
ce-time | (example) 2020-10-08T17:57:33.647Z | The time at which the message was sent.
ce-dataschema | https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto | The schema for the HTTP POST data.
ce-subject | N/A | Metadata about the Cloud Scheduler job.

Pub/Sub

HTTP Header | Value | Description
ce-id | (example) 1096434104173400 | Unique ID of this message, assigned by the server when the event is generated.
ce-source | //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC | A string containing your project and topic.
ce-specversion | 1.0 | The specification version used for this event.
ce-type | google.events.cloud.pubsub.v1 | The CloudEvent type constant.
ce-time | (example) 2020-10-08T17:57:33.647Z | The time at which the message was sent.
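Because the Pub/Sub ce-source value embeds both the project and the topic, a receiver that handles several topics can route on it. The following Python sketch extracts the two parts with a regular expression; the function name parse_pubsub_source is an assumption made for this example.

```python
import re

# Matches the ce-source layout shown in the Pub/Sub table above.
_PUBSUB_SOURCE = re.compile(
    r"^//pubsub\.googleapis\.com/projects/(?P<project>[^/]+)/topics/(?P<topic>[^/]+)$"
)


def parse_pubsub_source(source):
    """Return (project, topic) parsed from a Pub/Sub ce-source value."""
    match = _PUBSUB_SOURCE.match(source)
    if match is None:
        raise ValueError("not a Pub/Sub ce-source value: " + source)
    return match.group("project"), match.group("topic")


print(parse_pubsub_source(
    "//pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC"
))
# prints ('YOUR-PROJECT', 'YOUR-TOPIC')
```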

Event HTTP body

The HTTP body of the request is similar to:

Cloud Audit Logs

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}
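A receiver usually needs only a few fields from an audit log entry of this shape. The following Python sketch pulls out the service, method, resource, and caller; the function name summarize_audit_log and the keys of the returned dictionary are choices made for this example, not part of any API.

```python
import json


def summarize_audit_log(body):
    """Extract a few commonly used fields from an audit log event body.

    Missing fields come back as None instead of raising KeyError.
    """
    entry = json.loads(body)
    payload = entry.get("protoPayload", {})
    return {
        "service": payload.get("serviceName"),
        "method": payload.get("methodName"),
        "resource": payload.get("resourceName"),
        "principal": payload.get("authenticationInfo", {}).get("principalEmail"),
        "severity": entry.get("severity"),
    }


# Trimmed copy of the sample body above.
sample_body = json.dumps({
    "protoPayload": {
        "authenticationInfo": {
            "principalEmail": "robot@test-project.iam.gserviceaccount.com"
        },
        "methodName": "google.pubsub.v1.Publisher.CreateTopic",
        "resourceName": "projects/test-project/topics/test-auditlogs-source",
        "serviceName": "pubsub.googleapis.com",
    },
    "severity": "NOTICE",
})
print(summarize_audit_log(sample_body)["method"])
# prints google.pubsub.v1.Publisher.CreateTopic
```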

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}
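The md5Hash field in a Cloud Storage object resource is the base64 encoding of the object's MD5 digest, so a receiver that downloads the object can compare the two. The following Python sketch shows the comparison; the function name md5_matches is an assumption, and the demonstration uses locally computed values instead of real event data.

```python
import base64
import hashlib


def md5_matches(content, md5_hash_b64):
    """Check downloaded bytes against the base64-encoded md5Hash field."""
    expected = base64.b64decode(md5_hash_b64)
    return hashlib.md5(content).digest() == expected


# Round-trip demonstration with locally computed values (not real event data).
sample = b"hello"
sample_hash = base64.b64encode(hashlib.md5(sample).digest()).decode("ascii")
print(md5_matches(sample, sample_hash))  # prints True
```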

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh"
}

The custom_data value is the base64 encoding of the string "scheduler custom data".
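Since the Cloud Scheduler payload arrives base64 encoded, the receiver has to decode it before use. A minimal Python sketch, assuming the payload is UTF-8 text as in the sample above (the function name decode_scheduler_data is hypothetical):

```python
import base64
import json


def decode_scheduler_data(body):
    """Return the decoded custom_data string from a Cloud Scheduler event body."""
    payload = json.loads(body)
    return base64.b64decode(payload["custom_data"]).decode("utf-8")


print(decode_scheduler_data('{"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh"}'))
# prints scheduler custom data
```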

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9",
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

The data value is the base64 encoding of the string '{"Hello": "world"}'.
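Unpacking a Pub/Sub body of this shape takes two steps: parse the JSON envelope, then base64-decode message.data. The following Python sketch does both; the function name decode_pubsub_body is an assumption, and the sketch treats the payload as UTF-8 text, which holds for the sample above but not necessarily for every publisher.

```python
import base64
import json


def decode_pubsub_body(body):
    """Return (message_id, text) from a Pub/Sub event body.

    An absent or empty data field yields an empty string.
    """
    envelope = json.loads(body)
    message = envelope["message"]
    text = base64.b64decode(message.get("data", "")).decode("utf-8")
    return message.get("messageId"), text


sample_body = json.dumps({
    "subscription": "example-subscription",  # hypothetical name
    "message": {
        "messageId": "1314133748793931",
        "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9",
        "publishTime": "2020-06-30T16:32:57.012Z",
    },
})
message_id, text = decode_pubsub_body(sample_body)
print(message_id, json.loads(text))
# prints 1314133748793931 {'Hello': 'world'}
```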

Event receiver response

Your receiver service should send an HTTP 2xx response to signal successful event receipt to the broker. The broker treats any other HTTP response as a delivery failure and resends the event.
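The following Python sketch shows one way to keep that contract explicit: a pure function decides the status code, and a small standard-library HTTP handler sends it. The names process_event, EventHandler, and serve, the port 8080 default, and the specific validation rules are all assumptions made for this example. Note that, per the rule above, every non-2xx status returned here would cause the broker to resend the event.

```python
import json
from http.server import BaseHTTPRequestHandler, HTTPServer


def process_event(headers, body):
    """Return an (HTTP status, message) pair for one delivery attempt.

    Expects header names to be lowercased already.
    """
    if "ce-id" not in headers or "ce-type" not in headers:
        return 400, "missing CloudEvents headers"
    try:
        json.loads(body)
    except ValueError:  # covers invalid JSON and undecodable bytes
        return 400, "body is not valid JSON"
    return 200, "ok"  # 2xx: the broker considers the event delivered


class EventHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = self.rfile.read(length)
        headers = {name.lower(): value for name, value in self.headers.items()}
        status, message = process_event(headers, body)
        payload = message.encode("utf-8")
        self.send_response(status)
        self.send_header("Content-Length", str(len(payload)))
        self.end_headers()
        self.wfile.write(payload)


def serve(port=8080):
    """Start the receiver; blocks until interrupted."""
    HTTPServer(("", port), EventHandler).serve_forever()
```

Calling serve() starts the receiver locally. Keeping the decision logic in process_event makes it possible to test the status codes without opening a socket.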

Sample receiver source code

Python

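import base64

from cloudevents import exceptions as cloud_exceptions
from cloudevents.http import from_http
from flask import Flask, request

app = Flask(__name__)


@app.route('/', methods=['POST'])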
def index():
    # Create CloudEvent from HTTP headers and body
    try:
        event = from_http(request.headers, request.get_data())

    except cloud_exceptions.MissingRequiredFields as e:
        print(f"cloudevents.exceptions.MissingRequiredFields: {e}")
        return "Failed to find all required cloudevent fields. ", 400

    except cloud_exceptions.InvalidStructuredJSON as e:
        print(f"cloudevents.exceptions.InvalidStructuredJSON: {e}")
        return "Could not deserialize the payload as JSON. ", 400

    except cloud_exceptions.InvalidRequiredFields as e:
        print(f"cloudevents.exceptions.InvalidRequiredFields: {e}")
        return "Request contained invalid required cloudevent fields. ", 400

    envelope = event.data

    if not envelope:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(envelope, dict) or 'message' not in envelope:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = envelope['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {event['id']}"
    print(resp)
    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

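    // Decode the base64 payload explicitly as UTF-8 rather than the platform default charset.
    String name =
        !StringUtils.isEmpty(data)
            ? new String(Base64.getDecoder().decode(data), java.nio.charset.StandardCharsets.UTF_8)
            : "World";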
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  const pubSubMessage = req.body.message;
  const name = pubSubMessage.data
    ? Buffer.from(pubSubMessage.data, 'base64').toString().trim()
    : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample run-events-pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"messageId"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
// encoding/json base64-decodes the "data" field into the []byte field automatically.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		// Log the decoding error itself so failed deliveries can be diagnosed.
		log.Printf("Bad HTTP Request: %v", err)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, r.Header.Get("Ce-Id"))
	// Use Print rather than Printf: s is data, not a format string.
	log.Print(s)
	fmt.Fprintln(w, s)
}

func main() {
	http.HandleFunc("/", HelloEventsPubSub)
	// Determine port for HTTP service.
	port := os.Getenv("PORT")
	if port == "" {
		port = "8080"
		log.Printf("Defaulting to port %s", port)
	}
	// Start HTTP server.
	log.Printf("Listening on port %s", port)
	if err := http.ListenAndServe(":"+port, nil); err != nil {
		log.Fatal(err)
	}
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

What's next