Recibe mensajes de suscripciones Lite

En esta página, se explica cómo recibir mensajes de las suscripciones de Lite. Puedes recibir mensajes con la biblioteca cliente de Pub/Sub Lite para Java.

Las suscripciones Lite conectan temas de Lite con aplicaciones de suscriptores. Los suscriptores reciben mensajes de suscripciones Lite. Los suscriptores reciben todos los mensajes que las aplicaciones de publicador envían al tema Lite, incluidos los mensajes que los publicadores envían antes de crear la suscripción Lite.

Antes de recibir mensajes de una suscripción Lite, crea un tema Lite, crea una suscripción Lite al tema de Lite y publica mensajes allí.

Recibe mensajes

Para recibir mensajes de una suscripción Lite, solicita mensajes desde la suscripción Lite. La biblioteca cliente se conecta de forma automática a las particiones en el tema Lite conectado con la suscripción Lite. Si se crea más de una instancia de cliente suscriptor, los mensajes se distribuirán entre todos los clientes. La cantidad de particiones del tema determina la cantidad máxima de clientes suscriptores que pueden conectarse a una suscripción de forma simultánea.

Los suscriptores pueden tardar hasta un minuto en inicializarse y comenzar a recibir mensajes. Después de la inicialización, los mensajes se reciben con una latencia mínima.

En el siguiente ejemplo, se muestra cómo recibir mensajes de suscripciones Lite:

gcloud

Este comando requiere Python 3.6 o una versión superior, y requiere que se instale el paquete grpcio para Python. Para usuarios de MacOS, Linux y Cloud Shell, ejecuta lo siguiente:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Para recibir mensajes, usa el comando gcloud pubsub lite-subscriptions subscribe:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

Reemplaza lo siguiente:

  • SUBSCRIPTION_ID: Es el ID de la suscripción Lite.
  • LITE_LOCATION: Es la ubicación de la suscripción a Lite.

Go

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Go en las bibliotecas cliente de Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Java en las bibliotecas cliente de Pub/Sub Lite.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Antes de ejecutar esta muestra, sigue las instrucciones de configuración de Python en las bibliotecas cliente de Pub/Sub Lite.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

La biblioteca cliente establece conexiones de transmisión bidireccional para cada una de las particiones del tema de Lite.

  1. El suscriptor solicita conexiones a las particiones.

  2. El servicio Lite de Pub/Sub entrega los mensajes al suscriptor.

Después de que el suscriptor procese el mensaje, el suscriptor debe confirmar la recepción del mensaje. La biblioteca cliente procesa y confirma la recepción de los mensajes forma asíncrona en una devolución de llamada. Para limitar la cantidad de mensajes no confirmados que el suscriptor puede almacenar en la memoria, establece la configuración del control de flujo.

Si varios suscriptores reciben mensajes de la misma suscripción Lite, el servicio Lite de Pub/Sub conecta cada suscriptor con una proporción igual de particiones. Por ejemplo, si dos suscriptores usan la misma suscripción Lite y la suscripción está conectada a un tema Lite con dos particiones, cada suscriptor recibe mensajes desde una de las particiones.

Confirma mensajes

Para confirmar recepción de un mensaje, envía una confirmación de recepción a la suscripción Lite.

Go

Para enviar una confirmación de recepción, usa el método Message.Ack().

Java

Para enviar una confirmación de recepción, usa el método AckReplyConsumer.ack().

Python

Para enviar una confirmación de recepción, usa el método Message.ack().

Los suscriptores deben confirmar todos los mensajes. Los suscriptores reciben primero el mensaje no confirmado más antiguo, seguido de cada mensaje subsiguiente. Si un suscriptor omite un mensaje, confirma los mensajes posteriores y se vuelve a conectar, el suscriptor recibe el mensaje no confirmado y cada uno confirmado posterior.

Las suscripciones Lite no tienen un plazo de confirmación y el servicio Lite de Pub/Sub no vuelve a entregar los mensajes no confirmados a través de una conexión de transmisión abierta.

Usa el control de flujo

Después de que el servicio de Pub/Sub Lite entrega mensajes a los suscriptores, los suscriptores almacenan mensajes no confirmados en la memoria. Puedes limitar la cantidad de mensajes pendientes que los suscriptores pueden almacenar en la memoria mediante la configuración de control de flujo. La configuración del control de flujo se aplica a cada partición de la que un suscriptor recibe mensajes.

Puedes definir la siguiente configuración del control de flujo:

  • Tamaño de mensajes pendientes. El tamaño máximo de los mensajes pendientes en bytes. El tamaño máximo debe ser mayor que el tamaño del mensaje más grande.
  • Cantidad de mensajes La cantidad máxima de mensajes pendientes.

El tamaño de un mensaje se encuentra en el campo size_bytes. Puedes configurar los controles de control de flujo con la biblioteca cliente.

Go

Para configurar los controles de flujo, pasa ReceiveSettings cuando llames a pscompat.NewSubscriberClientWithSettings. Puedes configurar los siguientes parámetros en ReceiveSettings:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

Para ver un ejemplo, consulta esta muestra de control de flujo.

Java

Para establecer la configuración del control de flujo, usa los siguientes métodos en la clase FlowControlRequest.Builder:

Python

Para configurar los controles de flujo, establece los siguientes parámetros en la clase FlowControlSettings:

  • bytes_outstanding

  • messages_outstanding

Por ejemplo, si el número máximo de mensajes es 100 y el suscriptor se conecta a 10 particiones, el suscriptor no puede recibir más de 100 mensajes de cualquiera de las 10 particiones. La cantidad total de mensajes pendientes podría ser mayor a 100, pero el suscriptor no puede almacenar más de 100 mensajes de cada partición.