Menerima pesan yang bisa jadi untuk revisi skema yang berbeda

Menerima pesan yang mungkin dipublikasikan dengan revisi skema yang berbeda-beda.

Mempelajari lebih lanjut

Untuk dokumentasi mendetail yang menyertakan contoh kode ini, lihat artikel berikut:

Contoh kode

C++

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C++ di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C++ Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
    pubsub::Subscription(project_id, subscription_id)));

// Create a schema client.
auto schema_client =
    pubsub::SchemaServiceClient(pubsub::MakeSchemaServiceConnection());

// Read the reader schema. This is the schema you want the messages to be
// evaluated using.
std::ifstream ifs(avro_file);
avro::ValidSchema reader_schema;
avro::compileJsonSchema(ifs, reader_schema);

std::unordered_map<std::string, avro::ValidSchema> revisions_to_schemas;
auto session = subscriber.Subscribe(
    [&](pubsub::Message const& message, pubsub::AckHandler h) {
      // Get the reader schema revision for the message.
      auto schema_name = message.attributes()["googclient_schemaname"];
      auto schema_revision_id =
          message.attributes()["googclient_schemarevisionid"];
      // If we haven't received a message with this schema, look it up.
      if (revisions_to_schemas.find(schema_revision_id) ==
          revisions_to_schemas.end()) {
        auto schema_path = schema_name + "@" + schema_revision_id;
        // Use the schema client to get the path.
        auto schema = schema_client.GetSchema(schema_path);
        if (!schema) {
          std::cout << "Schema not found:" << schema_path << "\n";
          return;
        }
        avro::ValidSchema writer_schema;
        std::stringstream in;
        in << schema.value().definition();
        avro::compileJsonSchema(in, writer_schema);
        revisions_to_schemas[schema_revision_id] = writer_schema;
      }
      auto writer_schema = revisions_to_schemas[schema_revision_id];

      auto encoding = message.attributes()["googclient_schemaencoding"];
      if (encoding == "JSON") {
        std::stringstream in;
        in << message.data();
        auto avro_in = avro::istreamInputStream(in);
        avro::DecoderPtr decoder = avro::resolvingDecoder(
            writer_schema, reader_schema, avro::jsonDecoder(writer_schema));
        decoder->init(*avro_in);

        v2::State state;
        avro::decode(*decoder, state);
        std::cout << "Name: " << state.name << "\n";
        std::cout << "Postal Abbreviation: " << state.post_abbr << "\n";
        std::cout << "Population: " << state.population << "\n";
      } else {
        std::cout << "Unable to decode. Received message using encoding"
                  << encoding << "\n";
      }
      std::move(h).ack();
    });

C#

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan C# di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API C# Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

using Avro.Generic;
using Avro.IO;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Concurrent;
using System.IO;
using System.Threading;
using System.Threading.Tasks;

