Create an event handler that receives and processes a Pub/Sub message event
Creates an event handler that receives and processes a Pub/Sub message event.
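Stripped of framework details, the samples on this page share one flow: parse the JSON request body, check that it contains a "message" object, base64-decode the message's "data" field, and combine the result with the "ce-id" header into a greeting. The following Python sketch illustrates that flow using only the standard library, so it can be run without a web server. The function names `make_push_body` and `greeting_from_request`, the example subscription path, and the message ID are hypothetical; the body layout follows the fields that the Go and Python samples read.

```python
import base64
import json


def make_push_body(text, message_id="example-id"):
    """Build a request body shaped like the one the samples parse.

    The "message", "data", "id", and "subscription" keys mirror the Go
    sample's PubSubMessage struct; the values are made up for illustration.
    """
    encoded = base64.b64encode(text.encode("utf-8")).decode("ascii")
    return {
        "message": {"data": encoded, "id": message_id},
        "subscription": "projects/example-project/subscriptions/example-sub",
    }


def greeting_from_request(body, headers):
    """Return (response_text, status_code), following the samples' checks."""
    if not body:
        return "Bad Request: no Pub/Sub message received", 400
    if not isinstance(body, dict) or "message" not in body:
        return "Bad Request: invalid Pub/Sub message format", 400

    message = body["message"]
    name = "World"  # fallback when the message carries no data
    if isinstance(message, dict) and message.get("data"):
        name = base64.b64decode(message["data"]).decode("utf-8").strip()

    # A plain dict is case-sensitive; real web frameworks usually are not.
    return f"Hello, {name}! ID: {headers.get('ce-id', '')}", 200


if __name__ == "__main__":
    # Round-trip through JSON to mimic what arrives over HTTP.
    body = json.loads(json.dumps(make_push_body("Eventarc")))
    print(greeting_from_request(body, {"ce-id": "1234"}))
```

Keeping the decoding logic in a plain function like this is one way to unit test it separately from the HTTP layer; the framework-specific samples on this page inline the same steps in their route handlers.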
Explore further
For detailed documentation that includes this code sample, see the following:

- [Develop event receivers](/eventarc/standard/docs/run/event-receivers)

Code sample
The samples below handle Pub/Sub messages in C#, Go, Java, Node.js, Python, and Ruby. Each one extracts the data from the Pub/Sub message, decodes it if necessary, reads the "ce-id" header to identify the event, and uses both to create a response.

To authenticate to Eventarc, set up Application Default Credentials before running any of the samples. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

C#

    using CloudNative.CloudEvents;
    using CloudNative.CloudEvents.AspNetCore;
    using Google.Events.Protobuf.Cloud.PubSub.V1;
    using Microsoft.AspNetCore.Builder;
    using Microsoft.AspNetCore.Hosting;
    using Microsoft.AspNetCore.Http;
    using Microsoft.Extensions.DependencyInjection;
    using Microsoft.Extensions.Hosting;
    using Microsoft.Extensions.Logging;

    public class Startup
    {
        public void ConfigureServices(IServiceCollection services)
        {
        }

        public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
        {
            if (env.IsDevelopment())
            {
                app.UseDeveloperExceptionPage();
            }

            logger.LogInformation("Service is starting...");

            app.UseRouting();

            app.UseEndpoints(endpoints =>
            {
                endpoints.MapPost("/", async context =>
                {
                    // Parse the incoming HTTP request as a CloudEvent carrying Pub/Sub data.
                    var formatter = CloudEventFormatterAttribute.CreateFormatter(typeof(MessagePublishedData));
                    var cloudEvent = await context.Request.ToCloudEventAsync(formatter);
                    logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                    var messagePublishedData = (MessagePublishedData) cloudEvent.Data;
                    var pubSubMessage = messagePublishedData.Message;
                    if (pubSubMessage == null)
                    {
                        context.Response.StatusCode = 400;
                        await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                        return;
                    }

                    var data = pubSubMessage.Data;
                    logger.LogInformation($"Data: {data.ToBase64()}");

                    // The message data is the name to greet, encoded as UTF-8 bytes.
                    var name = data.ToStringUtf8();
                    logger.LogInformation($"Extracted name: {name}");

                    var id = context.Request.Headers["ce-id"];
                    await context.Response.WriteAsync($"Hello {name}! ID: {id}");
                });
            });
        }

        // Formats the CloudEvent attributes for logging.
        private string GetEventLog(CloudEvent cloudEvent)
        {
            return $"ID: {cloudEvent.Id}\n"
                + $"Source: {cloudEvent.Source}\n"
                + $"Type: {cloudEvent.Type}\n"
                + $"Subject: {cloudEvent.Subject}\n"
                + $"DataSchema: {cloudEvent.DataSchema}\n"
                + $"DataContentType: {cloudEvent.DataContentType}\n"
                + $"Time: {cloudEvent.Time?.UtcDateTime:yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
                + $"SpecVersion: {cloudEvent.SpecVersion}\n"
                + $"Data: {cloudEvent.Data}";
        }
    }

Go

    // Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
    package main

    import (
    	"encoding/json"
    	"fmt"
    	"log"
    	"net/http"
    	"os"
    )

    // PubSubMessage is the payload of a Pub/Sub event.
    // See the documentation for more details:
    // https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
    type PubSubMessage struct {
    	Message struct {
    		Data []byte `json:"data,omitempty"`
    		ID   string `json:"id"`
    	} `json:"message"`
    	Subscription string `json:"subscription"`
    }

    // HelloEventsPubSub receives and processes a Pub/Sub push message.
    func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
    	var e PubSubMessage
    	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
    		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
    		log.Printf("Bad HTTP Request: %v", err)
    		return
    	}
    	// encoding/json base64-decodes the "data" field into the []byte.
    	name := string(e.Message.Data)
    	if name == "" {
    		name = "World"
    	}
    	s := fmt.Sprintf("Hello, %s! ID: %s", name, r.Header.Get("Ce-Id"))
    	log.Print(s)
    	fmt.Fprintln(w, s)
    }

    // main is not part of the original excerpt. It is a minimal entry point,
    // assuming the service listens on $PORT and falls back to 8080.
    func main() {
    	http.HandleFunc("/", HelloEventsPubSub)
    	port := os.Getenv("PORT")
    	if port == "" {
    		port = "8080"
    	}
    	log.Printf("Listening on port %s", port)
    	log.Fatal(http.ListenAndServe(":"+port, nil))
    }

Java

    import com.example.cloudrun.eventpojos.PubSubBody;
    import java.util.Base64;
    import java.util.Map;
    import org.apache.commons.lang3.StringUtils;
    import org.springframework.http.HttpStatus;
    import org.springframework.http.ResponseEntity;
    import org.springframework.web.bind.annotation.RequestBody;
    import org.springframework.web.bind.annotation.RequestHeader;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RequestMethod;
    import org.springframework.web.bind.annotation.RestController;

    @RestController
    public class EventController {
      @RequestMapping(value = "/", method = RequestMethod.POST)
      public ResponseEntity<String> receiveMessage(
          @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
        // Get PubSub message from request body.
        PubSubBody.PubSubMessage message = body.getMessage();
        if (message == null) {
          String msg = "No Pub/Sub message received.";
          System.out.println(msg);
          return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
        }

        String data = message.getData();
        if (data == null || data.isEmpty()) {
          String msg = "Invalid Pub/Sub message format.";
          System.out.println(msg);
          return new ResponseEntity<>(msg, HttpStatus.BAD_REQUEST);
        }

        // The message data arrives base64-encoded.
        String name =
            !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
        String ceId = headers.getOrDefault("ce-id", "");
        String msg = String.format("Hello, %s! ID: %s", name, ceId);
        System.out.println(msg);
        return new ResponseEntity<>(msg, HttpStatus.OK);
      }
    }

Node.js

    const express = require('express');
    const {
      toMessagePublishedData,
    } = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
    const app = express();

    app.use(express.json());
    app.post('/', (req, res) => {
      if (!req.body) {
        const errorMessage = 'no Pub/Sub message received';
        res.status(400).send(`Bad Request: ${errorMessage}`);
        console.log(`Bad Request: ${errorMessage}`);
        return;
      }
      if (!req.body.message) {
        const errorMessage = 'invalid Pub/Sub message format';
        res.status(400).send(`Bad Request: ${errorMessage}`);
        console.log(`Bad Request: ${errorMessage}`);
        return;
      }
      // Cast to MessagePublishedData for IDE autocompletion
      const pubSubMessage = toMessagePublishedData(req.body);
      const name =
        pubSubMessage.message && pubSubMessage.message.data
          ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
          : 'World';

      const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
      console.log(result);
      res.send(result);
    });

    module.exports = app;

Python

    # The imports and app object below are not part of the original excerpt;
    # they assume the handler runs in a Flask application.
    import base64

    from flask import Flask, request

    app = Flask(__name__)


    @app.route("/", methods=["POST"])
    def index():
        data = request.get_json()
        if not data:
            msg = "no Pub/Sub message received"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400

        if not isinstance(data, dict) or "message" not in data:
            msg = "invalid Pub/Sub message format"
            print(f"error: {msg}")
            return f"Bad Request: {msg}", 400

        pubsub_message = data["message"]

        name = "World"
        if isinstance(pubsub_message, dict) and "data" in pubsub_message:
            name = base64.b64decode(pubsub_message["data"]).decode("utf-8").strip()

        resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
        print(resp)

        return (resp, 200)

Ruby

    # The requires below are not part of the original excerpt;
    # they assume the route is defined in a Sinatra application.
    require "sinatra"
    require "json"
    require "base64"

    post "/" do
      request.body.rewind # in case someone already read it

      body = JSON.parse request.body.read
      data = Base64.decode64 body["message"]["data"]
      if data.empty?
        data = "World"
      end
      id = request.env["HTTP_CE_ID"]
      if request.has_header? "ce-id"
        id = request.get_header "ce-id"
      end

      result = "Hello #{data}! ID: #{id}"
      puts result
      result
    end

What's next
To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=eventarc).

Except as otherwise noted, the content of this page is licensed under the Creative Commons Attribution 4.0 License, and code samples are licensed under the Apache 2.0 License. For details, see the Google Developers Site Policies. Java is a registered trademark of Oracle and/or its affiliates.