Ricezione di messaggi dalle sottoscrizioni Lite

In questa pagina viene spiegato come ricevere messaggi dalle sottoscrizioni Lite. Puoi ricevere messaggi con la libreria client Pub/Sub Lite per Java.

Le sottoscrizioni Lite connettono gli argomenti Lite alle applicazioni degli abbonati; i sottoscrittori ricevono messaggi dalle sottoscrizioni Lite. I sottoscrittori ricevono ogni messaggio inviato dalle applicazioni del publisher all'argomento Lite, inclusi i messaggi inviati dagli editori prima della creazione della sottoscrizione Lite.

Prima di ricevere messaggi da una sottoscrizione Lite, crea un argomento Lite, crea una sottoscrizione Lite per l'argomento Lite e pubblica messaggi nell'argomento Lite.

Ricezione di messaggi

Per ricevere messaggi da una sottoscrizione Lite, richiedi messaggi dalla sottoscrizione Lite. La libreria client si connette automaticamente alle partizioni nell'argomento Lite associato alla sottoscrizione Lite. Se vengono confermati più client abbonati, i messaggi verranno distribuiti tra tutti i client. Il numero di partizioni nell'argomento determina il numero massimo di client sottoscrittori che possono connettersi contemporaneamente a una sottoscrizione.

I sottoscrittori potrebbero impiegare fino a un minuto per l'inizializzazione e la ricezione dei messaggi. Dopo l'inizializzazione, i messaggi vengono ricevuti con una latenza minima.

Il seguente esempio mostra come ricevere messaggi dalle sottoscrizioni Lite:

gcloud

Questo comando richiede Python 3.6 o versioni successive e l'installazione del pacchetto Python grpcio. Per gli utenti di MacOS, Linux e Cloud Shell, esegui:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Per ricevere messaggi, utilizza il comando gcloud pubsub lite-subscriptions Subscribe:

gcloud pubsub lite-subscriptions subscribe SUBSCRIPTION_ID \
    --location=LITE_LOCATION \
    --auto-ack

Sostituisci quanto segue:

  • SUBSCRIPTION_ID: l'ID della sottoscrizione Lite
  • LITE_LOCATION: la località della sottoscrizione Lite

Go

Prima di eseguire questo esempio, segui le istruzioni per la configurazione di Go in Librerie client di Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync/atomic"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite subscription containing
	// published messages when running this sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	subscriptionID := flag.String("subscription_id", "", "Existing Pub/Sub Lite subscription")
	timeout := flag.Duration("timeout", 90*time.Second, "The duration to receive messages")
	flag.Parse()

	ctx := context.Background()
	subscriptionPath := fmt.Sprintf("projects/%s/locations/%s/subscriptions/%s", *projectID, *zone, *subscriptionID)

	// Configure flow control settings. These settings apply per partition.
	// The message stream is paused based on the maximum size or number of
	// messages that the subscriber has already received, whichever condition is
	// met first.
	settings := pscompat.ReceiveSettings{
		// 10 MiB. Must be greater than the allowed size of the largest message
		// (1 MiB).
		MaxOutstandingBytes: 10 * 1024 * 1024,
		// 1,000 outstanding messages. Must be > 0.
		MaxOutstandingMessages: 1000,
	}

	// Create the subscriber client.
	subscriber, err := pscompat.NewSubscriberClientWithSettings(ctx, subscriptionPath, settings)
	if err != nil {
		log.Fatalf("pscompat.NewSubscriberClientWithSettings error: %v", err)
	}

	// Listen for messages until the timeout expires.
	log.Printf("Listening to messages on %s for %v...\n", subscriptionPath, *timeout)
	cctx, cancel := context.WithTimeout(ctx, *timeout)
	defer cancel()
	var receiveCount int32

	// Receive blocks until the context is cancelled or an error occurs.
	if err := subscriber.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		// NOTE: May be called concurrently; synchronize access to shared memory.
		atomic.AddInt32(&receiveCount, 1)

		// Metadata decoded from the message ID contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(msg.ID)
		if err != nil {
			log.Fatalf("Failed to parse %q: %v", msg.ID, err)
		}

		fmt.Printf("Received (partition=%d, offset=%d): %s\n", metadata.Partition, metadata.Offset, string(msg.Data))
		msg.Ack()
	}); err != nil {
		log.Fatalf("SubscriberClient.Receive error: %v", err)
	}

	fmt.Printf("Received %d messages\n", receiveCount)
}

Java

