(已废弃)接收可能适用于不同架构修订版的消息

(已废弃)接收可能适用于不同架构修订版的消息

代码示例

Go

在尝试此示例之前，请按照 Pub/Sub 快速入门：使用客户端库中的 Go 设置说明进行操作。如需了解详情，请参阅 Pub/Sub Go API 参考文档。

如需向 Pub/Sub 进行身份验证，请设置应用默认凭据。如需了解详情，请参阅为本地开发环境设置身份验证。

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

// subscribeWithAvroSchemaRevisions receives messages from the subscription
// subID for up to ten seconds and decodes each one with the Avro schema
// revision named in the message attributes, writing what it received to w.
//
// Codecs are cached per schema revision ID, so the schema for a revision is
// only fetched the first time a message carrying that revision is seen.
// Messages that cannot be decoded (schema lookup failure, invalid schema
// definition, decode error, unknown encoding, or a decoded value that is not
// a map) are nacked; all others are acked.
//
// avscFile is not read by this function; it is kept in the signature so
// existing callers keep compiling.
//
// It returns an error if either client cannot be created or if Receive fails.
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json" (unused)
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer schemaClient.Close()

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// mu guards revisionCodecs, which the callback reads and writes, and
	// also serializes the writes to w.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema resource name: the last path segment.
			// LastIndex returns -1 when there is no "/", which keeps the
			// whole string.
			name = name[strings.LastIndex(name, "/")+1:]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(schema.Definition)
			if err != nil {
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
				msg.Nack()
				// Stop here: caching a nil codec would make this and every
				// later message of the same revision call a method on a nil
				// *goavro.Codec.
				return
			}
			revisionCodecs[revision] = codec
		}

		var decoded interface{}
		switch encoding := msg.Attributes["googclient_schemaencoding"]; encoding {
		case "BINARY":
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			decoded = data
		case "JSON":
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			decoded = data
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// The decoded value is only a map when the schema's top-level type
		// decodes to one; check instead of panicking on anything else.
		state, ok := decoded.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", decoded)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

后续步骤

如需搜索和过滤其他 Google Cloud 产品的代码示例，请参阅 Google Cloud 示例浏览器。