Recibir mensajes del tipo de esquema de Proto

Recibe un mensaje del tipo de esquema del búfer de protocolo, convierte los datos del mensaje en un objeto de una clase Proto generada y confirma el mensaje.

Explora más

Para obtener documentación en la que se incluye esta muestra de código, consulta lo siguiente:

Muestra de código

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub C++.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::StatusOr;
return [](pubsub::Subscriber subscriber) {
  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        google::cloud::pubsub::samples::State state;
        state.ParseFromString(std::string{m.data()});
        std::cout << "Message contents: " << state.DebugString() << "\n";
        std::move(h).ack();
      });
  return session;
}

C#

Antes de probar esta muestra, sigue las instrucciones de configuración de C# que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub C#.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullProtoMessagesAsyncSample
{
    public async Task<int> PullProtoMessagesAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string encoding = message.Attributes["googclient_schemaencoding"];
            Utilities.State state = null;
            switch (encoding)
            {
                case "BINARY":
                    state = Utilities.State.Parser.ParseFrom(message.Data.ToByteArray());
                    break;
                case "JSON":
                    state = Utilities.State.Parser.ParseJson(message.Data.ToStringUtf8());
                    break;
                default:
                    Console.WriteLine($"Encoding not provided in message.");
                    break;
            }
            Console.WriteLine($"Message {message.MessageId}: {state}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Go.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

import (
	"context"
	"fmt"
	"io"
	"sync"
	"time"

	"cloud.google.com/go/pubsub"
	statepb "github.com/GoogleCloudPlatform/golang-samples/internal/pubsub/schemas"
	"google.golang.org/protobuf/encoding/protojson"
	"google.golang.org/protobuf/proto"
)

func subscribeWithProtoSchema(w io.Writer, projectID, subID, protoFile string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	// protoFile = "path/to/a/proto/schema/file(.proto)/formatted/in/protocol/buffers"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}

	// Create an instance of the message to be decoded (a single U.S. state).
	state := &statepb.State{}

	sub := client.Subscription(subID)
	ctx2, cancel := context.WithTimeout(ctx, 10*time.Second)
	defer cancel()

	var mu sync.Mutex
	sub.Receive(ctx2, func(ctx context.Context, msg *pubsub.Message) {
		mu.Lock()
		defer mu.Unlock()
		encoding := msg.Attributes["googclient_schemaencoding"]

		if encoding == "BINARY" {
			if err := proto.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Printf("Received a binary-encoded message:\n%#v\n", state)
		} else if encoding == "JSON" {
			if err := protojson.Unmarshal(msg.Data, state); err != nil {
				fmt.Fprintf(w, "proto.Unmarshal err: %v\n", err)
				msg.Nack()
				return
			}
			fmt.Fprintf(w, "Received a JSON-encoded message:\n%#v\n", state)
		} else {
			fmt.Fprintf(w, "Unknown message type(%s), nacking\n", encoding)
			msg.Nack()
			return
		}
		fmt.Fprintf(w, "%s is abbreviated as %s\n", state.Name, state.PostAbbr)
		msg.Ack()
	})
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Java.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.


import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import utilities.StateProto.State;

public class SubscribeWithProtoSchemaExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    // Use an existing subscription.
    String subscriptionId = "your-subscription-id";

    subscribeWithProtoSchemaExample(projectId, subscriptionId);
  }

  public static void subscribeWithProtoSchemaExample(String projectId, String subscriptionId) {

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          block:
          try {
            switch (encoding) {
              case "BINARY":
                // Obtain an object of the generated proto class.
                State state = State.parseFrom(data);
                System.out.println("Received a BINARY-formatted message: " + state);
                break;

              case "JSON":
                State.Builder stateBuilder = State.newBuilder();
                JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
                System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
                break;

              default:
                break block;
            }
          } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
          }

          consumer.ack();
          System.out.println("Ack'ed the message");
        };

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

    try {
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName);
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      subscriber.stopAsync();
    }
  }
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
const {PubSub, Schema, Encodings} = require('@google-cloud/pubsub');

// And the protobufjs library
const protobuf = require('protobufjs');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForProtobufMessages(subscriptionNameOrId, timeout) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make an decoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = Province.decode(message.data);
        break;
      case Encodings.Json:
        // This doesn't require decoding with the protobuf library,
        // since it's plain JSON. But you can still validate it against
        // your schema.
        result = JSON.parse(message.data.toString());
        console.log(`Validation of JSON: ${Province.verify(result)}`);
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

Node.js

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const timeout = 60;

// Imports the Google Cloud client library
import {PubSub, Schema, Encodings, Message} from '@google-cloud/pubsub';

