이벤트 수신자 개발

이 페이지에서는 Cloud Run for Anthos on Google Cloud에 배포할 수 있는 이벤트 수신자 서비스를 만드는 방법을 보여줍니다.

Cloud Run for Anthos 서비스는 CloudEvents 표준을 사용하는 모든 소스에서 이벤트를 수신할 수 있습니다. Cloud Run for Anthos 이벤트는 네이티브 및 커스텀 이벤트 소스를 지원합니다.

네이티브 이벤트 소스는 다음과 같습니다.

커스텀 이벤트 소스 트리거는 Ce-Type: HTTP 헤더 값을 기준으로 모든 소스의 HTTP 요청을 필터링할 수 있습니다.

CloudEvent 형식

서비스는 이벤트 라우터의 이벤트를 CloudEvents 버전 1.0 HTTP 프로토콜 바인딩 사양으로 형식이 지정된 HTTP POST 요청으로 수신합니다. 각 HTTP 요청에는 이벤트별 헤더와 본문이 포함되며, 이러한 요청은 서비스에 대한 POST 요청을 사용하여 루트 경로(/)로 전송됩니다.

각 이벤트에는 다음과 같은 표준 CloudEvent HTTP 헤더가 있습니다.

헤더 설명 예시
ce-id 이벤트의 고유 식별자입니다. 1096434104173400
ce-source 이벤트의 소스를 식별합니다. //pubsub.googleapis.com/projects/serverless-com-demo/topics/my-topic
ce-specversion 이 이벤트에 사용된 CloudEvent 사양 버전입니다. 1.0
ce-type 이벤트 데이터의 유형입니다. google.cloud.pubsub.topic.v1.messagePublished
ce-time(선택사항) 이벤트 생성 시간(RFC 3339 형식) 2020-12-20T13:37:33.647Z

모든 이벤트의 HTTP 본문은 CloudEvents 저장소에 있습니다. 저장소에는 JSON 스키마 및 예시와 함께 이벤트의 프로토콜 버퍼가 포함됩니다.

이벤트 HTTP 헤더

다음 HTTP 헤더는 각 소스의 이벤트에 추가됩니다.

Cloud 감사 로그

HTTP 헤더 설명
ce-id (예시) projects/YOUR-PROJECT/logs/cloudaudit.googleapis.com%2Factivity50dhyee5b2531586289185422687 이 메시지의 고유 ID로, 이벤트가 생성될 때 서버에서 할당됩니다.
ce-source //storage.googleapis.com/projects/YOUR-PROJECT 프로젝트가 포함된 문자열입니다.
ce-specversion 1.0 이 이벤트에 사용되는 사양 버전입니다.
ce-type google.events.cloud.audit.v1 CloudEvent 유형 상수입니다.
ce-time (예시) 2020-10-08T17:57:33.647Z 메시지가 전송된 시간입니다.
ce-dataschema type.googleapis.com/google.logging.v2.LogEntry HTTP POST 데이터의 스키마입니다.
ce-subject storage.googleapis.com/projects/_/buckets/single-region-bucket GCS 버킷 위치에 대한 메타데이터입니다.

Cloud Storage

HTTP 헤더 설명
ce-id (예시) 1096434104173400 이 메시지의 고유 ID로, 이벤트가 생성될 때 서버에서 할당됩니다.
ce-source //storage.googleapis.com/projects/_/buckets/BUCKET-NAME Cloud Storage 버킷이 포함된 문자열입니다.
ce-specversion 1.0 이 이벤트에 사용되는 사양 버전입니다.
ce-type google.cloud.storage.object.v1.finalized
google.cloud.storage.object.v1.archived
google.cloud.storage.object.v1.deleted
google.cloud.storage.object.v1.metadataUpdated
CloudEvent 유형 상수입니다.
ce-time (예시) 2020-10-08T17:57:33.647Z 메시지가 전송된 시간입니다.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/storage/v1/data.proto HTTP POST 데이터의 스키마입니다.
ce-subject objects/OBJECT_NAME GCS 버킷 위치에 대한 메타데이터입니다.

Cloud Scheduler

HTTP 헤더 설명
ce-id (예시) 1096434104173400 이 메시지의 고유 ID로, 이벤트가 생성될 때 서버에서 할당됩니다.
ce-source //cloudscheduler.googleapis.com/JOB-NAME Cloud Scheduler 작업 이름이 포함된 문자열입니다.
ce-specversion 1.0 이 이벤트에 사용되는 사양 버전입니다.
ce-type google.cloud.scheduler.job.v1.executed CloudEvent 유형 상수입니다.
ce-time (예시) 2020-10-08T17:57:33.647Z 메시지가 전송된 시간입니다.
ce-dataschema https://raw.githubusercontent.com/googleapis/google-cloudevents/master/proto/google/events/cloud/scheduler/v1/data.proto HTTP POST 데이터의 스키마입니다.
ce-subject 해당 없음 Cloud Scheduler 작업에 대한 메타데이터

Pub/Sub

HTTP 헤더 설명
ce-id (예시) 1096434104173400 이 메시지의 고유 ID로, 이벤트가 생성될 때 서버에서 할당됩니다.
ce-source //pubsub.googleapis.com/projects/YOUR-PROJECT/topics/YOUR-TOPIC 프로젝트와 주제를 포함하는 문자열입니다.
ce-specversion 1.0 이 이벤트에 사용되는 사양 버전입니다.
ce-type google.events.cloud.pubsub.v1 CloudEvent 유형 상수입니다.
ce-time (예시) 2020-10-08T17:57:33.647Z 메시지가 전송된 시간입니다.

이벤트 HTTP 본문

요청의 HTTP 본문은 다음과 비슷합니다.

Cloud 감사 로그

{
"insertId": "9frck8cf9j",
"logName": "projects/test-project/logs/cloudaudit.googleapis.com%2Factivity",
"protoPayload": {
  "@type": "type.googleapis.com/google.cloud.audit.AuditLog",
  "authenticationInfo": {
    "principalEmail": "robot@test-project.iam.gserviceaccount.com",
    "principalSubject": "user:robot@test-project.iam.gserviceaccount.com",
    "serviceAccountKeyName": "//iam.googleapis.com/projects/test-project/serviceAccounts/robot@test-project.iam.gserviceaccount.com/keys/90f662482321f1ca8e82ea675b1a1c30c1fe681f"
  },
  "authorizationInfo": [
    {
      "granted": true,
      "permission": "pubsub.topics.create",
      "resource": "projects/test-project",
      "resourceAttributes": {}
    }
  ],
  "methodName": "google.pubsub.v1.Publisher.CreateTopic",
  "request": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "requestMetadata": {
    "callerIp": "192.168.0.1",
    "callerNetwork": "//compute.googleapis.com/projects/google.com:my-laptop/global/networks/__unknown__",
    "callerSuppliedUserAgent": "google-cloud-sdk",
    "destinationAttributes": {},
    "requestAttributes": {
      "auth": {},
      "time": "2020-06-30T16:14:47.600710407Z"
    }
  },
  "resourceLocation": {
    "currentLocations": [
      "asia-east1",
      "asia-northeast1",
      "asia-southeast1",
      "australia-southeast1",
      "europe-north1",
      "europe-west1",
      "europe-west2",
      "europe-west3",
      "europe-west4",
      "us-central1",
      "us-central2",
      "us-east1",
      "us-east4",
      "us-west1",
      "us-west2",
      "us-west3",
      "us-west4"
    ]
  },
  "resourceName": "projects/test-project/topics/test-auditlogs-source",
  "response": {
    "@type": "type.googleapis.com/google.pubsub.v1.Topic",
    "messageStoragePolicy": {
      "allowedPersistenceRegions": [
        "asia-east1",
        "asia-northeast1",
        "asia-southeast1",
        "australia-southeast1",
        "europe-north1",
        "europe-west1",
        "europe-west2",
        "europe-west3",
        "europe-west4",
        "us-central1",
        "us-central2",
        "us-east1",
        "us-east4",
        "us-west1",
        "us-west2",
        "us-west3",
        "us-west4"
      ]
    },
    "name": "projects/test-project/topics/test-auditlogs-source"
  },
  "serviceName": "pubsub.googleapis.com"
},
"receiveTimestamp": "2020-06-30T16:14:48.401489148Z",
"resource": {
  "labels": {
    "project_id": "test-project",
    "topic_id": "projects/test-project/topics/test-auditlogs-source"
  },
  "type": "pubsub_topic"
},
"severity": "NOTICE",
"timestamp": "2020-06-30T16:14:47.593398572Z"
}

Cloud Storage

{
"kind": "storage#object",
"id": "BUCKET-NAME/OBJECT-NAME/GENERATION",
"selfLink": "https://www.googleapis.com/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME.txt",
"name": "OBJECT-NAME",
"bucket": "BUCKET-NAME",
"generation": "1593534371944198",
"metageneration": "1",
"contentType": "text/plain",
"timeCreated": "2020-09-11T23:55:39.472Z",
"updated": "2020-09-11T23:55:39.472Z",
"storageClass": "STANDARD",
"timeStorageClassUpdated": "2020-09-11T23:55:39.472Z",
"size": "FILESIZE",
"md5Hash": "lc2/e7x2XJ4BnKelDsjAjw==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/BUCKET-NAME/o/OBJECT-NAME?generation=GENERATION&alt=media",
"contentLanguage": "en",
"crc32c": "lPCM8g==",
"etag": "CLOV7t+m4usCEAE="
}

Cloud Scheduler

{
"custom_data": "c2NoZWR1bGVyIGN1c3RvbSBkYXRh" // base64 encoded "scheduler custom data"
}

Pub/Sub

{
"subscription": "cre-src_rc3_source-for-knativegcp-test-pubsub-tr_fcdf7716-c4bd-43b9-8ccc-e6e8ff848cd4",
"message": {
  "messageId": "1314133748793931",
  "data": "eyJIZWxsbyI6ICJ3b3JsZCJ9", // base64 encoded '{"Hello": "world"}'
  "publishTime": "2020-06-30T16:32:57.012Z"
}
}

이벤트 수신자 응답

수신자 서비스는 성공적인 이벤트 수신을 라우터에 알리기 위해 HTTP 2xx 응답을 보내야 합니다. 라우터는 다른 모든 HTTP 응답을 전송 실패로 취급하며 이벤트를 다시 전송합니다.

CloudEvents SDK 라이브러리 사용

다음 언어에서 사용할 수 있는 CloudEvents SDK 라이브러리를 사용하여 이벤트 수신자 서비스를 개발할 수 있습니다.

이러한 라이브러리는 오픈소스이며 HTTP 요청을 언어 관용구 CloudEvent 객체로 더욱 쉽게 변환할 수 있도록 합니다.

샘플 수신자 소스 코드

샘플 코드는 Cloud Run for Anthos on Google Cloud에 배포된 서비스에서 Pub/Sub 이벤트를 읽는 방법을 보여줍니다.

Python

@app.route('/', methods=['POST'])
def index():
    data = request.get_json()
    if not data:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(data, dict) or 'message' not in data:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = data['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(resp)

    return (resp, 200)

자바

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  // Cast to MessagePublishedEvent for IDE autocompletion
  const pubSubMessage = toMessagePublishedData(req.body);
  const name =
    pubSubMessage.message && pubSubMessage.message.data
      ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
      : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

다음 단계