Publier des messages dans des sujets Lite

Cette page explique comment publier des messages dans des sujets Lite. Vous pouvez publier des messages avec la bibliothèque cliente Pub/Sub Lite pour Java.

Après avoir publié des messages et créé un abonnement Lite dans un sujet Lite, vous pouvez recevoir des messages à partir de l'abonnement Lite.

Format du message

Un message est constitué de champs contenant des données du message et des métadonnées. Spécifiez au moins l'un des éléments suivants dans le message :

La bibliothèque cliente attribue automatiquement le message à une partition, et le service Pub/Sub Lite ajoute les champs suivants au message :

  • Un ID de message unique dans la partition
  • Horodatage correspondant au moment où le service Pub/Sub Lite stocke le message dans la partition

Publier des messages

Pour publier des messages, demandez une connexion en streaming au sujet Lite, puis envoyez des messages via la connexion en streaming.

L'exemple suivant montre comment publier des messages dans un sujet Lite :

gcloud

Cette commande nécessite Python 3.6 ou version ultérieure et l'installation du package Python grpcio. Pour les utilisateurs de macOS, Linux et Cloud Shell, exécutez :

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Pour publier un message, exécutez la commande gcloud pubsub topics publish :

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet Lite
  • LITE_LOCATION : emplacement du sujet Lite
  • MESSAGE_DATA : chaîne contenant les données du message

Go

Avant d'exécuter cet exemple, suivez les instructions de configuration de Go décrites dans les bibliothèques clientes Pub/Sub Lite.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python décrites dans les bibliothèques clientes Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

La bibliothèque cliente envoie des messages de manière asynchrone et gère les erreurs. Si une erreur se produit, la bibliothèque cliente envoie à nouveau le message.

  1. Le service Pub/Sub Lite ferme le flux.
  2. La bibliothèque cliente met les messages en mémoire tampon et rétablit une connexion au sujet Lite.
  3. La bibliothèque cliente envoie les messages dans l'ordre.

Après la publication d'un message, le service Pub/Sub Lite stocke le message dans une partition et renvoie l'ID du message à l'éditeur.

Utiliser des clés de tri

Si les messages ont la même clé de tri, la bibliothèque cliente les attribue à la même partition. La clé de tri doit être une chaîne de 1 024 octets maximum.

La clé de tri se trouve dans le champ key d'un message. Vous pouvez définir des clés de tri avec la bibliothèque cliente.

gcloud

Cette commande nécessite Python 3.6 ou version ultérieure et l'installation du package Python grpcio. Pour les utilisateurs de macOS, Linux et Cloud Shell, exécutez :

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Pour publier un message, exécutez la commande gcloud pubsub topics publish :

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --ordering-key=ORDERING_KEY \
    --message=MESSAGE_DATA

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet Lite
  • LITE_LOCATION : emplacement du sujet Lite
  • ORDERING_KEY : chaîne permettant d'attribuer des messages aux partitions
  • MESSAGE_DATA : chaîne contenant les données du message

Go

Avant d'exécuter cet exemple, suivez les instructions de configuration de Go décrites dans les bibliothèques clientes Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python décrites dans les bibliothèques clientes Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

Vous pouvez envoyer plusieurs messages à la même partition à l'aide de clés de tri, afin que les abonnés reçoivent les messages dans l'ordre. La bibliothèque cliente peut attribuer plusieurs clés de tri à la même partition.

Définir l'heure de l'événement

Vous pouvez utiliser l'heure de l'événement pour publier vos messages Lite. L'heure de l'événement est un attribut personnalisé que vous pouvez ajouter à votre message.

Vous pouvez définir l'horodatage de l'événement à l'aide de la bibliothèque cliente ou de la gcloud CLI.

Cette commande nécessite Python 3.6 ou version ultérieure et l'installation du package Python grpcio. Pour les utilisateurs de macOS, Linux et Cloud Shell, exécutez :

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Pour publier un message, exécutez la commande gcloud pubsub topics publish :

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --event-time=EVENT_TIME \
    --message=MESSAGE_DATA

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet Lite

  • LITE_LOCATION : emplacement du sujet Lite

  • EVENT_TIME: heure de l'événement spécifiée par l'utilisateur. Pour en savoir plus sur les formats d'heure, exécutez gcloud topic datetimes.

  • MESSAGE_DATA : chaîne contenant les données du message

Utiliser des attributs

Les attributs de message sont des paires clé/valeur avec des métadonnées associées au message. Les attributs peuvent être des chaînes de texte ou d'octets.

Les attributs se trouvent dans le champ attributes d'un message. Vous pouvez définir des attributs avec la bibliothèque cliente.

gcloud

Cette commande nécessite Python 3.6 ou version ultérieure et l'installation du package Python grpcio. Pour les utilisateurs de macOS, Linux et Cloud Shell, exécutez :

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Pour publier un message, exécutez la commande gcloud pubsub topics publish :

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA \
    --attribute=KEY=VALUE,...

Remplacez les éléments suivants :

  • TOPIC_ID : ID du sujet Lite
  • LITE_LOCATION : emplacement du sujet Lite
  • MESSAGE_DATA : chaîne contenant les données du message
  • KEY : clé d'un attribut de message
  • VALUE : valeur de la clé de l'attribut de message

Go

