Estendi il tempo di conferma con la gestione dei leasing

Quando un messaggio viene recapitato a un sottoscrittore di tipo pull, il sottoscrittore deve elaborare e confermare (ack) il messaggio entro la scadenza di conferma. In caso contrario, l'abbonato deve estendere la scadenza con una chiamata per modificare la scadenza per la conferma.

Le librerie client di alto livello di Pub/Sub offrono la gestione del leasing come funzionalità che estende automaticamente la scadenza di un messaggio non ancora confermato. Per impostazione predefinita, le librerie client possono estendere la scadenza a un'ora inviando richieste modifyAckDeadline periodiche.Le librerie client di alto livello per Python, Go, Java e .Net utilizzano il 99° percentile di ritardo di conferma per determinare la lunghezza di ogni estensione.

La gestione del leasing consente di avere un controllo più granulare sulla scadenza di conferma dei messaggi rispetto alla configurazione della proprietà a livello di abbonamento. Se utilizzi solo la scadenza di conferma a livello di abbonamento, devi trovare un equilibrio tra un valore basso e uno di alto valore. Un valore basso aumenta la probabilità che si verifichino duplicati, mentre un valore elevato ritarda la riconsegna dei messaggi non riusciti. Determinare il valore giusto può essere difficile, soprattutto quando il tempo di elaborazione previsto per messaggi diversi può variare notevolmente.

Per ulteriori informazioni sulle proprietà di un abbonamento, inclusa la scadenza della conferma, consulta Proprietà degli abbonamenti.

Configurazione della gestione del leasing

Puoi configurare le seguenti proprietà nelle librerie client di alto livello per controllare la gestione del lease.

  • Periodo di estensione massimo della conferma. Il periodo di tempo massimo entro il quale è possibile estendere la scadenza della conferma di un messaggio utilizzando la richiesta modify acknowledgment deadline. Questa proprietà ti consente di determinare quanto tempo vuoi che i client del sottoscrittore elaborino i messaggi.

  • Durata massima di ogni estensione di conferma. Il periodo di tempo massimo entro il quale estendere la scadenza di conferma per ciascuna delle richieste modify acknowledgment deadline. che consente di definire la quantità di tempo necessaria a Pub/Sub per recapitare un messaggio. La nuova consegna si verifica quando il primo sottoscrittore che elabora il messaggio si arresta in modo anomalo o non è integro e non è più in grado di inviare la richiesta modify acknowledgment deadline.

  • Durata minima per ogni estensione di conferma. Il periodo di tempo minimo entro il quale estendere la scadenza di conferma per ogni richiesta modify acknowledgment deadline. Questa proprietà consente di specificare la quantità minima di tempo che deve trascorrere prima che un messaggio venga recapitato nuovamente.

Non è garantito che le scadenze di conferma vengano rispettate, a meno che non abiliti la consegna "exactly-once".

Gestione manuale delle scadenze di conferma

Per evitare che i messaggi scadano e vengano recapitati nuovamente quando utilizzi il pull unario o le librerie client di basso livello, utilizza la richiesta modify acknowledgment deadline per estendere le scadenze di conferma. Fanno eccezione le librerie client di alto livello Go e C++, che offrono la gestione del lease quando si utilizza il pull unario. Consulta i seguenti esempi di pull unario con gestione del lease:

C#

Prima di provare questo esempio, segui le istruzioni di configurazione di C# nella Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API C# di Pub/Sub.


using Google.Cloud.PubSub.V1;
using Grpc.Core;
using System;
using System.Collections.Generic;

