Nachrichten in Lite-Themen veröffentlichen

Auf dieser Seite wird erläutert, wie Sie Nachrichten zu Lite-Themen veröffentlichen. Sie können Nachrichten mit der Pub/Sub Lite-Clientbibliothek für Java veröffentlichen.

Nach dem Veröffentlichen von Nachrichten und Erstellen eines Lite-Abos für ein Lite-Thema können Sie Nachrichten vom Lite-Abo empfangen.

Nachrichtenformat

Eine Nachricht besteht aus Feldern mit den Nachrichtendaten und Metadaten. Geben Sie in der Nachricht Folgendes an:

Die Clientbibliothek weist die Nachricht automatisch einer Partition zu. Der Pub/Sub Lite-Dienst fügt der Nachricht die folgenden Felder hinzu:

  • Eine innerhalb der Partition eindeutige Nachrichten-ID
  • Ein Zeitstempel für den Zeitpunkt, zu dem der Pub/Sub Lite-Dienst die Nachricht in der Partition speichert

Nachrichten veröffentlichen

Wenn Sie Nachrichten veröffentlichen möchten, fordern Sie eine Streamingverbindung an das Lite-Thema an und senden Sie dann Nachrichten über die Streamingverbindung.

Das folgende Beispiel zeigt, wie Sie Nachrichten in einem Lite-Thema veröffentlichen:

gcloud

Für diesen Befehl ist Python 3.6 oder höher erforderlich und das grpcio-Python-Paket muss installiert sein. Führen Sie für MacOS-, Linux- und Cloud Shell-Nutzer Folgendes aus:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Veröffentlichen Sie eine Nachricht mit dem Befehl gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA

Ersetzen Sie Folgendes:

  • TOPIC_ID: die ID des Lite-Themas
  • LITE_LOCATION: der Standort des Lite-Themas
  • MESSAGE_DATA: ein String mit den Nachrichtendaten

Einfach loslegen (Go)

Folgen Sie der Einrichtungsanleitung für Go in Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.


package main

import (
	"context"
	"flag"
	"fmt"
	"log"
	"sync"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
	"golang.org/x/sync/errgroup"
)

