イベント レシーバの開発

このページでは、Cloud Run for Anthos on Google Cloud にデプロイできるイベント レシーバ サービスの作成方法を説明します。

Cloud Run for Anthos サービスは、CloudEvents 標準を使用するすべてのソースからイベントを受信できます。Events for Cloud Run for Anthos は、ネイティブとカスタムのイベントソースをサポートします。

ネイティブなイベントソースとしては次のものがあります。

カスタム イベント ソースのトリガーは、Ce-Type: HTTP ヘッダーの値に基づいて、任意のソースからの HTTP リクエストをフィルタリングできます。

CloudEvent 形式

サービスは、CloudEvents バージョン 1.0 HTTP プロトコル バインディング仕様の形式で HTTP POST リクエストとしてイベント ルーターからイベントを受信します。各 HTTP リクエストにはイベント固有のヘッダーと本文が含まれ、サービスへの POST リクエストによりルートパス(/)に送信されます。

各イベントには次の CloudEvent HTTP 標準ヘッダーがあります。

ヘッダー 説明
ce-id イベントの一意の識別子。 1096434104173400
ce-source イベントのソースを識別します。 //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion このイベントに使用された CloudEvent 仕様のバージョン。 1.0
ce-type イベントデータのタイプ。 google.cloud.pubsub.topic.v1.messagePublished
ce-time(省略可) イベントの生成時間(RFC 3339 形式)。 2020-12-20T13:37:33.647Z

イベントの HTTP 本文はすべて CloudEvents リポジトリにあります。リポジトリには、JSON スキーマとサンプルと一緒にイベントのプロトコル バッファが含まれます。

イベントの HTTP ヘッダー

各ソースのイベントには次の HTTP ヘッダーが追加されます。

Cloud Audit Logs

HTTP ヘッダー 説明
ce-id (例)projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 イベントが生成されたときにサーバーによって割り当てられたこのメッセージの一意の ID。
ce-source //storage.googleapis.com/projects/YOUR-PROJECT プロジェクトを含む文字列。
ce-specversion 1.0 このイベントに使用された仕様のバージョン。
ce-type google.events.cloud.audit.v1 CloudEvent タイプ定数。
ce-time (例)2020-10-08T17:57:33.647Z メッセージの送信時刻。
ce-dataschema type.googleapis.com/google.logging.v2.LogEntry HTTP POST データのスキーマ。
ce-subject storage.googleapis.com/projects/_/buckets/single-region-bucket GCS バケットのロケーションに関するメタデータ。

Cloud Storage

HTTP ヘッダー 説明
ce-id (例)1096434104173400 イベントが生成されたときにサーバーによって割り当てられたこのメッセージの一意の ID。
ce-source //storage.googleapis.com/projects/_/buckets/BUCKET-NAME Cloud Storage バケットを含む文字列。
ce-specversion 1.0 このイベントに使用された仕様のバージョン。
ce-type google.cloud.storage.object.v1.finalized
google.cloud.storage.object.v1.archived
google.cloud.storage.object.v1.deleted
google.cloud.storage.object.v1.metadataUpdated
CloudEvent タイプ定数。
ce-time (例)2020-10-08T17:57:33.647Z メッセージの送信時刻。
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto HTTP POST データのスキーマ。
ce-subject objects/OBJECT_NAME GCS バケットのロケーションに関するメタデータ。

Cloud Scheduler

HTTP ヘッダー 説明
ce-id (例)1096434104173400 イベントが生成されたときにサーバーによって割り当てられたこのメッセージの一意の ID。
ce-source //cloudscheduler.googleapis.com/JOB-NAME Cloud Scheduler ジョブ名を含む文字列。
ce-specversion 1.0 このイベントに使用された仕様のバージョン。
ce-type google.cloud.scheduler.job.v1.executed CloudEvent タイプ定数。
ce-time (例)2020-10-08T17:57:33.647Z メッセージの送信時刻。
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto HTTP POST データのスキーマ。
ce-subject なし Cloud Scheduler ジョブに関するメタデータ

Pub/Sub

HTTP ヘッダー 説明
ce-id (例)1096434104173400 イベントが生成されたときにサーバーによって割り当てられたこのメッセージの一意の ID。
ce-source //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC プロジェクトとトピックを含む文字列。
ce-specversion 1.0 このイベントに使用された仕様のバージョン。
ce-type google.events.cloud.pubsub.v1 CloudEvent タイプ定数。
ce-time (例)2020-10-08T17:57:33.647Z メッセージの送信時刻。

イベントの HTTP 本文

リクエストの HTTP 本文は次のようになります。

Cloud Audit Logs

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh" // base64 encoded "scheduler custom data"
}

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9", // base64 encoded '{"Hello": "world"}'
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

イベント レシーバのレスポンス

レシーバ サービスが HTTP 2xx レスポンスを送信し、ルーターにイベントの受信成功を通知します。他の HTTP レスポンスはすべて配信エラーとして処理され、イベントが再送信されます。

CloudEvents SDK ライブラリの使用

イベント レシーバ サービスは、CloudEvents SDK ライブラリを使用して作成できます。このライブラリは次の言語で利用できます。

これはオープンソースのライブラリです。HTTP リクエストを言語固有の CloudEvent オブジェクトに簡単に変換できます。

サンプルのレシーバ ソースコード

このサンプルコードは、Cloud Run for Anthos on Google Cloud にデプロイされたサービスで Pub/Sub イベントを読み取る方法を示しています。

Python

@app.route('/', methods=['POST'])
def index():
    data = request.get_json()
    if not data:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(data, dict) or 'message' not in data:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = data['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(resp)

    return (resp, 200)

Java

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  // Cast to MessagePublishedEvent for IDE autocompletion
  const pubSubMessage = toMessagePublishedData(req.body);
  const name =
    pubSubMessage.message && pubSubMessage.message.data
      ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
      : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

次のステップ