Prima di eseguire questo esempio, segui le istruzioni di configurazione Java in Librerie client di Pub/Sub Lite.

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.protobuf.Timestamp;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId, regional);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId, boolean regional)
      throws ApiException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + MessageMetadata.decode(message.getMessageId()));
          System.out.println("Data : " + message.getData().toStringUtf8());
          System.out.println("Ordering key : " + message.getOrderingKey());
          System.out.println("Attributes : ");
          message
              .getAttributesMap()
              .forEach(
                  (key, value) -> {
                    if (key == MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO) {
                      Timestamp ts = MessageTransforms.decodeAttributeEventTime(value);
                      System.out.println(key + " = " + ts.toString());
                    } else {
                      System.out.println(key + " = " + value);
                    }
                  });

          // Acknowledge the message.
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

Prima di eseguire questo esempio, segui le istruzioni di configurazione di Python in Librerie client di Pub/Sub Lite.

from concurrent.futures._base import TimeoutError
from google.pubsub_v1 import PubsubMessage
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    MessageMetadata,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message: PubsubMessage):
    message_data = message.data.decode("utf-8")
    metadata = MessageMetadata.decode(message.message_id)
    print(
        f"Received {message_data} of ordering key {message.ordering_key} with id {metadata}."
    )
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

La libreria client stabilisce connessioni di streaming bidirezionali a ciascuna partizione nell'argomento Lite.

  1. Il sottoscrittore richiede le connessioni alle partizioni.

  2. Il servizio Pub/Sub Lite recapita i messaggi al sottoscrittore.

Dopo che il sottoscrittore ha elaborato il messaggio, deve accettare il messaggio. La libreria client elabora e conferma in modo asincrono i messaggi in un callback. Per limitare il numero di messaggi non confermati che il sottoscrittore può archiviare in memoria, configura le impostazioni di controllo del flusso.

Se più sottoscrittori ricevono messaggi dalla stessa sottoscrizione Lite, il servizio Pub/Sub Lite connette ciascun sottoscrittore a una stessa proporzione di partizioni. Ad esempio, se due sottoscrittori utilizzano la stessa sottoscrizione Lite e la sottoscrizione Lite è collegata a un argomento Lite con due partizioni, ogni sottoscrittore riceve messaggi da una delle partizioni.

Conferma dei messaggi

Per confermare un messaggio, invia una conferma alla sottoscrizione Lite.

Go

Per inviare una conferma, utilizza il metodo Message.Ack().

Java

Per inviare una conferma, utilizza il metodo AckReplyConsumer.ack().

Python

Per inviare una conferma, utilizza il metodo Message.ack().

I sottoscrittori devono confermare la ricezione di ogni messaggio. I sottoscrittori ricevono per primo il messaggio non confermato meno recente, seguito da ogni messaggio successivo. Se un sottoscrittore salta un messaggio, conferma i messaggi successivi e poi si riconnette, riceve il messaggio non confermato e ogni messaggio confermato successivo.

Le sottoscrizioni Lite non hanno una scadenza di conferma e il servizio Pub/Sub Lite non consegna nuovamente i messaggi non confermati su una connessione di streaming aperta.

Utilizzo del controllo del flusso

Dopo che il servizio Pub/Sub Lite consegna i messaggi ai sottoscrittori, i sottoscrittori archiviano i messaggi non confermati in memoria. Puoi limitare il numero di messaggi in sospeso che gli abbonati possono archiviare in memoria utilizzando le impostazioni di controllo del flusso. Le impostazioni di controllo del flusso si applicano a ogni partizione da cui un sottoscrittore riceve messaggi.

Puoi configurare le seguenti impostazioni di controllo del flusso:

  • Dimensioni del messaggio eccezionali. La dimensione massima, in byte, dei messaggi in sospeso. Le dimensioni massime devono essere maggiori di quelle del messaggio più grande.
  • Numero di messaggi. Il numero massimo di messaggi in sospeso.

Le dimensioni di un messaggio sono indicate nel campo size_bytes. Puoi configurare le impostazioni del controllo del flusso con la libreria client.

Go

Per configurare le impostazioni di controllo del flusso, passa ReceiveSettings durante la chiamata a pscompat.NewSubscriberClientWithSettings. Puoi impostare i seguenti parametri in ReceiveSettings:

  • MaxOutstandingMessages

  • MaxOutstandingBytes

Per un esempio, consulta questo esempio di controllo del flusso.

Java

Per configurare le impostazioni di controllo del flusso, utilizza i seguenti metodi nella classe FlowControlRequest.Builder:

Python

Per configurare le impostazioni del controllo del flusso, imposta i seguenti parametri nella classe FlowControlSettings:

  • bytes_outstanding

  • messages_outstanding

Ad esempio, se il numero massimo di messaggi è 100 e il sottoscrittore si connette a 10 partizioni, il sottoscrittore non può ricevere più di 100 messaggi da nessuna delle 10 partizioni. Il numero totale di messaggi in sospeso potrebbe essere maggiore di 100, ma il sottoscrittore non può archiviare più di 100 messaggi da ogni partizione.