(OBSOLETO) Recibe mensajes que podrían ser de diferentes revisiones de esquemas

(OBSOLETO) Recibe mensajes que podrían ser de diferentes revisiones de esquemas

Muestra de código

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go que encontrarás en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Go de Pub/Sub.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema resource name
			path := strings.Split(name, "/")
			name = path[len(path)-1]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", name, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(schema.Definition)
			if err != nil {
				msg.Nack()
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var state map[string]interface{}
		if encoding == "BINARY" {
			data, _, err := codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else if encoding == "JSON" {
			data, _, err := codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
			state = data.(map[string]interface{})
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	return nil
}

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos de Google Cloud , consulta el navegador de muestras deGoogle Cloud .