public class SubscribeAvroRecordsWithRevisionsSample
{
    /// <summary>
    /// Pulls messages from the given subscription for ten seconds, decoding each
    /// binary Avro payload with the schema revision named in the message attributes.
    /// </summary>
    /// <param name="projectId">Project that owns the subscription.</param>
    /// <param name="subscriptionId">ID of an existing subscription.</param>
    /// <returns>
    /// The number of binary messages decoded and the number of distinct
    /// (schema name, revision) pairs that were cached.
    /// </returns>
    public async Task<(int, int)> SubscribeAvroRecordsWithRevisions(string projectId, string subscriptionId)
    {
        SchemaServiceClient schemaService = SchemaServiceClient.Create();

        // Parsed schemas keyed by (schema name, revision ID).
        var parsedSchemas = new ConcurrentDictionary<(string, string), Avro.Schema>();

        SubscriberClient subscriber = await SubscriberClient.CreateAsync(
            SubscriptionName.FromProjectSubscription(projectId, subscriptionId));
        int decodedCount = 0;

        Task<SubscriberClient.Reply> HandleMessage(PubsubMessage message, CancellationToken cancel)
        {
            // The encoding, schema name and revision ID all travel as message attributes.
            string encoding = message.Attributes["googclient_schemaencoding"];
            string schemaName = message.Attributes["googclient_schemaname"];
            string revision = message.Attributes["googclient_schemarevisionid"];

            // Look the schema up in the cache, fetching and parsing it on first use.
            Avro.Schema avroSchema = parsedSchemas.GetOrAdd((schemaName, revision), key =>
            {
                var (name, revisionId) = key;
                return Avro.Schema.Parse(schemaService.GetSchema($"{name}@{revisionId}").Definition);
            });

            if (encoding != "BINARY")
            {
                Console.WriteLine("Expected only binary messages in this sample");
                return Task.FromResult(SubscriberClient.Reply.Ack);
            }

            // Decode the payload as a generic Avro record.
            using (var payload = new MemoryStream(message.Data.ToByteArray()))
            {
                var record = new DefaultReader(avroSchema, avroSchema)
                    .Read<GenericRecord>(null, new BinaryDecoder(payload));
                Console.WriteLine($"Message {message.MessageId}: {record.GetValue(0)}");
                Interlocked.Increment(ref decodedCount);
            }
            return Task.FromResult(SubscriberClient.Reply.Ack);
        }

        Task startTask = subscriber.StartAsync(HandleMessage);

        // Run for 10 seconds.
        await Task.Delay(10_000);
        await subscriber.StopAsync(CancellationToken.None);
        // Awaiting the start task surfaces any failure that happened while pulling.
        await startTask;
        return (decodedCount, parsedSchemas.Count);
    }
}

Go

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Go di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Go Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import (
	"context"
	"fmt"
	"io"
	"strings"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	"github.com/linkedin/goavro/v2"
)