Avant d'exécuter cet exemple, suivez les instructions de configuration de Go décrites dans les bibliothèques clientes Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish a message with custom attributes.
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("message-with-custom-attributes"),
		Attributes: map[string]string{
			"year":   "2020",
			"author": "unknown",
		},
	})

	// Get blocks until the result is ready.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("publish error: %w", err)
	}

	fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
	return publisher.Error()
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    // Prepare the message data as a byte string.
    String messageData = "message-with-custom-attributes";
    ByteString data = ByteString.copyFromUtf8(messageData);

    // Prepare a protobuf-encoded event timestamp for the message.
    Instant now = Instant.now();
    String eventTime =
        MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));

    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            // Add an event timestamp as an attribute.
            .putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python décrites dans les bibliothèques clientes Pub/Sub Lite.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve api futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")

Les attributs peuvent indiquer comment traiter un message. Les abonnés peuvent analyser le champ attributes d'un message et le traiter en fonction de ses attributs.

Messages par lot

La bibliothèque cliente publie des messages par lot. Les lots plus volumineux utilisent moins de ressources de calcul, mais augmentent la latence. Vous pouvez modifier la taille du lot à l'aide des paramètres de traitement par lot.

Le tableau suivant répertorie les paramètres de traitement par lot que vous pouvez configurer :

Paramètre Description Par défaut
Taille d'une requête Taille maximale du lot, exprimée en octets. 3,5 Mio
Le nombre de messages Nombre maximal de messages dans un lot. 1 000 messages
Délai de publication Durée, en millisecondes, entre l'ajout du message à un lot et l'envoi du lot au sujet Lite. 50 millisecondes

Vous pouvez configurer les paramètres de traitement par lot avec la bibliothèque cliente.

Go

Avant d'exécuter cet exemple, suivez les instructions de configuration de Go décrites dans les bibliothèques clientes Pub/Sub Lite.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Batch settings control how the publisher batches messages. These settings
	// apply per partition.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	settings := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024, // 5 KiB
		CountThreshold: 1000,     // 1,000 messages
		DelayThreshold: 100 * time.Millisecond,
	}

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish requests are sent to the server based on request size, message
	// count and time since last publish, whichever condition is met first.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}
		fmt.Fprintf(w, "Published: %v\n", id)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
	return publisher.Error()
}

Java

Avant d'exécuter cet exemple, suivez les instructions de configuration de Java décrites dans les bibliothèques clientes Pub/Sub Lite.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Python

Avant d'exécuter cet exemple, suivez les instructions de configuration de Python décrites dans les bibliothèques clientes Pub/Sub Lite.

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)
batch_setttings = BatchSettings(
    # 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Default to 50 ms.
    max_latency=0.1,
    # Default to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_setttings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)

Lorsqu'une application d'éditeur démarre, la bibliothèque cliente crée un lot pour chaque partition d'un sujet Lite. Par exemple, si un sujet Lite comporte deux partitions, les éditeurs créent deux lots et envoient chaque lot à une partition.

Après la publication d'un message, la bibliothèque cliente la met en mémoire tampon jusqu'à ce que le lot dépasse la taille maximale de la requête, le nombre maximal de messages ou le délai de publication.

Trier des messages

Les sujets Lite trient les messages dans chaque partition en fonction de leur date de publication. Pour attribuer des messages à la même partition, utilisez une clé de tri.

Pub/Sub Lite distribue les messages d'une partition dans l'ordre, et les abonnés peuvent les traiter dans l'ordre. Pour en savoir plus, consultez la page Recevoir des messages.

Idempotence de publication

Les bibliothèques clientes Pub/Sub Lite sont compatibles avec la publication idempotente à partir des versions suivantes:

Si la publication d'un message fait l'objet d'une nouvelle tentative de publication en raison d'erreurs réseau ou de serveur, il est stocké une seule fois. L'idempotence n'est garantie que dans la même session. Elle ne peut pas l'être si le même message est republié à l'aide d'un nouveau client éditeur. Cette opération n'entraîne aucuns frais de service supplémentaires et n'augmente pas la latence de publication.

Activer ou désactiver la publication idempotente

La publication idempotente est activée par défaut dans les bibliothèques clientes Pub/Sub Lite. Vous pouvez la désactiver dans les paramètres client de l'éditeur dans la bibliothèque cliente correspondante.

Si la publication idempotente est activée, le décalage renvoyé dans un résultat de publication peut être -1. Cette valeur est renvoyée lorsque le message est identifié comme doublon d'un message déjà publié, mais que le serveur ne disposait pas de suffisamment d'informations pour renvoyer le décalage du message au moment de la publication. Les messages reçus par les abonnés ont toujours un décalage valide.

Dépannage

Doublons reçus

L'idempotence étant limitée à une seule session, des doublons peuvent être reçus si vous recréez le client éditeur pour publier les mêmes messages.

Un client abonné peut recevoir le même message plusieurs fois si des partitions sont automatiquement attribuées aux abonnés par le service Pub/Sub Lite (paramètre par défaut). Un message peut être redistribué à un autre client abonné en cas de réattribution.

Erreur de l'éditeur

L'état d'une session d'éditeur correspond à la récupération de mémoire sur le serveur après sept jours d'inactivité. Si une session est réactivée au-delà de ce délai, le client de l'éditeur se termine avec un message d'erreur semblable à "Condition préalable non remplie: numéro de séquence de publication attendu pour le message reçu..." et n'accepte pas les nouveaux messages. Recréez le client éditeur pour résoudre cette erreur.