Processar picos temporários com controle de fluxo

Às vezes, Data pipelines têm picos no tráfego publicado. Os picos de tráfego podem sobrecarregar os assinantes, a menos que você esteja preparado para isso. Uma solução simples para evitar picos de tráfego é aumentar dinamicamente os recursos de assinantes do Pub/Sub para processar mais mensagens. No entanto, essa solução pode aumentar os custos ou não funcionar instantaneamente. Por exemplo, você pode precisar de muitas VMs.

O controle de fluxo do assinante permite que ele regule a taxa em que as mensagens são ingeridas. O controle de fluxo lida com picos de tráfego sem aumentar os custos ou até que o assinante seja escalonado.

O controle de fluxo é um recurso disponível na biblioteca de cliente de alto nível do Pub/Sub. Também é possível implementar sua própria programação de controle de fluxo ao usar uma biblioteca de cliente de baixo nível.

A necessidade de controle de fluxo indica que as mensagens são publicadas com uma taxa maior do que são consumidas. Se esse cenário for um estado persistente, em vez de um pico temporário no volume de mensagens, aumente o número de instâncias do cliente assinante.

Configuração do controle de fluxo

O controle de fluxo permite configurar o número máximo de bytes alocados para solicitações pendentes e o número máximo de mensagens pendentes permitidas. Defina esses limites de acordo com a capacidade dos computadores clientes.

Os valores padrão e os nomes das variáveis de controle de fluxo podem ser diferentes nas bibliotecas de cliente. Por exemplo, na biblioteca de cliente Java, as seguintes variáveis configuram o controle de fluxo:

  • setMaxOutstandingElementCount(). Define o número máximo de mensagens para as quais o Pub/Sub não recebeu confirmações ou confirmações negativas.

  • setMaxOutstandingRequestBytes(). Define o tamanho máximo de mensagens para as quais o Pub/Sub não recebeu confirmações ou confirmações negativas.

Se o limite de setMaxOutstandingElementCount() ou setMaxOutstandingRequestBytes() for cruzado, o cliente do assinante não vai buscar mais mensagens. Esse comportamento continua até que as mensagens que já foram extraídas sejam confirmadas ou negadas. Assim, podemos alinhar a capacidade com o custo associado à execução de mais assinantes.

Exemplos de código para controle de fluxo

Para controlar a taxa com que o cliente do assinante recebe mensagens, use os recursos de controle de fluxo do assinante. Esses recursos de controle de fluxo são ilustrados nos exemplos a seguir:

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
auto sample = [](std::string project_id, std::string subscription_id) {
  // Change the flow control watermarks, by default the client library uses
  // 0 and 1,000 for the message count watermarks, and 0 and 10MiB for the
  // size watermarks. Recall that the library stops requesting messages if
  // any of the high watermarks are reached, and the library resumes
  // requesting messages when *both* low watermarks are reached.
  auto constexpr kMiB = 1024 * 1024L;
  auto subscriber = pubsub::Subscriber(pubsub::MakeSubscriberConnection(
      pubsub::Subscription(std::move(project_id), std::move(subscription_id)),
      Options{}
          .set<pubsub::MaxOutstandingMessagesOption>(1000)
          .set<pubsub::MaxOutstandingBytesOption>(8 * kMiB)));

  auto session = subscriber.Subscribe(
      [](pubsub::Message const& m, pubsub::AckHandler h) {
        std::move(h).ack();
        std::cout << "Received message " << m << "\n";
        PleaseIgnoreThisSimplifiesTestingTheSamples();
      });
  return std::make_pair(subscriber, std::move(session));
};

C#

Antes de tentar esse exemplo, siga as instruções de configuração do C# em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C#.


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Threading;
using System.Threading.Tasks;

public class PullMessagesWithFlowControlAsyncSample
{
    public async Task<int> PullMessagesWithFlowControlAsync(string projectId, string subscriptionId, bool acknowledge)
    {
        SubscriptionName subscriptionName = SubscriptionName.FromProjectSubscription(projectId, subscriptionId);
        int messageCount = 0;
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            Settings = new SubscriberClient.Settings
            {
                AckExtensionWindow = TimeSpan.FromSeconds(4),
                AckDeadline = TimeSpan.FromSeconds(10),
                FlowControlSettings = new FlowControlSettings(maxOutstandingElementCount: 100, maxOutstandingByteCount: 10240)
            }
        }.BuildAsync();
        // SubscriberClient runs your message handle function on multiple
        // threads to maximize throughput.
        Task startTask = subscriber.StartAsync((PubsubMessage message, CancellationToken cancel) =>
        {
            string text = message.Data.ToStringUtf8();
            Console.WriteLine($"Message {message.MessageId}: {text}");
            Interlocked.Increment(ref messageCount);
            return Task.FromResult(acknowledge ? SubscriberClient.Reply.Ack : SubscriberClient.Reply.Nack);
        });
        // Run for 5 seconds.
        await Task.Delay(5000);
        await subscriber.StopAsync(CancellationToken.None);
        // Lets make sure that the start task finished successfully after the call to stop.
        await startTask;
        return messageCount;
    }
}