// subscribeWithAvroSchemaRevisions receives messages from the subscription
// subID for up to 10 seconds. Each message is decoded with the Avro schema
// revision named in its attributes; one codec is cached per revision ID.
// Progress and per-message errors are written to w.
//
// avscFile is kept for signature compatibility; this function does not read it.
//
// It returns an error if either client cannot be created or if Receive fails.
func subscribeWithAvroSchemaRevisions(w io.Writer, projectID, subID, avscFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// avscFile = "path/to/an/avro/schema/file(.avsc)/formatted/in/json"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	schemaClient, err := pubsub.NewSchemaClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewSchemaClient: %w", err)
	}
	defer schemaClient.Close()

	// Create the cache for the codecs for different revision IDs.
	revisionCodecs := make(map[string]*goavro.Codec)

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	// mu serializes the callback so the codec cache is never accessed
	// concurrently.
	var mu sync.Mutex
	err = sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		name := msg.Attributes["googclient_schemaname"]
		revision := msg.Attributes["googclient_schemarevisionid"]

		codec, ok := revisionCodecs[revision]
		// If the codec doesn't exist in the map, this is the first time we
		// are seeing this revision. We need to fetch the schema and cache the
		// codec. It would be more typical to do this asynchronously, but is
		// shown here in a synchronous way to ease readability.
		if !ok {
			// Extract just the schema ID (last path segment) from the full
			// resource name.
			path := strings.Split(name, "/")
			schemaID := path[len(path)-1]
			schema, err := schemaClient.Schema(ctx, fmt.Sprintf("%s@%s", schemaID, revision), pubsub.SchemaViewFull)
			if err != nil {
				fmt.Fprintf(w, "Nacking, cannot read message without schema: %v\n", err)
				msg.Nack()
				return
			}
			codec, err = goavro.NewCodec(schema.Definition)
			if err != nil {
				fmt.Fprintf(w, "goavro.NewCodec err: %v\n", err)
				msg.Nack()
				// Stop here: caching and then using a nil codec would panic.
				return
			}
			revisionCodecs[revision] = codec
		}

		encoding := msg.Attributes["googclient_schemaencoding"]

		var data interface{}
		var err error
		switch encoding {
		case "BINARY":
			data, _, err = codec.NativeFromBinary(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromBinary err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a binary-encoded message:\n%#v\n", data)
		case "JSON":
			data, _, err = codec.NativeFromTextual(msg.Data)
			if err != nil {
				fmt.Fprintf(w, "codec.NativeFromTextual err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", data)
		default:
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}

		// The fields are read by key below, so the decoded value must be a
		// map; check the assertion instead of panicking on another type.
		state, ok := data.(map[string]interface{})
		if !ok {
			fmt.Fprintf(w, "Unexpected decoded type %T, nacking\n", data)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state["name"], state["post_abbr"])
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

Java

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Java di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Java Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.SchemaServiceClient;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Schema;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.avro.io.Decoder;
import org.apache.avro.io.DecoderFactory;
import org.apache.avro.specific.SpecificDatumReader;
import utilities.State;

/**
 * Sample that subscribes to a Pub/Sub subscription whose messages may have been written with
 * different revisions of an Avro schema, and decodes each message into the generated {@code State}
 * class.
 */
public class SubscribeWithAvroSchemaRevisionsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithAvroSchemaRevisionsExample(projectId, subscriptionId);
  }

  /**
   * Creates a schema service client.
   *
   * @return the client, or {@code null} if it could not be created (the error is printed)
   */
  static SchemaServiceClient getSchemaServiceClient() {
    try {
      return SchemaServiceClient.create();
    } catch (IOException e) {
      System.out.println("Could not get schema client: " + e);
      return null;
    }
  }

  /**
   * Listens on the subscription for up to 30 seconds. For every message, the writer schema
   * revision named in the message attributes is fetched (once per revision) and used to decode
   * the payload. Messages that cannot be decoded are nacked.
   *
   * @param projectId project that owns the subscription
   * @param subscriptionId ID of an existing subscription
   */
  public static void subscribeWithAvroSchemaRevisionsExample(
      String projectId, String subscriptionId) {
    // Used to get the schemas for revisions.
    final SchemaServiceClient schemaServiceClient = getSchemaServiceClient();
    if (schemaServiceClient == null) {
      return;
    }

    // Cache for the readers for different revision IDs. Access is guarded by
    // synchronizing on the map itself.
    Map<String, SpecificDatumReader<State>> revisionReaders =
        new HashMap<String, SpecificDatumReader<State>>();

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Get the schema name and revision ID the message was written with.
          String name = message.getAttributesMap().get("googclient_schemaname");
          String revision = message.getAttributesMap().get("googclient_schemarevisionid");

          SpecificDatumReader<State> reader = null;
          synchronized (revisionReaders) {
            reader = revisionReaders.get(revision);
          }
          if (reader == null) {
            // This is the first time we are seeing this revision. We need to
            // fetch the schema and cache its decoder. It would be more typical
            // to do this asynchronously, but is shown here in a synchronous
            // way to ease readability.
            try {
              Schema schema = schemaServiceClient.getSchema(name + "@" + revision);
              org.apache.avro.Schema avroSchema =
                  new org.apache.avro.Schema.Parser().parse(schema.getDefinition());
              // Writer schema = fetched revision, reader schema = generated class.
              reader = new SpecificDatumReader<State>(avroSchema, State.getClassSchema());
              synchronized (revisionReaders) {
                revisionReaders.put(revision, reader);
              }
            } catch (Exception e) {
              System.out.println("Could not get schema: " + e);
              // Without the schema, we cannot read the message, so nack it.
              consumer.nack();
              return;
            }
          }

          ByteString data = message.getData();
          // Send the message data to a byte[] input stream.
          InputStream inputStream = new ByteArrayInputStream(data.toByteArray());

          String encoding = message.getAttributesMap().get("googclient_schemaencoding");
          if (encoding == null) {
            // switch on a null String would throw; treat it as an unknown type.
            System.out.println("Unknown message type; nacking.");
            consumer.nack();
            return;
          }

          Decoder decoder = null;

          // Prepare an appropriate decoder for the message data in the input stream
          // based on the schema encoding type.
          try {
            switch (encoding) {
              case "BINARY":
                decoder = DecoderFactory.get().directBinaryDecoder(inputStream, /*reuse=*/ null);
                System.out.println("Receiving a binary-encoded message:");
                break;
              case "JSON":
                // The payload was written with the revision's (writer) schema, so the JSON
                // decoder must parse against that schema; the reader resolves it to State.
                decoder = DecoderFactory.get().jsonDecoder(reader.getSchema(), inputStream);
                System.out.println("Receiving a JSON-encoded message:");
                break;
              default:
                System.out.println("Unknown message type; nacking.");
                consumer.nack();
                // Stop here: there is no decoder, and the message must not also be acked.
                return;
            }

            // Obtain an object of the generated Avro class using the decoder.
            State state = reader.read(null, decoder);
            System.out.println(state.getName() + " is abbreviated as " + state.getPostAbbr());

            // Ack the message.
            consumer.ack();
          } catch (IOException | RuntimeException e) {
            System.err.println(e);
            // If we failed to process the message, nack it.
            consumer.nack();
          }
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    } finally {
      // Release the schema client's resources once we are done listening.
      schemaServiceClient.close();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
const avro = require('avro-js');

// Creates a single module-level client that the function below reuses
// instead of constructing a new one per call.
const pubSubClient = new PubSub();

/**
 * Listens on a subscription for `timeout` seconds and decodes each Avro
 * message with the schema revision named in its attributes. One decoder is
 * cached per revision ID. Messages whose schema cannot be fetched or whose
 * payload cannot be decoded are nacked.
 *
 * @param {string} subscriptionNameOrId Name or ID of an existing subscription.
 * @param {number} timeout Number of seconds to listen before detaching.
 */
async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId,
  timeout
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision);
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition);
        revisionReaders.set(revision, reader);
      }
    } catch (err) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result;
    try {
      switch (schemaMetadata.encoding) {
        case Encodings.Binary:
          result = reader.fromBuffer(message.data);
          break;
        case Encodings.Json:
          result = reader.fromString(message.data.toString());
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
          message.nack();
          return;
      }
    } catch (err) {
      // A payload that does not match the schema must not leave the message
      // unsettled or reject the handler's promise.
      console.log('Could not decode message', err);
      message.nack();
      return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Attributes is an object; stringify it so it is not printed as
    // "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    console.log(
      `\tProvince ${result.name} is abbreviated as ${result.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js (TypeScript)

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';

// And the Apache Avro library; this lacks typings, so for
// TypeScript, a few synthetic types were created.
import * as avro from 'avro-js';

// Creates a single module-level client that the function below reuses
// instead of constructing a new one per call.
const pubSubClient = new PubSub();

// Shape of the decoded Avro record as this sample uses it; only these two
// fields are read when logging a received message.
interface ProvinceObject {
  // Province name.
  name: string;
  // Abbreviation printed alongside the name.
  post_abbr: string;
}

/**
 * Listens on a subscription for `timeout` seconds and decodes each Avro
 * message with the schema revision named in its attributes. One decoder is
 * cached per revision ID. Messages whose schema cannot be fetched or whose
 * payload cannot be decoded are nacked.
 *
 * @param subscriptionNameOrId Name or ID of an existing subscription.
 * @param timeout Number of seconds to listen before detaching.
 */
async function listenForAvroRecordsWithRevisions(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Cache decoders for various schema revisions.
  const revisionReaders = new Map<string, avro.Parser>();

  // We need a schema admin service client to retrieve revisions.
  const schemaClient = await pubSubClient.getSchemaClient();

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let reader: avro.Parser;
    try {
      // Do we already have a decoder for this revision?
      const revision = schemaMetadata.revision!;
      if (revisionReaders.has(revision)) {
        reader = revisionReaders.get(revision)!;
      } else {
        // This is the first time we are seeing this revision. We need to
        // fetch the schema and cache its decoder.
        const [schema] = await schemaClient.getSchema({
          name: `${schemaMetadata.name}@${schemaMetadata.revision}`,
        });
        reader = avro.parse(schema.definition!);
        revisionReaders.set(revision, reader);
      }
    } catch (err: unknown) {
      console.log('Could not get schema', err);
      message.nack();
      return;
    }

    let result: ProvinceObject | undefined;
    try {
      switch (schemaMetadata.encoding) {
        case Encodings.Binary:
          result = reader.fromBuffer(message.data);
          break;
        case Encodings.Json:
          result = reader.fromString(message.data.toString());
          break;
        default:
          console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
          message.nack();
          return;
      }
    } catch (err: unknown) {
      // A payload that does not match the schema must not leave the message
      // unsettled or reject the handler's promise.
      console.log('Could not decode message', err);
      message.nack();
      return;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    // Attributes is an object; stringify it so it is not printed as
    // "[object Object]".
    console.log(`\tAttributes: ${JSON.stringify(message.attributes)}`);
    console.log(
      `\tProvince ${result?.name} is abbreviated as ${result?.post_abbr}`
    );
    messageCount += 1;

    // Ack the message.
    message.ack();
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Python

Sebelum mencoba contoh ini, ikuti petunjuk penyiapan Python di panduan memulai Pub/Sub menggunakan library klien. Untuk mengetahui informasi selengkapnya, lihat dokumentasi referensi API Python Pub/Sub.

Untuk melakukan autentikasi ke Pub/Sub, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

from concurrent.futures import TimeoutError
import io
import json

import avro.schema as schema
from avro.io import BinaryDecoder, DatumReader
from google.api_core.exceptions import NotFound
from google.cloud import pubsub_v1
from google.cloud.pubsub import SchemaServiceClient, SubscriberClient

# Client for the Pub/Sub schema service; the message callback uses it to
# fetch schema revisions by name.
schema_client = SchemaServiceClient()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# avsc_file = "path/to/an/avro/schema/file/(.avsc)/formatted/in/json"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
# Fully qualified subscription path built from the project and subscription IDs.
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Parse the local Avro schema file; it is the reader schema that every
# received message is resolved against.
with open(avsc_file, "rb") as file:
    reader_avro_schema = schema.parse(file.read())
# Dict to keep readers for different schema revisions, keyed by revision ID.
revisions_to_readers = {}

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    """Decode one received message and ack or nack it.

    The schema name, revision ID and encoding are read from the message
    attributes. The first time a revision ID is seen, its schema is fetched
    from the schema service and a DatumReader (writer schema = fetched
    revision, reader schema = the locally loaded schema) is cached for it.

    The message is nacked when the schema attributes are missing, the schema
    revision is not found, or the encoding is neither "BINARY" nor "JSON".
    Otherwise it is acked after decoding.
    """
    # Get the schema identity and the message serialization type.
    schema_name = message.attributes.get("googclient_schemaname")
    schema_revision_id = message.attributes.get("googclient_schemarevisionid")
    encoding = message.attributes.get("googclient_schemaencoding")

    # Without both attributes there is no schema path to look up.
    if schema_name is None or schema_revision_id is None:
        print(f"Received a message without schema attributes:\n{message}")
        message.nack()
        return

    if schema_revision_id not in revisions_to_readers:
        schema_path = schema_name + "@" + schema_revision_id
        try:
            received_avro_schema = schema_client.get_schema(
                request={"name": schema_path}
            )
        except NotFound:
            print(f"{schema_path} not found.")
            message.nack()
            return
        writer_avro_schema = schema.parse(received_avro_schema.definition)
        revisions_to_readers[schema_revision_id] = DatumReader(
            writer_avro_schema, reader_avro_schema
        )
    reader = revisions_to_readers[schema_revision_id]

    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        bout = io.BytesIO(message.data)
        decoder = BinaryDecoder(bout)
        message_data = reader.read(decoder)
        print(f"Received a binary-encoded message:\n{message_data}")
    elif encoding == "JSON":
        # JSON payloads are parsed as plain JSON; the Avro reader is not
        # applied to them here.
        message_data = json.loads(message.data)
        print(f"Received a JSON-encoded message:\n{message_data}")
    else:
        print(f"Received a message with no encoding:\n{message}")
        message.nack()
        # Stop here so the message is not also acked below.
        return

    message.ack()

# Start pulling messages; each one is handed to `callback`. The returned
# future represents the running streaming pull.
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Langkah selanjutnya

Untuk menelusuri dan memfilter contoh kode untuk produk Google Cloud lainnya, lihat browser contoh Google Cloud.