Control de flujo

En este documento, se proporciona información sobre el uso del control de flujo con mensajes publicados en un tema.

Información acerca del control de flujo

Un cliente publicador puede intentar publicar mensajes más rápido de lo que ese cliente puede enviar datos al servicio de Pub/Sub. Los clientes están limitados por muchos factores, incluidos los siguientes:

  • El CPU, la RAM y la capacidad de red de la máquina
  • La configuración de red, como la cantidad de solicitudes pendientes y el ancho de banda disponible
  • La latencia de cada solicitud de publicación, determinada en gran medida por las conexiones de red entre el servicio de Pub/Sub, el cliente y Google Cloud

Si la tasa de solicitudes de publicación supera estos límites, las solicitudes se acumulan en la memoria hasta que fallen con un error DEADLINE_EXCEEDED. Es muy probable que esto suceda cuando se publican decenas de miles de mensajes en un bucle, lo que genera miles de solicitudes en milisegundos.

Para diagnosticar este problema, puedes verificar las métricas del servidor en Monitoring. No podrás ver las solicitudes que fallaron con DEADLINE_EXCEEDED, solo las solicitudes exitosas. La tasa de solicitudes exitosas te indica la capacidad de procesamiento de las máquinas de cliente, lo que proporciona un modelo de referencia para configurar el control de flujo.

Abrir la página Monitoring

A fin de mitigar los problemas de la tasa de flujo, configura tu cliente publicador con un control de flujo para limitar la frecuencia de solicitudes de publicación. Puedes configurar la cantidad máxima de bytes asignados para las solicitudes pendientes y la cantidad máxima de mensajes pendientes permitidos. Establece estos límites según la capacidad de procesamiento de las máquinas de cliente.

Antes de comenzar

Antes de configurar el flujo de trabajo de publicación, asegúrate de haber completado las siguientes tareas:

Roles obligatorios

Para obtener los permisos que necesitas para usar el control de flujo, pídele a tu administrador que te otorgue el rol de IAM Publicador de Pub/Sub (roles/pubsub.publisher) en tu tema. Para obtener más información sobre cómo otorgar roles, consulta Administra el acceso a proyectos, carpetas y organizaciones.

También puedes obtener los permisos necesarios mediante roles personalizados o cualquier otro rol predefinido.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Cómo usar el control de flujo con mensajes

El control de flujo del publicador está disponible a través de las bibliotecas cliente de Pub/Sub en los siguientes lenguajes:

C++

Antes de probar esta muestra, sigue las instrucciones de configuración de C++ en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Configure the publisher to block if either (1) 100 or more messages, or
  // (2) messages with 100MiB worth of data have not been acknowledged by the
  // service. By default the publisher never blocks, and its capacity is only
  // limited by the system's memory.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks)));

  std::vector<future<void>> ids;
  for (char const* data : {"a", "b", "c"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Antes de probar esta muestra, sigue las instrucciones de configuración de Go en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Go.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Java

Antes de probar esta muestra, sigue las instrucciones de configuración de Java en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Java.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // By default, messages are not batched.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publish 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println(
          "Published " + messageIds.size() + " messages with flow control settings.");

      if (publisher != null) {
        // When finished with the publisher, shut down to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId) {
  // Create publisher options
  const options = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Node.js

Antes de probar esta muestra, sigue las instrucciones de configuración de Node.js en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Node.js.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, PublishOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId: string) {
  // Create publisher options
  const options: PublishOptions = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Python

Antes de probar esta muestra, sigue las instrucciones de configuración de Python en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Python.

from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
    message_limit=100,  # 100 messages
    byte_limit=10 * 1024 * 1024,  # 10 MiB
    limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = publish_future.result()
    print(message_id)

# Publish 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1000):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")

Ruby

Antes de probar esta muestra, sigue las instrucciones de configuración de Ruby en la guía de inicio rápido sobre el uso de bibliotecas cliente. Si quieres obtener más información, consulta la documentación de referencia de la API de Pub/Sub para Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control: {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # Block more messages from being published when the limit is reached. The
    # other options are :ignore and :error.
    limit_exceeded_behavior: :block
  }
}
# Rapidly publishing 1000 messages in a loop may be constrained by flow control.
1000.times do |i|
  topic.publish_async "message #{i}" do |result|
    raise "Failed to publish the message." unless result.succeeded?
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."

¿Qué sigue?