func main() {
	// NOTE: Set these flags for an existing Pub/Sub Lite topic when running this
	// sample.
	projectID := flag.String("project_id", "", "Cloud Project ID")
	zone := flag.String("zone", "", "Cloud Zone where the topic resides, e.g. us-central1-a")
	topicID := flag.String("topic_id", "", "Existing Pub/Sub Lite topic")
	messageCount := flag.Int("message_count", 100, "The number of messages to send")
	flag.Parse()

	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", *projectID, *zone, *topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		log.Fatalf("pscompat.NewPublisherClient error: %v", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Collect any messages that need to be republished with a new publisher
	// client.
	var toRepublish []*pubsub.Message
	var toRepublishMu sync.Mutex

	// Publish messages. Messages are automatically batched.
	g := new(errgroup.Group)
	for i := 0; i < *messageCount; i++ {
		msg := &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		}
		result := publisher.Publish(ctx, msg)

		g.Go(func() error {
			// Get blocks until the result is ready.
			id, err := result.Get(ctx)
			if err != nil {
				// NOTE: A failed PublishResult indicates that the publisher client
				// encountered a fatal error and has permanently terminated. After the
				// fatal error has been resolved, a new publisher client instance must
				// be created to republish failed messages.
				fmt.Printf("Publish error: %v\n", err)
				toRepublishMu.Lock()
				toRepublish = append(toRepublish, msg)
				toRepublishMu.Unlock()
				return err
			}

			// Metadata decoded from the id contains the partition and offset.
			metadata, err := pscompat.ParseMessageMetadata(id)
			if err != nil {
				fmt.Printf("Failed to parse message metadata %q: %v\n", id, err)
				return err
			}
			fmt.Printf("Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Printf("Publishing finished with error: %v\n", err)
	}
	fmt.Printf("Published %d messages\n", *messageCount-len(toRepublish))

	// Print the error that caused the publisher client to terminate (if any),
	// which may contain more context than PublishResults.
	if err := publisher.Error(); err != nil {
		fmt.Printf("Publisher client terminated due to error: %v\n", publisher.Error())
	}
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(MessageMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published a message to {topic_path} with partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

Die Clientbibliothek sendet asynchron Nachrichten und verarbeitet Fehler. Wenn ein Fehler auftritt, sendet die Clientbibliothek die Nachricht erneut.

  1. Der Pub/Sub Lite-Dienst schließt den Stream.
  2. Die Clientbibliothek puffert die Nachrichten und stellt eine Verbindung zum Lite-Thema wieder her.
  3. Die Clientbibliothek sendet die Nachrichten der Reihe nach.

Nachdem Sie eine Nachricht veröffentlicht haben, speichert der Pub/Sub Lite-Dienst die Nachricht in einer Partition und gibt die Nachrichten-ID an den Publisher zurück.

Sortierschlüssel verwenden

Wenn Nachrichten denselben Reihenfolgeschlüssel haben, weist die Clientbibliothek die Nachrichten derselben Partition zu. Der Sortierschlüssel darf maximal 1.024 Byte lang sein.

Der Sortierschlüssel befindet sich im Feld key einer Nachricht. Sie können Sortierschlüssel mit der Clientbibliothek festlegen.

gcloud

Für diesen Befehl ist Python 3.6 oder höher erforderlich und das grpcio-Python-Paket muss installiert sein. Führen Sie für MacOS-, Linux- und Cloud Shell-Nutzer Folgendes aus:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Veröffentlichen Sie eine Nachricht mit dem Befehl gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --ordering-key=ORDERING_KEY \
    --message=MESSAGE_DATA

Ersetzen Sie Folgendes:

  • TOPIC_ID: die ID des Lite-Themas
  • LITE_LOCATION: der Standort des Lite-Themas
  • ORDERING_KEY: ein String, mit dem Nachrichten Partitionen zugewiesen werden
  • MESSAGE_DATA: ein String mit den Nachrichtendaten

Einfach loslegen (Go)

Folgen Sie der Einrichtungsanleitung für Go in Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithOrderingKey(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Messages of the same ordering key will always get published to the same
	// partition. When OrderingKey is unset, messages can get published to
	// different partitions if more than one partition exists for the topic.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			OrderingKey: "test_ordering_key",
			Data:        []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}

		// Metadata decoded from the id contains the partition and offset.
		metadata, err := pscompat.ParseMessageMetadata(id)
		if err != nil {
			return fmt.Errorf("failed to parse message metadata %q: %w", id, err)
		}
		fmt.Fprintf(w, "Published: partition=%d, offset=%d\n", metadata.Partition, metadata.Offset)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with ordering key\n", publishedCount)
	return publisher.Error()
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.ExecutionException;

public class PublishWithOrderingKeyExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithOrderingKeyExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish a message to a topic with an ordering key.
  public static void publishWithOrderingKeyExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    String message = "message-with-ordering-key";

    // Convert the message to a byte string.
    ByteString data = ByteString.copyFromUtf8(message);
    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Messages of the same ordering key will always get published to the
            // same partition. When OrderingKey is unset, messages can get published
            // to different partitions if more than one partition exists for the topic.
            .setOrderingKey("testing")
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with ordering key:\n" + metadata);
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        # Messages of the same ordering key will always get published to the same partition.
        # When ordering_key is unset, messsages can get published ot different partitions if
        # more than one partition exists for the topic.
        api_future = publisher_client.publish(
            topic_path, data.encode("utf-8"), ordering_key="testing"
        )
        # result() blocks. To resolve api futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with an ordering key to {str(topic_path)}."
)

Mit Reihenfolgeschlüsseln können Sie mehrere Nachrichten an dieselbe Partition senden, sodass Abonnenten die Nachrichten in der Reihenfolge empfangen. Die Clientbibliothek kann einer gleichen Partition mehrere Reihenfolgeschlüssel zuweisen.

Uhrzeit des Termins festlegen

Sie können Ihre Lite-Mitteilungen anhand des Ereigniszeitraums veröffentlichen. Die Ereigniszeit ist ein benutzerdefiniertes Attribut, das Sie Ihrer Nachricht hinzufügen können.

Sie können den Ereigniszeitstempel mit der Clientbibliothek oder der gcloud CLI festlegen.

Für diesen Befehl ist Python 3.6 oder höher erforderlich und das grpcio-Python-Paket muss installiert sein. Führen Sie für MacOS-, Linux- und Cloud Shell-Nutzer Folgendes aus:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Veröffentlichen Sie eine Nachricht mit dem Befehl gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --event-time=EVENT_TIME \
    --message=MESSAGE_DATA

Ersetzen Sie Folgendes:

  • TOPIC_ID: die ID des Lite-Themas

  • LITE_LOCATION: der Standort des Lite-Themas

  • EVENT_TIME: eine vom Nutzer angegebene Ereigniszeit. Weitere Informationen zu Zeitformaten erhalten Sie, wenn Sie gcloud topic datetimes ausführen.

  • MESSAGE_DATA: ein String mit den Nachrichtendaten

Attribute verwenden

Nachrichtenattribute sind Schlüssel/Wert-Paare mit Metadaten zur Nachricht. Die Attribute können Text- oder Byte-Strings sein.

Die Attribute befinden sich im Feld attributes einer Nachricht. Mit der Clientbibliothek können Sie Attribute festlegen.

gcloud

Für diesen Befehl ist Python 3.6 oder höher erforderlich und das grpcio-Python-Paket muss installiert sein. Führen Sie für MacOS-, Linux- und Cloud Shell-Nutzer Folgendes aus:

sudo pip3 install grpcio
export CLOUDSDK_PYTHON_SITEPACKAGES=1

Veröffentlichen Sie eine Nachricht mit dem Befehl gcloud pubsub lite-topics publish:

gcloud pubsub lite-topics publish TOPIC_ID \
    --location=LITE_LOCATION \
    --message=MESSAGE_DATA \
    --attribute=KEY=VALUE,...

Ersetzen Sie Folgendes:

  • TOPIC_ID: die ID des Lite-Themas
  • LITE_LOCATION: der Standort des Lite-Themas
  • MESSAGE_DATA: ein String mit den Nachrichtendaten
  • KEY ist der Schlüssel eines Nachrichtenattributs
  • VALUE: der Wert für den Schlüssel des Nachrichtenattributs

Einfach loslegen (Go)

Folgen Sie der Einrichtungsanleitung für Go in Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithCustomAttributes(w io.Writer, projectID, zone, topicID string) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClient(ctx, topicPath)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClient error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish a message with custom attributes.
	result := publisher.Publish(ctx, &pubsub.Message{
		Data: []byte("message-with-custom-attributes"),
		Attributes: map[string]string{
			"year":   "2020",
			"author": "unknown",
		},
	})

	// Get blocks until the result is ready.
	id, err := result.Get(ctx)
	if err != nil {
		return fmt.Errorf("publish error: %w", err)
	}

	fmt.Fprintf(w, "Published a message with custom attributes: %v\n", id)
	return publisher.Error()
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.core.ApiFuture;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.MessageTransforms;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.protobuf.util.Timestamps;
import com.google.pubsub.v1.PubsubMessage;
import java.time.Instant;
import java.util.concurrent.ExecutionException;

public class PublishWithCustomAttributesExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithCustomAttributesExample(cloudRegion, zoneId, projectNumber, topicId, regional);
  }

  // Publish messages to a topic with custom attributes.
  public static void publishWithCustomAttributesExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    PublisherSettings publisherSettings =
        PublisherSettings.newBuilder().setTopicPath(topicPath).build();

    Publisher publisher = Publisher.create(publisherSettings);

    // Start the publisher. Upon successful starting, its state will become RUNNING.
    publisher.startAsync().awaitRunning();

    // Prepare the message data as a byte string.
    String messageData = "message-with-custom-attributes";
    ByteString data = ByteString.copyFromUtf8(messageData);

    // Prepare a protobuf-encoded event timestamp for the message.
    Instant now = Instant.now();
    String eventTime =
        MessageTransforms.encodeAttributeEventTime(Timestamps.fromMillis(now.toEpochMilli()));

    PubsubMessage pubsubMessage =
        PubsubMessage.newBuilder()
            .setData(data)
            // Add two sets of custom attributes to the message.
            .putAllAttributes(ImmutableMap.of("year", "2020", "author", "unknown"))
            // Add an event timestamp as an attribute.
            .putAttributes(MessageTransforms.PUBSUB_LITE_EVENT_TIME_TIMESTAMP_PROTO, eventTime)
            .build();

    // Publish a message.
    ApiFuture<String> future = publisher.publish(pubsubMessage);

    // Shut down the publisher.
    publisher.stopAsync().awaitTerminated();

    String ackId = future.get();
    MessageMetadata metadata = MessageMetadata.decode(ackId);
    System.out.println("Published a message with custom attributes:\n" + metadata);
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(
        topic_path,
        data.encode("utf-8"),
        year="2020",
        author="unknown",
    )
    # result() blocks. To resolve api futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    message_metadata = MessageMetadata.decode(message_id)
    print(
        f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
    )

print(f"Finished publishing a message with custom attributes to {str(topic_path)}.")

Attribute können angeben, wie eine Nachricht verarbeitet werden soll. Abonnenten können das Feld attributes einer Nachricht parsen und die Nachricht entsprechend ihren Attributen verarbeiten.

Nachrichten im Batch verarbeiten

Die Clientbibliothek veröffentlicht Nachrichten im Batch. Größere Batches benötigen weniger Rechenressourcen, erhöhen aber die Latenz. Sie können die Batchgröße mit den Batcheinstellungen ändern.

In der folgenden Tabelle sind die Batch-Einstellungen aufgeführt, die Sie konfigurieren können:

Einstellung Beschreibung Standard
Größe der Anfrage Die maximale Größe des Batches in Byte. 3,5 MiB
Zahl der Nachrichten Die maximale Anzahl von Nachrichten in einem Batch. 1.000 Nachrichten
Verzögerung bei der Veröffentlichung Die Zeit in Millisekunden zwischen dem Hinzufügen der Nachricht zu einem Batch und dem Senden des Batches an das Lite-Thema. 50 Millisekunden

Sie können Batch-Einstellungen mit der Clientbibliothek konfigurieren.

Einfach loslegen (Go)

Folgen Sie der Einrichtungsanleitung für Go in Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsub"
	"cloud.google.com/go/pubsublite/pscompat"
)

func publishWithBatchSettings(w io.Writer, projectID, zone, topicID string, messageCount int) error {
	// projectID := "my-project-id"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	// messageCount := 10
	ctx := context.Background()
	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)

	// Batch settings control how the publisher batches messages. These settings
	// apply per partition.
	// See https://pkg.go.dev/cloud.google.com/go/pubsublite/pscompat#pkg-variables
	// for DefaultPublishSettings.
	settings := pscompat.PublishSettings{
		ByteThreshold:  5 * 1024, // 5 KiB
		CountThreshold: 1000,     // 1,000 messages
		DelayThreshold: 100 * time.Millisecond,
	}

	// Create the publisher client.
	publisher, err := pscompat.NewPublisherClientWithSettings(ctx, topicPath, settings)
	if err != nil {
		return fmt.Errorf("pscompat.NewPublisherClientWithSettings error: %w", err)
	}

	// Ensure the publisher will be shut down.
	defer publisher.Stop()

	// Publish requests are sent to the server based on request size, message
	// count and time since last publish, whichever condition is met first.
	var results []*pubsub.PublishResult
	for i := 0; i < messageCount; i++ {
		r := publisher.Publish(ctx, &pubsub.Message{
			Data: []byte(fmt.Sprintf("message-%d", i)),
		})
		results = append(results, r)
	}

	// Print publish results.
	var publishedCount int
	for _, r := range results {
		// Get blocks until the result is ready.
		id, err := r.Get(ctx)
		if err != nil {
			// NOTE: A failed PublishResult indicates that the publisher client
			// encountered a fatal error and has permanently terminated. After the
			// fatal error has been resolved, a new publisher client instance must be
			// created to republish failed messages.
			fmt.Fprintf(w, "Publish error: %v\n", err)
			continue
		}
		fmt.Fprintf(w, "Published: %v\n", id)
		publishedCount++
	}

	fmt.Fprintf(w, "Published %d messages with batch settings\n", publishedCount)
	return publisher.Error()
}

