Controle de simultaneidade

Este documento fornece informações sobre como usar o controle de simultaneidade com mensagens publicadas em um tópico.

O controle de simultaneidade ajuda a substituir o número padrão de linhas de execução (E/S) em segundo plano usadas pela biblioteca de cliente para publicar mensagens. Isso permite que os clientes do editor enviem mensagens em paralelo.

O controle de simultaneidade é um recurso disponível na biblioteca de cliente de alto nível do Pub/Sub. Também é possível implementar seu próprio controle de simultaneidade ao usar uma biblioteca de baixo nível.

O suporte ao controle de simultaneidade depende da linguagem de programação da biblioteca de cliente. Para implementações de linguagem que aceitam linhas de execução paralelas, como C++, Go e Java, as bibliotecas de cliente fazem uma escolha padrão para o número de linhas de execução.

Esta página explica o conceito de controle de simultaneidade e como configurar o recurso para seus clientes de publicação. Para configurar os clientes assinantes para controle de simultaneidade, consulte Processar mais mensagens com controle de simultaneidade.

Antes de começar

Antes de configurar o fluxo de trabalho de publicação, verifique se você concluiu as seguintes tarefas:

Funções exigidas

Para receber as permissões necessárias para publicar mensagens em um tópico, peça ao administrador para conceder a você o Papel do IAM do Editor do Pub/Sub (roles/pubsub.publisher) no tópico. Para mais informações sobre a concessão de papéis, consulte Gerenciar o acesso a projetos, pastas e organizações.

Também é possível conseguir as permissões necessárias por meio de papéis personalizados ou de outros papéis predefinidos.

Você precisa de permissões adicionais para criar ou atualizar tópicos e assinaturas.

Configurações de controle de simultaneidade

Os valores padrão das variáveis de controle de simultaneidade e os nomes das variáveis podem ser diferentes nas bibliotecas de cliente. Por exemplo, na biblioteca de cliente Java, os métodos para configurar o controle de simultaneidade são setExecutorProvider() e setChannelProvider(). Para mais informações, consulte a documentação de referência da API.

  • setExecutorProvider() permite personalizar o provedor de executor usado para processar respostas de publicação. Por exemplo, é possível mudar o provedor de executor para um que retorne um único executor compartilhado com um número limitado de linhas de execução em vários clientes de editores. Essa configuração ajuda a limitar o número de linhas de execução criadas.

  • setChannelProvider() permite personalizar o provedor de canal usado para abrir conexões com o Pub/Sub. Normalmente, você não configura esse valor, a menos que queira usar o mesmo canal em vários clientes de editores. A reutilização de um canal em muitos clientes pode resultar em erros GOAWAY ou ENHANCE_YOUR_CALM. Se você encontrar esses erros nos registros do seu aplicativo ou no Cloud Logging, crie mais canais.

Exemplos de código para controle de simultaneidade

C++

Antes de tentar esse exemplo, siga as instruções de configuração do C++ em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub C++.

namespace pubsub = ::google::cloud::pubsub;
using ::google::cloud::future;
using ::google::cloud::GrpcBackgroundThreadPoolSizeOption;
using ::google::cloud::Options;
using ::google::cloud::StatusOr;
[](std::string project_id, std::string topic_id) {
  auto topic = pubsub::Topic(std::move(project_id), std::move(topic_id));
  // Override the default number of background (I/O) threads. By default the
  // library uses `std::thread::hardware_concurrency()` threads.
  auto options = Options{}.set<GrpcBackgroundThreadPoolSizeOption>(8);
  auto publisher = pubsub::Publisher(
      pubsub::MakePublisherConnection(std::move(topic), std::move(options)));

  std::vector<future<void>> ids;
  for (char const* data : {"1", "2", "3", "go!"}) {
    ids.push_back(
        publisher.Publish(pubsub::MessageBuilder().SetData(data).Build())
            .then([data](future<StatusOr<std::string>> f) {
              auto s = f.get();
              if (!s) return;
              std::cout << "Sent '" << data << "' (" << *s << ")\n";
            }));
  }
  publisher.Flush();
  // Block until they are actually sent.
  for (auto& id : ids) id.get();
}

Go

Antes de tentar esse exemplo, siga as instruções de configuração do Go em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Go.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
)

func publishSingleGoroutine(w io.Writer, projectID, topicID, msg string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	// msg := "Hello World"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.NumGoroutines = 1

	result := t.Publish(ctx, &pubsub.Message{Data: []byte(msg)})
	// Block until the result is returned and a server-generated
	// ID is returned for the published message.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("Get: %w", err)
	}
	fmt.Fprintf(w, "Published a message; msg ID: %v\n", id)
	return nil
}

Java

Antes de tentar essa amostra, siga as instruções de configuração do Java em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Java.


import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.core.ExecutorProvider;
import com.google.api.gax.core.InstantiatingExecutorProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

public class PublishWithConcurrencyControlExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String topicId = "your-topic-id";

    publishWithConcurrencyControlExample(projectId, topicId);
  }

  public static void publishWithConcurrencyControlExample(String projectId, String topicId)
      throws IOException, ExecutionException, InterruptedException {
    TopicName topicName = TopicName.of(projectId, topicId);
    Publisher publisher = null;
    List<ApiFuture<String>> messageIdFutures = new ArrayList<>();

    try {
      // Provides an executor service for processing messages. The default
      // `executorProvider` used by the publisher has a default thread count of
      // 5 * the number of processors available to the Java virtual machine.
      ExecutorProvider executorProvider =
          InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(4).build();

      // `setExecutorProvider` configures an executor for the publisher.
      publisher = Publisher.newBuilder(topicName).setExecutorProvider(executorProvider).build();

      // schedule publishing one message at a time : messages get automatically batched
      for (int i = 0; i < 100; i++) {
        String message = "message " + i;
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Once published, returns a server-assigned message id (unique within the topic)
        ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
        messageIdFutures.add(messageIdFuture);
      }
    } finally {
      // Wait on any pending publish requests.
      List<String> messageIds = ApiFutures.allAsList(messageIdFutures).get();

      System.out.println("Published " + messageIds.size() + " messages with concurrency control.");

      if (publisher != null) {
        // When finished with the publisher, shutdown to free up resources.
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      }
    }
  }
}

Ruby

Antes de tentar esse exemplo, siga as instruções de configuração do Ruby em Guia de início rápido: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Pub/Sub Ruby.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  threads: {
    # Use exactly one thread for publishing message and exactly one thread
    # for executing callbacks
    publish:  1,
    callback: 1
  }
}
topic.publish_async "This is a test message." do |result|
  raise "Failed to publish the message." unless result.succeeded?
  puts "Message published asynchronously."
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!

A seguir