v1 Publique mensagens com definições de controlo de fluxo (DESCONTINUADO)

(OBSOLETO) Publique mensagens com definições de controlo de fluxo

Exemplo de código

Go

Antes de experimentar este exemplo, siga as Goinstruções de configuração no início rápido do Pub/Sub com as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Go Pub/Sub.

Para se autenticar no Pub/Sub, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

import (
	"context"
	"fmt"
	"io"
	"strconv"
	"sync"
	"sync/atomic"

	"cloud.google.com/go/pubsub"
)

func publishWithFlowControlSettings(w io.Writer, projectID, topicID string) error {
	// projectID := "my-project-id"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsub.NewClient(ctx, projectID)
	if err != nil {
		return fmt.Errorf("pubsub.NewClient: %w", err)
	}
	defer client.Close()

	t := client.Topic(topicID)
	t.PublishSettings.FlowControlSettings = pubsub.FlowControlSettings{
		MaxOutstandingMessages: 100,                     // default 1000
		MaxOutstandingBytes:    10 * 1024 * 1024,        // default 0 (unlimited)
		LimitExceededBehavior:  pubsub.FlowControlBlock, // default Ignore, other options: Block and SignalError
	}

	var wg sync.WaitGroup
	var totalErrors uint64

	numMsgs := 1000
	// Rapidly publishing 1000 messages in a loop may be constrained by flow control.
	for i := 0; i < numMsgs; i++ {
		wg.Add(1)
		result := t.Publish(ctx, &pubsub.Message{
			Data: []byte("message #" + strconv.Itoa(i)),
		})
		go func(i int, res *pubsub.PublishResult) {
			fmt.Fprintf(w, "Publishing message %d\n", i)
			defer wg.Done()
			// The Get method blocks until a server-generated ID or
			// an error is returned for the published message.
			_, err := res.Get(ctx)
			if err != nil {
				// Error handling code can be added here.
				fmt.Fprintf(w, "Failed to publish: %v", err)
				atomic.AddUint64(&totalErrors, 1)
				return
			}
		}(i, result)
	}

	wg.Wait()

	if totalErrors > 0 {
		return fmt.Errorf("%d of %d messages did not publish successfully", totalErrors, numMsgs)
	}
	return nil
}

Ruby

Antes de experimentar este exemplo, siga as Rubyinstruções de configuração no início rápido do Pub/Sub com as bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Ruby Pub/Sub.

Para se autenticar no Pub/Sub, configure as Credenciais padrão da aplicação. Para mais informações, consulte o artigo Configure a autenticação para um ambiente de desenvolvimento local.

# topic_id = "your-topic-id"

pubsub = Google::Cloud::Pubsub.new

topic = pubsub.topic topic_id, async: {
  # Configure how many messages the publisher client can hold in memory
  # and what to do when messages exceed the limit.
  flow_control: {
    message_limit: 100,
    byte_limit: 10 * 1024 * 1024, # 10 MiB
    # Block more messages from being published when the limit is reached. The
    # other options are :ignore and :error.
    limit_exceeded_behavior: :block
  }
}
# Rapidly publishing 1000 messages in a loop may be constrained by flow control.
1000.times do |i|
  topic.publish_async "message #{i}" do |result|
    raise "Failed to publish the message." unless result.succeeded?
  end
end

# Stop the async_publisher to send all queued messages immediately.
topic.async_publisher.stop.wait!
puts "Published messages with flow control settings to #{topic_id}."

O que se segue?

Para pesquisar e filtrar exemplos de código para outros Google Cloud produtos, consulte o Google Cloud navegador de exemplos.