Java

Folgen Sie der Einrichtungsanleitung für Java unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.batching.BatchingSettings;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudRegionOrZone;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.MessageMetadata;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import org.threeten.bp.Duration;

public class PublishWithBatchSettingsExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;
    // True if using a regional location. False if using a zonal location.
    // https://cloud.google.com/pubsub/lite/docs/topics
    boolean regional = false;

    publishWithBatchSettingsExample(
        cloudRegion, zoneId, projectNumber, topicId, messageCount, regional);
  }

  // Publish messages to a topic with batch settings.
  public static void publishWithBatchSettingsExample(
      String cloudRegion,
      char zoneId,
      long projectNumber,
      String topicId,
      int messageCount,
      boolean regional)
      throws ApiException, ExecutionException, InterruptedException {

    CloudRegionOrZone location;
    if (regional) {
      location = CloudRegionOrZone.of(CloudRegion.of(cloudRegion));
    } else {
      location = CloudRegionOrZone.of(CloudZone.of(CloudRegion.of(cloudRegion), zoneId));
    }

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(location)
            .setName(TopicName.of(topicId))
            .build();

    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      // Batch settings control how the publisher batches messages
      long requestBytesThreshold = 5000L; // default : 3_500_000 bytes
      long messageCountBatchSize = 100L; // default : 1000L message
      Duration publishDelayThreshold = Duration.ofMillis(100); // default : 50 ms

      // Publish request get triggered based on request size, messages count & time since last
      // publish, whichever condition is met first.
      BatchingSettings batchingSettings =
          BatchingSettings.newBuilder()
              .setRequestByteThreshold(requestBytesThreshold)
              .setElementCountThreshold(messageCountBatchSize)
              .setDelayThreshold(publishDelayThreshold)
              .build();

      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder()
              .setTopicPath(topicPath)
              .setBatchingSettings(batchingSettings)
              .build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<MessageMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      System.out.println("Published " + ackIds.size() + " messages with batch settings.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
      }
    }
  }
}

