Publier avec des clés de tri

Publiez des messages avec une clé de tri. Les clés de tri sont hachées pour être mappées à une partition. Les messages de la même clé de tri sont publiés dans la même partition. Si la partition est remplie, les messages les plus anciens sont écrasés.

En savoir plus

Pour obtenir une documentation détaillée incluant cet exemple de code, consultez les articles suivants :

Exemple de code

Go

Pour vous authentifier auprès de Pub/Sub Lite, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

Pour vous authentifier auprès de Pub/Sub Lite, configurez les identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

Pour vous authentifier auprès de Pub/Sub Lite, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

Étapes suivantes

Pour rechercher et filtrer des exemples de code pour d'autres produits Google Cloud, consultez l'explorateur d'exemples Google Cloud.