public class PullMessageWithLeaseManagementSample
{
    public int PullMessageWithLeaseManagement(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        SubscriberServiceApiClient subscriberClient = SubscriberServiceApiClient.Create();

        var ackIds = new List<string>();
        try
        {
            PullResponse response = subscriberClient.Pull(subscriptionName, maxMessages: 20);

            // Print out each received message.
            foreach (ReceivedMessage msg in response.ReceivedMessages)
            {
                ackIds.Add(msg.AckId);
                string text = msg.Message.Data.ToStringUtf8();
                Console.WriteLine($"Message {msg.Message.MessageId}: {text}");

                // Modify the ack deadline of each received message from the default 10 seconds to 30.
                // This prevents the server from redelivering the message after the default 10 seconds
                // have passed.
                subscriberClient.ModifyAckDeadline(subscriptionName, new List<string> { msg.AckId }, 30);
            }
            // If acknowledgement required, send to server.
            if (acknowledge && ackIds.Count > 0)
            {
                subscriberClient.Acknowledge(subscriptionName, ackIds);
            }
        }
        catch (RpcException ex) when (ex.Status.StatusCode == StatusCode.Unavailable)
        {
            // UNAVAILABLE due to too many concurrent pull requests pending for the given subscription.
        }
        return ackIds.Count;
    }
}

Java

Prima di provare questo esempio, segui le istruzioni di configurazione Java in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Pub/Sub.


import com.google.cloud.pubsub.v1.stub.GrpcSubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStub;
import com.google.cloud.pubsub.v1.stub.SubscriberStubSettings;
import com.google.pubsub.v1.AcknowledgeRequest;
import com.google.pubsub.v1.ModifyAckDeadlineRequest;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PullRequest;
import com.google.pubsub.v1.PullResponse;
import com.google.pubsub.v1.ReceivedMessage;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

public class SubscribeSyncWithLeaseExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";
    Integer numOfMessages = 10;

    subscribeSyncWithLeaseExample(projectId, subscriptionId, numOfMessages);
  }

  public static void subscribeSyncWithLeaseExample(
      String projectId, String subscriptionId, Integer numOfMessages)
      throws IOException, InterruptedException {
    SubscriberStubSettings subscriberStubSettings =
        SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 << 20) // 20 MB
                    .build())
            .build();

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) {

      String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);

      PullRequest pullRequest =
          PullRequest.newBuilder()
              .setMaxMessages(numOfMessages)
              .setSubscription(subscriptionName)
              .build();

      // Use pullCallable().futureCall to asynchronously perform this operation.
      PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);

      // Stop the program if the pull response is empty to avoid acknowledging
      // an empty list of ack IDs.
      if (pullResponse.getReceivedMessagesList().isEmpty()) {
        System.out.println("No message was pulled. Exiting.");
        return;
      }

      List<String> ackIds = new ArrayList<>();
      for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) {
        ackIds.add(message.getAckId());

        // Modify the ack deadline of each received message from the default 10 seconds to 30.
        // This prevents the server from redelivering the message after the default 10 seconds
        // have passed.
        ModifyAckDeadlineRequest modifyAckDeadlineRequest =
            ModifyAckDeadlineRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAckIds(message.getAckId())
                .setAckDeadlineSeconds(30)
                .build();

        subscriber.modifyAckDeadlineCallable().call(modifyAckDeadlineRequest);
      }

      // Acknowledge received messages.
      AcknowledgeRequest acknowledgeRequest =
          AcknowledgeRequest.newBuilder()
              .setSubscription(subscriptionName)
              .addAllAckIds(ackIds)
              .build();

      // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
      subscriber.acknowledgeCallable().call(acknowledgeRequest);
      System.out.println(pullResponse.getReceivedMessagesList());
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni di configurazione di Node.js in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Pub/Sub.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const projectId = 'YOUR_PROJECT_ID';
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';

// Imports the Google Cloud client library. v1 is for the lower level
// proto access.
const {v1} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use.
const subClient = new v1.SubscriberClient();

async function synchronousPullWithLeaseManagement() {
  // The low level API client requires a name only.
  const formattedSubscription =
    subscriptionNameOrId.indexOf('/') >= 0
      ? subscriptionNameOrId
      : subClient.subscriptionPath(projectId, subscriptionNameOrId);

  // The maximum number of messages returned for this request.
  // Pub/Sub may return fewer than the number specified.
  const maxMessages = 1;
  const newAckDeadlineSeconds = 30;
  const request = {
    subscription: formattedSubscription,
    maxMessages: maxMessages,
    allowExcessMessages: false,
  };

  let isProcessed = false;

  // The worker function is meant to be non-blocking. It starts a long-
  // running process, such as writing the message to a table, which may
  // take longer than the default 10-sec acknowledge deadline.
  function worker(message) {
    console.log(`Processing "${message.message.data}"...`);

    setTimeout(() => {
      console.log(`Finished procesing "${message.message.data}".`);
      isProcessed = true;
    }, 30000);
  }

  // The subscriber pulls a specified number of messages.
  const [response] = await subClient.pull(request);

  // Obtain the first message.
  const message = response.receivedMessages[0];

  // Send the message to the worker function.
  worker(message);

  let waiting = true;
  while (waiting) {
    await new Promise(r => setTimeout(r, 10000));
    // If the message has been processed..
    if (isProcessed) {
      const ackRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
      };

      //..acknowledges the message.
      await subClient.acknowledge(ackRequest);
      console.log(`Acknowledged: "${message.message.data}".`);
      // Exit after the message is acknowledged.
      waiting = false;
      console.log('Done.');
    } else {
      // If the message is not yet processed..
      const modifyAckRequest = {
        subscription: formattedSubscription,
        ackIds: [message.ackId],
        ackDeadlineSeconds: newAckDeadlineSeconds,
      };

      //..reset its ack deadline.
      await subClient.modifyAckDeadline(modifyAckRequest);

      console.log(
        `Reset ack deadline for "${message.message.data}" for ${newAckDeadlineSeconds}s.`
      );
    }
  }
}

synchronousPullWithLeaseManagement().catch(console.error);

Python

Prima di provare questo esempio, segui le istruzioni di configurazione di Python in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Python Pub/Sub.

import logging
import multiprocessing
import sys
import time

from google.api_core import retry
from google.cloud import pubsub_v1

multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
processes = dict()

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

response = subscriber.pull(
    request={"subscription": subscription_path, "max_messages": 3},
    retry=retry.Retry(deadline=300),
)

if len(response.received_messages) == 0:
    return

# Start a process for each message based on its size modulo 10.
for message in response.received_messages:
    process = multiprocessing.Process(
        target=time.sleep, args=(sys.getsizeof(message) % 10,)
    )
    processes[process] = (message.ack_id, message.message.data)
    process.start()

while processes:
    # Take a break every second.
    if processes:
        time.sleep(1)

    for process in list(processes):
        ack_id, msg_data = processes[process]
        # If the process is running, reset the ack deadline.
        if process.is_alive():
            subscriber.modify_ack_deadline(
                request={
                    "subscription": subscription_path,
                    "ack_ids": [ack_id],
                    # Must be between 10 and 600.
                    "ack_deadline_seconds": 15,
                }
            )
            logger.debug(f"Reset ack deadline for {msg_data}.")

        # If the process is complete, acknowledge the message.
        else:
            subscriber.acknowledge(
                request={"subscription": subscription_path, "ack_ids": [ack_id]}
            )
            logger.debug(f"Acknowledged {msg_data}.")
            processes.pop(process)
print(
    f"Received and acknowledged {len(response.received_messages)} messages from {subscription_path}."
)

# Close the underlying gPRC channel. Alternatively, wrap subscriber in
# a 'with' block to automatically call close() when done.
subscriber.close()

Ruby

Prima di provare questo esempio, segui le istruzioni di configurazione di Ruby riportate in Guida rapida sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Ruby Pub/Sub.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
new_ack_deadline = 30
processed = false

# The subscriber pulls a specified number of messages.
received_messages = subscription.pull immediate: false, max: 1

# Obtain the first message.
message = received_messages.first

# Send the message to a non-blocking worker that starts a long-running process, such as writing
# the message to a table, which may take longer than the default 10-sec acknowledge deadline.
Thread.new do
  sleep 15
  processed = true
  puts "Finished processing \"#{message.data}\"."
end

loop do
  sleep 1
  if processed
    # If the message has been processed, acknowledge the message.
    message.acknowledge!
    puts "Done."
    # Exit after the message is acknowledged.
    break
  else
    # If the message has not yet been processed, reset its ack deadline.
    message.modify_ack_deadline! new_ack_deadline
    puts "Reset ack deadline for \"#{message.data}\" for #{new_ack_deadline} seconds."
  end
end

Passaggi successivi

Scopri le altre opzioni di consegna che puoi configurare per un abbonamento: