Como receber mensagens de assinaturas do Lite

Esta página explica como receber mensagens de assinaturas Lite. Você pode receber mensagens com a biblioteca de cliente do Pub/Sub Lite para Java.

As assinaturas Lite conectam os tópicos Lite aos aplicativos dos assinantes. Os assinantes recebem mensagens de assinaturas Lite. Os assinantes recebem todas as mensagens que os aplicativos do editor enviam para o tópico do Lite, incluindo as mensagens que os editores enviam antes de criar a assinatura do Lite.

Antes de receber mensagens de uma assinatura do Lite, crie um tópico do Lite, uma assinatura do Lite para esse tópico e publique mensagens para o tópico do Lite.

Receber mensagens

Para receber mensagens de uma assinatura do Lite, solicite mensagens da assinatura do Lite. A biblioteca de cliente se conecta automaticamente às partições no tópico do Lite anexado à assinatura do Lite. Se mais de um cliente assinante for instanciado, as mensagens serão distribuídas entre todos eles. O número de partições no tópico determina a quantidade máxima de clientes assinantes que podem se conectar a uma assinatura ao mesmo tempo.

Os assinantes podem levar até um minuto para inicializar e começar a receber mensagens. Após a inicialização, as mensagens são recebidas com latência mínima.

O exemplo a seguir mostra como receber mensagens de assinaturas do Lite:

gcloud

Este comando requer o Python 3.6 ou superior e a instalação do pacote Python grpcio. Para usuários do MacOS, Linux e Cloud Shell, execute o seguinte:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Para receber mensagens, use o comando gcloud pubsub lite-subscriptions subscribe:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

Substitua:

  • SUBSCRIPTION_ID: o ID da assinatura Lite
  • LITE_LOCATION: o local da assinatura do Lite.

Go

Antes de executar este exemplo, siga as instruções de configuração do Go em Bibliotecas de cliente do Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Antes de executar este exemplo, siga as instruções de configuração do Java nas bibliotecas de cliente do Pub/Sub Lite.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Antes de executar este exemplo, siga as instruções de configuração do Java nas bibliotecas de cliente do Pub/Sub Lite.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

A biblioteca de cliente estabelece conexões de streaming bidirecionais para cada uma das partições no tópico do Lite.

  1. O assinante solicita conexões às partições.

  2. O serviço Pub/Sub Lite entrega as mensagens ao assinante.

Depois de processar a mensagem, o assinante precisa confirmá-la. A biblioteca de cliente processa e confirma mensagens de maneira assíncrona em um callback. Para limitar o número de mensagens não confirmadas que o assinante pode armazenar na memória, defina as configurações de controle de fluxo.

Se vários assinantes receberem mensagens da mesma assinatura do Lite, o serviço Lite do Pub/Sub conectará cada assinante a uma proporção igual de partições. Por exemplo, se dois assinantes usarem a mesma assinatura do Lite e a assinatura do Lite estiver anexada a um tópico do Lite com duas partições, cada assinante receberá mensagens de uma das partições.

Como confirmar mensagens

Para confirmar uma mensagem, envie uma confirmação para a assinatura Lite.

Go

Para enviar uma confirmação, use o método Message.Ack().

Java

Para enviar uma confirmação, use o método AckReplyConsumer.ack().

Python

Para enviar uma confirmação, use o método Message.ack().

Os assinantes precisam confirmar todas as mensagens. Eles recebem primeiro a mensagem não confirmada mais antiga, seguida por cada uma das que foram enviadas. Se um assinante ignorar uma mensagem, confirmar as posteriores e se reconectar, ele receberá a mensagem não confirmada e cada mensagem posterior confirmada.

As assinaturas do Lite não têm um prazo de confirmação e o serviço Pub/Sub Lite não reenvia mensagens não confirmadas por uma conexão de streaming aberto.

Como usar o controle de fluxo

Depois que o serviço do Pub/Sub Lite entrega mensagens aos assinantes, eles armazenam mensagens não confirmadas na memória. É possível limitar o número de mensagens pendentes que os assinantes podem armazenar na memória usando as configurações de controle de fluxo. As configurações de controle de fluxo são aplicadas a cada partição de que um assinante recebe mensagens.

É possível definir as seguintes configurações de controle de fluxo:

  • Tamanho de mensagem pendente. O tamanho máximo, em bytes, das mensagens subsequentes. O tamanho máximo precisa ser maior que o tamanho da maior mensagem.
  • Número de mensagens. O número máximo de mensagens pendentes.

O tamanho de uma mensagem está no campo size_bytes (em inglês). É possível definir as configurações de controle de fluxo com a biblioteca de cliente.

Go

Para definir as configurações de controle de fluxo, transmita ReceiveSettings. ao chamar pscompat.NewSubscriberClientWithSettings. É possível definir os seguintes parâmetros no ReceiveSettings:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

Consulte este exemplo de controle de fluxo (link em inglês).

Java

Para definir as configurações do controle de fluxo, use os seguintes métodos na classe FlowControlRequest.Builder:

Python

Para definir as configurações do controle de fluxo, defina os seguintes parâmetros na classe FlowControlSettings:

  • bytes_outstanding

  • messages_outstanding

Por exemplo, se o número máximo de mensagens for 100 e o assinante se conectar a 10 partições, não será possível receber mais de 100 mensagens de nenhuma das 10 partições. O número total de mensagens pendentes pode ser maior que 100, mas o assinante não pode armazenar mais de 100 mensagens de cada partição.