Control de flujo

En este documento se proporciona información sobre cómo usar el control del flujo con los mensajes publicados en un tema.

Información sobre el control de flujo

Un cliente editor puede intentar publicar mensajes más rápido de lo que es capaz de enviar datos al servicio Pub/Sub. Los clientes están limitados por muchos factores, entre los que se incluyen los siguientes:

  • Capacidad de CPU, RAM y red de la máquina
  • Ajustes de red, como el número de solicitudes pendientes y el ancho de banda disponible
  • La latencia de cada solicitud de publicación, que viene determinada en gran medida por las conexiones de red entre el servicio Pub/Sub, el cliente y Google Cloud

Si la frecuencia de solicitudes de publicación supera estos límites, las solicitudes se acumularán en la memoria hasta que fallen con un error DEADLINE_EXCEEDED. Esto es especialmente probable cuando se publican decenas de miles de mensajes en un bucle, lo que genera miles de solicitudes en milisegundos.

Para diagnosticar este problema, consulta las métricas del lado del servidor en Monitoring. No podrás ver las solicitudes que hayan fallado con DEADLINE_EXCEEDED, solo las que se hayan completado correctamente. La tasa de solicitudes correctas indica la capacidad de procesamiento de tus máquinas cliente, lo que proporciona una base para configurar el control de flujo.

Ir a la página Monitoring

Para mitigar los problemas de velocidad de flujo, configure su cliente editor con control de flujo para limitar la frecuencia de las solicitudes de publicación. Puede configurar el número máximo de bytes asignados a las solicitudes pendientes y el número máximo de mensajes pendientes permitidos. Define estos límites en función de la capacidad de procesamiento de tus máquinas cliente.

Antes de empezar

Antes de configurar el flujo de trabajo de publicación, asegúrate de que has completado las siguientes tareas:

Roles obligatorios

Para obtener los permisos que necesitas para usar el control de flujo, pide a tu administrador que te conceda el rol de gestión de identidades y accesos Editor de Pub/Sub (roles/pubsub.publisher) en tu tema. Para obtener más información sobre cómo conceder roles, consulta el artículo Gestionar el acceso a proyectos, carpetas y organizaciones.

También puedes conseguir los permisos necesarios a través de roles personalizados u otros roles predefinidos.

Necesitas permisos adicionales para crear o actualizar temas y suscripciones.

Usar el control del flujo con mensajes

El control de flujo de Publisher está disponible mediante las bibliotecas de cliente de Pub/Sub en los siguientes idiomas:

C++

Antes de probar este ejemplo, sigue las instrucciones de configuración de C++ que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de C++ de Pub/Sub.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Configure the publisher to block if either (1) 100 or more messages, or
  // (2) messages with 100MiB worth of data have not been acknowledged by the
  // service. By default the publisher never blocks, and its capacity is only
  // limited by the system's memory.
  auto publisher = pubsub::Publisher(pubsub::MakePublisherConnection(
      std::move(topic),
      Options{}
          .set<pubsub::MaxPendingMessagesOption>(100)
          .set<pubsub::MaxPendingBytesOption>(100 * 1024 * 1024L)
          .set<pubsub::FullPublisherActionOption>(
              pubsub::FullPublisherAction::kBlocks)));

  std::vector<future<void>> ids;
  for (char const* data : {"a", "b", "c"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

En el siguiente ejemplo se usa la versión principal de la biblioteca de cliente de Pub/Sub de Go (v2). Si sigues usando la biblioteca v1, consulta la guía de migración a la versión 2. Para ver una lista de ejemplos de código de la versión 1, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Go que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API Go de Pub/Sub.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub/v2"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	// client.Publisher can be passed a topic ID (e.g. "my-topic") or
	// a fully qualified name (e.g. "projects/my-project/topics/my-topic").
	// If a topic ID is provided, the project ID from the client is used.
	// Reuse this publisher for all publish calls to send messages in batches.
	publisher := client.Publisher(topicID)
	publisher.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Java

Antes de probar este ejemplo, sigue las instrucciones de configuración de Java que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Java de Pub/Sub.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.batching.FlowControlSettings;
import com.google.api.gax.batching.FlowController.LimitExceededBehavior;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithFlowControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithFlowControlExample(projectId, topicId);
  }

  public static void publishWithFlowControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Configure how many messages the publisher client can hold in memory
      // and what to do when messages exceed the limit.
      FlowControlSettings flowControlSettings =
          FlowControlSettings.newBuilder()
              // Block more messages from being published when the limit is reached. The other
              // options are Ignore (or continue publishing) and ThrowException (or error out).
              .setLimitExceededBehavior(LimitExceededBehavior.Block)
              .setMaxOutstandingRequestBytes(10 * 1024 * 1024L) // 10 MiB
              .setMaxOutstandingElementCount(100L) // 100 messages
              .build();

      // By default, messages are not batched.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder().setFlowControlSettings(flowControlSettings).build();

      publisher = Publisher.newBuilder(topicName).setBatchingSettings(batchingSettings).build();

      // Publish 1000 messages in quick succession may be constrained by publisher flow control.
      for (int i = 0; i < 1000; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println(
          "Published " + messageIds.size() + " messages with flow control settings.");

      if (publisher != null) {
        // When finished with the publisher, shut down to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
const {PubSub} = require('@google-cloud/pubsub');

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId) {
  // Create publisher options
  const options = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Node.js

Antes de probar este ejemplo, sigue las instrucciones de configuración de Node.js que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Node.js de Pub/Sub.

/**
 * TODO(developer): Uncomment this variable before running the sample.
 */
// const topicNameOrId = 'YOUR_TOPIC_NAME_OR_ID';

// Imports the Google Cloud client library
import {PubSub, PublishOptions} from '@google-cloud/pubsub';

// Creates a client; cache this for further use
const pubSubClient = new PubSub();

async function publishWithFlowControl(topicNameOrId: string) {
  // Create publisher options
  const options: PublishOptions = {
    flowControlOptions: {
      maxOutstandingMessages: 50,
      maxOutstandingBytes: 10 * 1024 * 1024, // 10 MB
    },
  };

  // Get a publisher. Cache topic objects (publishers) and reuse them.
  const topic = pubSubClient.topic(topicNameOrId, options);

  // For flow controlled publishing, we'll use a publisher flow controller
  // instead of `topic.publish()`.
  const flow = topic.flowControlled();

  // Publish messages in a fast loop.
  const testMessage = {data: Buffer.from('test!')};
  for (let i = 0; i < 1000; i++) {
    // You can also just `await` on `publish()` unconditionally, but if
    // you want to avoid pausing to the event loop on each iteration,
    // you can manually check the return value before doing so.
    const wait = flow.publish(testMessage);
    if (wait) {
      await wait;
    }
  }

  // Wait on any pending publish requests. Note that you can call `all()`
  // earlier if you like, and it will return a Promise for all messages
  // that have been sent to `flowController.publish()` so far.
  const messageIds = await flow.all();
  console.log(`Published ${messageIds.length} with flow control settings.`);
}

Python

Antes de probar este ejemplo, sigue las instrucciones de configuración de Python que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Python de Pub/Sub.

from concurrent import futures
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.types import (
    LimitExceededBehavior,
    PublisherOptions,
    PublishFlowControl,
)

# TODO(developer)
# project_id = "your-project-id"
# topic_id = "your-topic-id"

# Configure how many messages the publisher client can hold in memory
# and what to do when messages exceed the limit.
flow_control_settings = PublishFlowControl(
    message_limit=100,  # 100 messages
    byte_limit=10 * 1024 * 1024,  # 10 MiB
    limit_exceeded_behavior=LimitExceededBehavior.BLOCK,
)
publisher = pubsub_v1.PublisherClient(
    publisher_options=PublisherOptions(flow_control=flow_control_settings)
)
topic_path = publisher.topic_path(project_id, topic_id)
publish_futures = []

# Resolve the publish future in a separate thread.
def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
    message_id = publish_future.result()
    print(message_id)

# Publish 1000 messages in quick succession may be constrained by
# publisher flow control.
for n in range(1, 1000):
    data_str = f"Message number {n}"
    # Data must be a bytestring
    data = data_str.encode("utf-8")
    publish_future = publisher.publish(topic_path, data)
    # Non-blocking. Allow the publisher client to batch messages.
    publish_future.add_done_callback(callback)
    publish_futures.append(publish_future)

futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with flow control settings to {topic_path}.")

Ruby

En el siguiente ejemplo se usa la biblioteca de cliente de Ruby Pub/Sub v3. Si sigues usando la biblioteca v2, consulta la guía de migración a la versión 3. Para ver una lista de ejemplos de código de Ruby v2, consulta los ejemplos de código obsoletos.

Antes de probar este ejemplo, sigue las instrucciones de configuración de Ruby que se indican en la guía de inicio rápido sobre cómo usar bibliotecas de cliente. Para obtener más información, consulta la documentación de referencia de la API de Ruby de Pub/Sub.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::PubSub.new
publisher = pubsub.publisher topic_id, async: {
  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control: {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # Block more messages from being published when the limit is reached. The
    # other options are :ignore and :error.
    limit_exceeded_behavior: :block
  }
}

# Rapidly publishing 1000 messages in a loop may be constrained by flow
# control.
1000.times do |i|
  publisher.publish_async "message #{i}" do |result|
    raise "Failed to publish the message." unless result.succeeded?
  end
end

# Stop the async_publisher to send all queued messages immediately.
publisher.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."

Siguientes pasos