Go

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func pullMsgsFlowControlSettings(w io.Writer, projectID, subID string) error {
	// projectID := "my-project-id"
	// subID := "my-sub"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	sub := client.Subscription(subID)
	// MaxOutstandingMessages is the maximum number of unprocessed messages the
	// subscriber client will pull from the server before pausing. This also configures
	// the maximum number of concurrent handlers for received messages.
	//
	// For more information, see https://cloud.google.com/pubsub/docs/pull#streamingpull_dealing_with_large_backlogs_of_small_messages.
	sub.ReceiveSettings.MaxOutstandingMessages = 100
	// MaxOutstandingBytes is the maximum size of unprocessed messages,
	// that the subscriber client will pull from the server before pausing.
	sub.ReceiveSettings.MaxOutstandingBytes = 1e8
	err = sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Fprintf(w, "Got message: %q\n", string(msg.Data))
		msg.Ack()
	})
	if err != nil {
		return fmt.Errorf("sub.Receive: %w", err)
	}
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.


import com.google.api.gax.batching.FlowControlSettings;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeWithFlowControlSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeWithFlowControlSettingsExample(projectId, subscriptionId);
  }

  public static void subscribeWithFlowControlSettingsExample(
      String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Id: " + message.getMessageId());
          System.out.println("Data: " + message.getData().toStringUtf8());
          consumer.ack();
        };

    Subscriber subscriber = null;

    // The subscriber will pause the message stream and stop receiving more messsages from the
    // server if any one of the conditions is met.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.newBuilder()
            // 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
            // the subscriber receives before pausing the message stream.
            .setMaxOutstandingElementCount(1000L)
            // 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
            // receives before pausing the message stream.
            .setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
            .build();

    try {
      subscriber =
          Subscriber.newBuilder(subscriptionName, receiver)
              .setFlowControlSettings(flowControlSettings)
              .build();

      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings(
  subscriptionNameOrId,
  maxInProgress,
  timeout
) {
  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionNameOrId,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  const messageHandler = message => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.close();
  }, timeout * 1000);
}

Node.js

Antes de tentar essa amostra, siga as instruções de configuração do Node.js em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Node.js.

/**
 * TODO(developer): Uncomment these variables before running the sample.
 */
// const subscriptionNameOrId = 'YOUR_SUBSCRIPTION_NAME_OR_ID';
// const maxInProgress = 5;
// const timeout = 10;

// Imports the Google Cloud client library
import {Message, PubSub, SubscriberOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function subscribeWithFlowControlSettings(
  subscriptionNameOrId: string,
  maxInProgress: number,
  timeout: number
) {
  const subscriberOptions: SubscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubSubClient.subscription(
    subscriptionNameOrId,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  const messageHandler = (message: Message) => {
    console.log(`Received message: ${message.id}`);
    console.log(`\tData: ${message.data}`);
    console.log(`\tAttributes: ${message.attributes}`);

    // "Ack" (acknowledge receipt of) the message
    message.ack();
  };

  subscription.on('message', messageHandler);

  // Wait a while for the subscription to run. (Part of the sample only.)
  setTimeout(() => {
    subscription.close();
  }, timeout * 1000);
}

Python

Antes de tentar esse exemplo, siga as instruções de configuração do Python em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Python.

from concurrent.futures import TimeoutError
from google.cloud import pubsub_v1

# TODO(developer)
# project_id = "your-project-id"
# subscription_id = "your-subscription-id"
# Number of seconds the subscriber should listen for messages
# timeout = 5.0

subscriber = pubsub_v1.SubscriberClient()
subscription_path = subscriber.subscription_path(project_id, subscription_id)

def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    print(f"Received {message.data!r}.")
    message.ack()

# Limit the subscriber to only have ten outstanding messages at a time.
flow_control = pubsub_v1.types.FlowControl(max_messages=10)

streaming_pull_future = subscriber.subscribe(
    subscription_path, callback=callback, flow_control=flow_control
)
print(f"Listening for messages on {subscription_path}..\n")

# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.

Ruby

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# subscription_id = "your-subscription-id"

pubsub = Google::Cloud::Pubsub.new

subscription = pubsub.subscription subscription_id
subscriber   = subscription.listen inventory: 10 do |received_message|
  puts "Received message: #{received_message.data}"
  received_message.acknowledge!
end

subscriber.start
# Let the main thread sleep for 60 seconds so the thread for listening
# messages does not quit
sleep 60
subscriber.stop.wait!

A seguir

Leia sobre as outras opções de exibição que podem ser configuradas para uma assinatura: