이벤트 수신자 개발

이 페이지에서는 Cloud Run에 배포할 수 있는 이벤트 수신자 서비스를 만드는 방법을 보여줍니다.

Cloud Run 서비스는 CloudEvents 형식으로 이벤트가 포함된 HTTP 요청을 수신합니다.

이벤트 소스에는 다음이 포함됩니다.

이벤트 수신자 응답

수신자 서비스는 성공적인 이벤트 수신을 라우터에 알리기 위해 HTTP 2xx 응답을 보내야 합니다. 라우터는 다른 모든 HTTP 응답을 전송 실패로 취급하며 이벤트를 다시 전송합니다.

오픈소스

모든 이벤트의 HTTP 본문 구조는 CloudEvents GitHub 저장소에서 제공합니다.

이 저장소에는 다음과 같은 항목이 포함되어 있어 프로그래밍 언어로 CloudEvents 데이터를 이해하고 사용하는 데 도움이 됩니다.

  • CloudEvents 데이터 페이로드용 Google 프로토콜 버퍼
  • 생성된 JSON 스키마
  • 공개 JSON 스키마 카탈로그

클라이언트 라이브러리 링크도 포함됩니다.

CloudEvents SDK 라이브러리 사용

다음 언어에서 사용할 수 있는 CloudEvents SDK 라이브러리를 사용하여 이벤트 수신자 서비스를 개발할 수 있습니다.

이러한 라이브러리는 오픈소스이며 HTTP 요청을 언어 관용구 CloudEvent 객체로 더욱 쉽게 변환할 수 있도록 합니다.

샘플 수신자 소스 코드

Cloud 감사 로그

샘플 코드는 Cloud Run에 배포된 서비스에서 Cloud 감사 로그를 사용하여 Cloud Storage 이벤트를 읽는 방법을 보여줍니다.

Python

@app.route('/', methods=['POST'])
def index():
    # Gets the GCS bucket name from the CloudEvent header
    # Example: "storage.googleapis.com/projects/_/buckets/my-bucket"
    bucket = request.headers.get('ce-subject')

    print(f"Detected change in Cloud Storage bucket: {bucket}")
    return (f"Detected change in Cloud Storage bucket: {bucket}", 200)

자바

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {

  private static final List<String> requiredFields =
      Arrays.asList("ce-id", "ce-source", "ce-type", "ce-specversion");

  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody Map<String, Object> body, @RequestHeader Map<String, String> headers) {
    for (String field : requiredFields) {
      if (headers.get(field) == null) {
        String msg = String.format("Missing expected header: %s.", field);
        System.out.println(msg);
        return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
      }
    }

    if (headers.get("ce-subject") == null) {
      String msg = "Missing expected header: ce-subject.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String ceSubject = headers.get("ce-subject");
    String msg = "Detected change in Cloud Storage bucket: " + ceSubject;
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.header('ce-subject')) {
    return res
      .status(400)
      .send('Bad Request: missing required header: ce-subject');
  }

  console.log(
    `Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
  );
  return res
    .status(200)
    .send(
      `Detected change in Cloud Storage bucket: ${req.header('ce-subject')}`
    );
});

module.exports = app;

Go


// Sample audit_storage is a Cloud Run service which handles Cloud Audit Log events with Cloud Storage data.
package main

import (
	"fmt"
	"log"
	"net/http"
	"os"
)

// HelloEventsStorage receives and processes a Cloud Audit Log event with Cloud Storage data.
func HelloEventsStorage(w http.ResponseWriter, r *http.Request) {
	s := fmt.Sprintf("Detected change in Cloud Storage bucket: %s", string(r.Header.Get("Ce-Subject")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                logger.LogInformation("Handling HTTP POST");

                var ceSubject = context.Request.Headers["ce-subject"];
                logger.LogInformation($"ce-subject: {ceSubject}");

                if (string.IsNullOrEmpty(ceSubject))
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad Request: expected header Ce-Subject");
                    return;
                }

                await context.Response.WriteAsync($"GCS CloudEvent type: {ceSubject}");
            });
        });
    }
}

Pub/Sub

이 샘플 코드는 Cloud Run에 배포된 서비스에서 Pub/Sub 이벤트를 읽는 방법을 보여줍니다.

Python

@app.route('/', methods=['POST'])
def index():
    data = request.get_json()
    if not data:
        msg = 'no Pub/Sub message received'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    if not isinstance(data, dict) or 'message' not in data:
        msg = 'invalid Pub/Sub message format'
        print(f'error: {msg}')
        return f'Bad Request: {msg}', 400

    pubsub_message = data['message']

    name = 'World'
    if isinstance(pubsub_message, dict) and 'data' in pubsub_message:
        name = base64.b64decode(pubsub_message['data']).decode('utf-8').strip()

    resp = f"Hello, {name}! ID: {request.headers.get('ce-id')}"
    print(resp)

    return (resp, 200)

자바

import com.example.cloudrun.eventpojos.PubSubBody;
import java.util.Base64;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestHeader;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class EventController {
  @RequestMapping(value = "/", method = RequestMethod.POST)
  public ResponseEntity<String> receiveMessage(
      @RequestBody PubSubBody body, @RequestHeader Map<String, String> headers) {
    // Get PubSub message from request body.
    PubSubBody.PubSubMessage message = body.getMessage();
    if (message == null) {
      String msg = "No Pub/Sub message received.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String data = message.getData();
    if (data == null || data.isEmpty()) {
      String msg = "Invalid Pub/Sub message format.";
      System.out.println(msg);
      return new ResponseEntity<String>(msg, HttpStatus.BAD_REQUEST);
    }

    String name =
        !StringUtils.isEmpty(data) ? new String(Base64.getDecoder().decode(data)) : "World";
    String ceId = headers.getOrDefault("ce-id", "");
    String msg = String.format("Hello, %s! ID: %s", name, ceId);
    System.out.println(msg);
    return new ResponseEntity<String>(msg, HttpStatus.OK);
  }
}

Node.js

const express = require('express');
const {
  toMessagePublishedData,
} = require('@google/events/cloud/pubsub/v1/MessagePublishedData');
const app = express();

app.use(express.json());
app.post('/', (req, res) => {
  if (!req.body) {
    const errorMessage = 'no Pub/Sub message received';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  if (!req.body.message) {
    const errorMessage = 'invalid Pub/Sub message format';
    res.status(400).send(`Bad Request: ${errorMessage}`);
    console.log(`Bad Request: ${errorMessage}`);
    return;
  }
  // Cast to MessagePublishedEvent for IDE autocompletion
  const pubSubMessage = toMessagePublishedData(req.body);
  const name =
    pubSubMessage.message && pubSubMessage.message.data
      ? Buffer.from(pubSubMessage.message.data, 'base64').toString().trim()
      : 'World';

  const result = `Hello, ${name}! ID: ${req.get('ce-id') || ''}`;
  console.log(result);
  res.send(result);
});

module.exports = app;

Go


// Sample pubsub is a Cloud Run service which handles Pub/Sub messages.
package main

import (
	"encoding/json"
	"fmt"
	"log"
	"net/http"
	"os"
)

// PubSubMessage is the payload of a Pub/Sub event.
// See the documentation for more details:
// https://cloud.google.com/pubsub/docs/reference/rest/v1/PubsubMessage
type PubSubMessage struct {
	Message struct {
		Data []byte `json:"data,omitempty"`
		ID   string `json:"id"`
	} `json:"message"`
	Subscription string `json:"subscription"`
}

// HelloEventsPubSub receives and processes a Pub/Sub push message.
func HelloEventsPubSub(w http.ResponseWriter, r *http.Request) {
	var e PubSubMessage
	if err := json.NewDecoder(r.Body).Decode(&e); err != nil {
		http.Error(w, "Bad HTTP Request", http.StatusBadRequest)
		log.Printf("Bad HTTP Request: %v", http.StatusBadRequest)
		return
	}
	name := string(e.Message.Data)
	if name == "" {
		name = "World"
	}
	s := fmt.Sprintf("Hello, %s! ID: %s", name, string(r.Header.Get("Ce-Id")))
	log.Printf(s)
	fmt.Fprintln(w, s)
}

C#


using CloudNative.CloudEvents;
using Google.Events;
using Google.Events.Protobuf.Cloud.PubSub.V1;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Hosting;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
    }

    public void Configure(IApplicationBuilder app, IWebHostEnvironment env, ILogger<Startup> logger)
    {
        if (env.IsDevelopment())
        {
            app.UseDeveloperExceptionPage();
        }

        logger.LogInformation("Service is starting...");

        app.UseRouting();

        app.UseEndpoints(endpoints =>
        {
            endpoints.MapPost("/", async context =>
            {
                var cloudEvent = await context.Request.ReadCloudEventAsync();
                logger.LogInformation("Received CloudEvent\n" + GetEventLog(cloudEvent));

                var messagePublishedData = CloudEventConverters.ConvertCloudEventData<MessagePublishedData>(cloudEvent);
                var pubSubMessage = messagePublishedData.Message;
                if (pubSubMessage == null)
                {
                    context.Response.StatusCode = 400;
                    await context.Response.WriteAsync("Bad request: Invalid Pub/Sub message format");
                    return;
                }

                var data = pubSubMessage.Data;
                logger.LogInformation($"Data: {data.ToBase64()}");

                var name = data.ToStringUtf8();
                logger.LogInformation($"Extracted name: {name}");

                var id = context.Request.Headers["ce-id"];
                await context.Response.WriteAsync($"Hello {name}! ID: {id}");
            });
        });
    }

    private string GetEventLog(CloudEvent cloudEvent)
    {
        return $"ID: {cloudEvent.Id}\n"
            + $"Source: {cloudEvent.Source}\n"
            + $"Type: {cloudEvent.Type}\n"
            + $"Subject: {cloudEvent.Subject}\n"
            + $"DataSchema: {cloudEvent.DataSchema}\n"
            + $"DataContentType: {cloudEvent.DataContentType}\n"
            + $"Time: {cloudEvent.Time?.ToUniversalTime():yyyy-MM-dd'T'HH:mm:ss.fff'Z'}\n"
            + $"SpecVersion: {cloudEvent.SpecVersion}\n"
            + $"Data: {cloudEvent.Data}";
    }
}

다음 단계