Python

Folgen Sie der Einrichtungsanleitung für Python unter Pub/Sub Lite-Clientbibliotheken, bevor Sie dieses Beispiel ausführen.

from google.cloud.pubsub_v1.types import BatchSettings
from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    MessageMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100
# regional = True

if regional:
    location = CloudRegion(cloud_region)
else:
    location = CloudZone(CloudRegion(cloud_region), zone_id)

topic_path = TopicPath(project_number, location, topic_id)
batch_setttings = BatchSettings(
    # 2 MiB. Default to 3 MiB. Must be less than 4 MiB gRPC's per-message limit.
    max_bytes=2 * 1024 * 1024,
    # 100 ms. Default to 50 ms.
    max_latency=0.1,
    # Default to 1000.
    max_messages=100,
)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient(
    per_partition_batching_settings=batch_setttings
) as publisher_client:
    for message in range(num_messages):
        data = f"{message}"
        api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
        # result() blocks. To resolve API futures asynchronously, use add_done_callback().
        message_id = api_future.result()
        message_metadata = MessageMetadata.decode(message_id)
        print(
            f"Published {data} to partition {message_metadata.partition.value} and offset {message_metadata.cursor.offset}."
        )

print(
    f"Finished publishing {num_messages} messages with batch settings to {str(topic_path)}."
)