// And the protobufjs library
import * as protobuf from 'protobufjs';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function listenForProtobufMessages(
  subscriptionNameOrId: string,
  timeout: number
) {
  // References an existing subscription
  const subscription = pubSubClient.subscription(subscriptionNameOrId);

  // Make an decoder using the protobufjs library.
  //
  // Since we're providing the test message for a specific schema here, we'll
  // also code in the path to a sample proto definition.
  const root = protobuf.loadSync('system-test/fixtures/provinces.proto');
  const Province = root.lookupType('utilities.Province');

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async (message: Message) => {
    // "Ack" (acknowledge receipt of) the message
    message.ack();

    // Get the schema metadata from the message.
    const schemaMetadata = Schema.metadataFromMessage(message.attributes);

    let result;
    switch (schemaMetadata.encoding) {
      case Encodings.Binary:
        result = Province.decode(message.data);
        break;
      case Encodings.Json:
        // This doesn't require decoding with the protobuf library,
        // since it's plain JSON. But you can still validate it against
        // your schema.
        result = JSON.parse(message.data.toString());
        console.log(`Validation of JSON: ${Province.verify(result)}`);
        break;
      default:
        console.log(`Unknown schema encoding: ${schemaMetadata.encoding}`);
        break;
    }

    console.log(`Received message ${message.id}:`);
    console.log(`\tData: ${JSON.stringify(result, null, 4)}`);
    console.log(`\tAttributes: ${JSON.stringify(message.attributes, null, 4)}`);
    messageCount += 1;
  };

  // Listen for new messages until timeout is hit
  subscription.on('message', messageHandler);

  setTimeout(() => {
    subscription.removeListener('message', messageHandler);
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);
}

PHP

Antes de probar esta muestra, sigue las instrucciones de configuración de PHP que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub PHP.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

use Google\Cloud\PubSub\PubSubClient;

/**
 * Subscribe and pull messages using a protocol buffer schema.
 *
 * Relies on a proto message of the following form:
 * ```
 * syntax = "proto3";
 *
 * package utilities;
 *
 * message StateProto {
 *   string name = 1;
 *   string post_abbr = 2;
 * }
 * ```
 *
 * @param string $projectId
 * @param string $subscriptionId
 */
function subscribe_proto_messages($projectId, $subscriptionId)
{
    $pubsub = new PubSubClient([
        'projectId' => $projectId,
    ]);

    $subscription = $pubsub->subscription($subscriptionId);
    $messages = $subscription->pull();

    foreach ($messages as $message) {
        $decodedMessageData = '';
        $encoding = $message->attribute('googclient_schemaencoding');
        switch ($encoding) {
            case 'BINARY':
                $protobufMessage = new \Utilities\StateProto();
                $protobufMessage->mergeFromString($message->data());

                $decodedMessageData = $protobufMessage->serializeToJsonString();
                break;
            case 'JSON':
                $decodedMessageData = $message->data();
                break;
        }

        printf('Received a %d-encoded message %s', $encoding, $decodedMessageData);
    }
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Python.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

from concurrent.futures import TimeoutError
from google.cloud.pubsub import SubscriberClient
from google.protobuf.json_format import Parse

from utilities import us_states_pb2

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber listens for messages
# timeout = 5.0

subscriber = SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

# Instantiate a protoc-generated class defined in `us-states.proto`.
state = us_states_pb2.StateProto()

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    # Get the message serialization type.
    encoding = message.attributes.get("googclient_schemaencoding")
    # Deserialize the message data accordingly.
    if encoding == "BINARY":
        state.ParseFromString(message.data)
        print(f"Received a binary-encoded message:\n{state}")
    elif encoding == "JSON":
        Parse(message.data, state)
        print(f"Received a JSON-encoded message:\n{state}")
    else:
        print(f"Received a message with no encoding:\n{message}")

    message.ack()

streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception occurs first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby que se encuentran en la guía de inicio rápido de Pub/Sub con bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub Ruby.

Para autenticarte en Pub/Sub, configura las credenciales predeterminadas de la aplicación. Si deseas obtener más información, consulta Configura la autenticación para un entorno de desarrollo local.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id

subscriber = subscription.listen do |received_message|
  encoding = received_message.attributes["googclient_schemaencoding"]
  case encoding
  when "BINARY"
    state = Utilities::StateProto.decode received_message.data
    puts "Received a binary-encoded message:\n#{state}"
  when "JSON"
    require "json"
    state = Utilities::StateProto.decode_json received_message.data
    puts "Received a JSON-encoded message:\n#{state}"
  else
    "Received a message with no encoding:\n#{received_message.message_id}"
  end
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

¿Qué sigue?

Para buscar y filtrar muestras de código para otros productos de Google Cloud, consulta el navegador de muestra de Google Cloud.