v1 接收可能適用於不同結構定義修訂版本的訊息 (已淘汰)

(已淘汰) 接收可能適用於不同結構定義修訂版本的訊息

程式碼範例

Go

在試用這個範例之前,請先按照GoPub/Sub 快速入門導覽課程:使用用戶端程式庫」中的操作說明進行設定。 詳情請參閱 Pub/Sub Go API 參考說明文件

如要驗證 Pub/Sub,請設定應用程式預設憑證。 詳情請參閱「為本機開發環境設定驗證」。

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema resource name
			path := strings.Split(name, "/")
			name = path[len(path)-1]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(schema.Definition)
			if err != nil {
				msg.Nack()
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var state map[string]interface{}
		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	return nil
}

後續步驟

如要搜尋及篩選其他 Google Cloud 產品的程式碼範例,請參閱Google Cloud 範例瀏覽器