Wenn eine Publisher-Anwendung gestartet wird, erstellt die Client-Bibliothek einen Batch für jede Partition in einem Lite-Thema. Wenn ein Lite-Thema beispielsweise zwei Partitionen hat, erstellen Publisher zwei Batches und senden jeden Batch an eine Partition.

Nachdem Sie eine Nachricht veröffentlicht haben, wird sie von der Clientbibliothek gepuffert, bis der Batch die maximale Anforderungsgröße, die maximale Anzahl von Nachrichten oder die Veröffentlichungsverzögerung überschreitet.

Nachrichten in eine Reihenfolge bringen

Lite-Themen sortieren Nachrichten in jeder Partition, indem Sie die Nachrichten veröffentlichen. Verwenden Sie einen Reihenfolgeschlüssel, um Nachrichten derselben Partition zuzuweisen.

Pub/Sub Lite liefert die Nachrichten aus einer Partition nacheinander und Abonnenten können die Nachrichten der Reihe nach verarbeiten. Weitere Informationen finden Sie unter Nachrichten empfangen.

Idempotenz veröffentlichen

Pub/Sub Lite-Clientbibliotheken unterstützen die idempotente Veröffentlichung in den folgenden Versionen:

Wenn die Veröffentlichung einer Nachricht aufgrund von Netzwerk- oder Serverfehlern wiederholt wird, wird sie genau einmal gespeichert. Idempotenz wird nur innerhalb derselben Sitzung garantiert. Wenn dieselbe Mitteilung mit einem neuen Publisher-Client noch einmal veröffentlicht wird, kann sie nicht garantiert werden. Es verursacht keine zusätzlichen Dienstkosten und erhöht auch die Veröffentlichungslatenz nicht.

Idempotente Veröffentlichung aktivieren oder deaktivieren

Die Idempotente Veröffentlichung ist in den Pub/Sub Lite-Clientbibliotheken standardmäßig aktiviert. Sie kann über die Einstellungen des Publisher-Client in der entsprechenden Clientbibliothek deaktiviert werden.

Wenn die idempotente Veröffentlichung aktiviert ist, kann der in einem Veröffentlichungsergebnis zurückgegebene Offset -1 sein. Dieser Wert wird zurückgegeben, wenn die Nachricht als Duplikat einer bereits erfolgreich veröffentlichten Nachricht identifiziert wird, der Server jedoch nicht genügend Informationen hatte, um den Offset der Nachricht zum Zeitpunkt der Veröffentlichung zurückzugeben. Von Abonnenten empfangene Nachrichten haben immer einen gültigen Offset.

Fehlerbehebung

Erhaltene Duplikate

Da die Idempotenz auf eine einzelne Sitzung beschränkt ist, können Duplikate empfangen werden, wenn Sie den Publisher-Client neu erstellen, um dieselben Nachrichten zu veröffentlichen.

Ein Abonnentenclient kann dieselbe Nachricht mehrmals erhalten, wenn den Abonnenten vom Pub/Sub Lite-Dienst automatisch Partitionen zugewiesen werden (Standardeinstellung). Eine Nachricht kann bei einer Neuzuweisung an einen anderen Abonnentenclient noch einmal zugestellt werden.

Publisher-Fehler

Der Status einer Publisher-Sitzung ist die automatische Speicherbereinigung auf dem Server nach sieben Tagen Inaktivität. Wenn eine Sitzung nach diesem Zeitraum fortgesetzt wird, wird der Publisher-Client mit einer Fehlermeldung wie „Fehlgeschlagene Vorbedingung: Nachricht mit Veröffentlichungssequenznummer von... erwartet“ beendet und akzeptiert keine neuen Nachrichten. Erstellen Sie den Publisher-Client neu, um diesen Fehler zu beheben.