Lite トピックの作成と管理

このページでは、Lite トピックを作成、表示、削除する方法について説明します。

Lite トピックは、メッセージをパブリッシュできるゾーンリソースです。Lite トピックを作成するときは、メッセージを保存するゾーンを指定する必要があります。

Lite トピックを作成したら、Lite トピックにメッセージをパブリッシュする、Lite トピックに Lite サブスクリプションを作成する、Lite サブスクリプションからメッセージを受信することができます。

Lite トピックの作成

Lite トピックを作成するときは、スループットとストレージ容量を設定する必要があります。パーティションの数と容量によって、Lite トピックの容量が決まります。

Lite トピックのプロパティは次のとおりです。

  • パーティションの数: Lite トピックのパーティションの数。
  • パーティションあたりのストレージ: 各パーティションのストレージ容量(バイト単位)。30 GiB~10 TiB のストレージを指定できます。
  • パブリッシュ スループットの容量: 各パーティションの最大パブリッシュ スループット。スループット容量として 1 秒あたり 4~16 MiB を指定できます。この値は整数にする必要があります。最適な結果を得るには、最小限のパブリッシュ スループットで Lite トピックを作成します。トラフィックが増加した場合は、Lite トピックを更新してパブリッシュ スループットを引き上げます。
  • サブスクライブ スループット容量: 各パーティションの最大サブスクライブ スループット。スループット容量として 1 秒あたり 4~32 MiB を指定できます。この値は整数にする必要があります。

  • メッセージ保持期間: Lite トピックでメッセージが保存される最長時間。メッセージ保持期間を指定しなかった場合、Lite トピックはストレージ容量を超過するまでメッセージを保存します。

Lite トピックは、Cloud Console、gcloud コマンドライン ツール、または Pub/Sub Lite API を使用して作成できます。

Console

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [Lite トピックを作成] をクリックします。

  3. リージョンとリージョン内のゾーンを選択します。

  4. [名前] セクションでLite トピック ID を入力します。Lite トピック名には、Lite トピック ID、ゾーン、プロジェクト番号が含まれます。

  5. [スループット] セクションで、パーティションにプロビジョニングするための、パーティションの数を入力します。

  6. [メッセージ ストレージ] セクションで、パーティションあたりのストレージを入力します。

  7. オプション: [メッセージ保持] セクションで、[Drop messages after the specified duration even if storage is available] を選択して、保持期間を入力します。

  8. [作成] をクリック

gcloud

Lite トピックを作成するには、gcloud pubsub lite-topics create コマンドを使用します。

gcloud pubsub lite-topics create TOPIC_ID \
  --zone=ZONE \
  --partitions=NUMBER_OF_PARTITIONS \
  --per-partition-bytes=STORAGE_PER_PARTITION \
  [--message-retention-period=MESSAGE_RETENTION_PERIOD]

以下を置き換えます。

  • TOPIC_ID: Lite トピックの ID

  • ZONE: Pub/Sub Lite がサポートするゾーンの名前

  • NUMBER_OF_PARTITIONS: Lite トピックのパーティション数の整数

  • STORAGE_PER_PARTITION: 各パーティションのストレージ容量(30GiB など)

  • MESSAGE_RETENTION_PERIOD: Lite トピックがメッセージを保存する期間(1d2w など)

リクエストが成功すると、コマンドラインに確認メッセージが表示されます。

Created [TOPIC_ID].

プロトコル

Lite トピックを作成するには、次のような POST リクエストを送信します。

POST https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

リクエスト本文に次のフィールドを指定します。

{
  "partitionConfig": {
       "count": NUMBER_OF_PARTITIONS,
       "capacity": {
         "publishMibPerSec": PUBLISHING_CAPACITY,
         "subscribeMibPerSec": SUBSCRIBING_CAPACITY,
       }
  },
  "retentionConfig": {
       "perPartitionBytes": STORAGE_PER_PARTITION,
       "period": MESSAGE_RETENTION_PERIOD,
  },
}

以下を置き換えます。

  • NUMBER_OF_PARTITIONS: Lite トピックのパーティション数の整数

  • STORAGE_PER_PARTITION: 各パーティションのストレージ容量(30GiB など)

  • PUBLISHING_CAPACITY: 各パーティションのパブリッシュ スループット容量を表す整数

  • SUBSCRIBING_CAPACITY: 各パーティションのサブスクライブ スループット容量を表す整数

  • MESSAGE_RETENTION_PERIOD: Lite トピックがメッセージを保存する期間(1d2w など)

リクエストが成功すると、レスポンスは JSON 形式の Lite トピックになります。

{
  "name": projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID,
  "partitionConfig": {
       "count": NUMBER_OF_PARTITIONS,
       "capacity": {
         "publishMibPerSec": PUBLISHING_CAPACITY,
         "subscribeMibPerSec": SUBSCRIBING_CAPACITY,
       }
  },
  "retentionConfig": {
       "perPartitionBytes": STORAGE_PER_PARTITION,
       "period": MESSAGE_RETENTION_PERIOD,
  },
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

func createTopic(w io.Writer, projectID, region, zone, topicID string) error {
	// projectID := "my-project-id"
	// region := "us-central1" // see https://cloud.google.com/pubsub/lite/docs/locations
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
	}
	defer client.Close()

	const gib = 1 << 30
	// For ranges of fields in TopicConfig, see https://pkg.go.dev/cloud.google.com/go/pubsublite/#TopicConfig
	topic, err := client.CreateTopic(ctx, pubsublite.TopicConfig{
		Name:                       fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID),
		PartitionCount:             2, // Must be >= 1 and cannot decrease after creation.
		PublishCapacityMiBPerSec:   4,
		SubscribeCapacityMiBPerSec: 8,
		PerPartitionBytes:          30 * gib,
		RetentionDuration:          pubsublite.InfiniteRetention,
	})
	if err != nil {
		return fmt.Errorf("client.CreateTopic got err: %v", err)
	}
	fmt.Fprintf(w, "Created topic: %s\n", topic.Name)
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
import com.google.cloud.pubsublite.proto.Topic.RetentionConfig;
import com.google.protobuf.util.Durations;

public class CreateTopicExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int partitions = 1;

    createTopicExample(cloudRegion, zoneId, projectNumber, topicId, partitions);
  }

  public static void createTopicExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int partitions)
      throws Exception {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    Topic topic =
        Topic.newBuilder()
            .setPartitionConfig(
                PartitionConfig.newBuilder()
                    // Set throughput capacity per partition in MiB/s.
                    .setCapacity(
                        PartitionConfig.Capacity.newBuilder()
                            // Must be 4-16 MiB/s.
                            .setPublishMibPerSec(4)
                            // Must be 4-32 MiB/s.
                            .setSubscribeMibPerSec(8)
                            .build())
                    .setCount(partitions))
            .setRetentionConfig(
                RetentionConfig.newBuilder()
                    // How long messages are retained.
                    .setPeriod(Durations.fromDays(1))
                    // Set storage per partition to 30 GiB. This must be 30 GiB-10 TiB.
                    // If the number of bytes stored in any of the topic's partitions grows
                    // beyond this value, older messages will be dropped to make room for
                    // newer ones, regardless of the value of `period`.
                    .setPerPartitionBytes(30 * 1024 * 1024 * 1024L))
            .setName(topicPath.toString())
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Topic response = adminClient.createTopic(topic).get();
      System.out.println(response.getAllFields() + "created successfully.");
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import AlreadyExists
from google.cloud.pubsublite import AdminClient, Topic
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath
from google.protobuf.duration_pb2 import Duration

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_partitions = 1

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)
topic = Topic(
    name=str(topic_path),
    partition_config=Topic.PartitionConfig(
        # A topic must have at least one partition.
        count=num_partitions,
        # Set throughput capacity per partition in MiB/s.
        capacity=Topic.PartitionConfig.Capacity(
            # Set publish throughput capacity per partition to 4 MiB/s. Must be >= 4 and <= 16.
            publish_mib_per_sec=4,
            # Set subscribe throughput capacity per partition to 4 MiB/s. Must be >= 4 and <= 32.
            subscribe_mib_per_sec=8,
        ),
    ),
    retention_config=Topic.RetentionConfig(
        # Set storage per partition to 30 GiB. This must be in the range 30 GiB-10TiB.
        # If the number of byptes stored in any of the topic's partitions grows beyond
        # this value, older messages will be dropped to make room for newer ones,
        # regardless of the value of `period`.
        per_partition_bytes=30 * 1024 * 1024 * 1024,
        # Allow messages to be retained for 7 days.
        period=Duration(seconds=60 * 60 * 24 * 7),
    ),
)

client = AdminClient(cloud_region)
try:
    response = client.create_topic(topic)
    print(f"{response.name} created successfully.")
except AlreadyExists:
    print(f"{topic_path} already exists.")

使用可能なゾーンのリストについては、Pub/Sub Lite のロケーションをご覧ください。

Lite トピックを作成した後は、パーティションごとのスループットとストレージ容量をスケーリングできます。パーティション数は増やすことはできますが、減らすことはできません。詳細については、容量のスケーリングをご覧ください。

容量のプロビジョニング

Lite トピックはパーティションで構成されます。パーティションでは次の容量がサポートされます。

  • パブリッシュ スループット: メッセージをパブリッシュできる最大合計レート。
  • サブスクライブ スループット: Lite サブスクリプションにメッセージが転送される最大合計レート。
  • ストレージ: パーティション内のメッセージの最大合計サイズ。

ストレージ容量を超えても、Pub/Sub Lite サービスが公開に失敗することはありません。代わりに、最も古いメッセージのメッセージ保持期間に関係なく、パーティションから最も古いメッセージが削除されます。Pub/Sub Lite サービスは、ベスト エフォート方式でメッセージを削除します。

容量と料金の関係については、Pub/Sub Lite の料金をご覧ください。

Lite トピックの更新

メッセージの保持期間を更新して、Lite トピックの容量をスケールできます。トピックのゾーンは更新できません。Lite トピックの容量をスケールするには、容量のスケーリングをご覧ください。

Lite トピックは、Cloud Console、gcloud コマンドライン ツール、または Pub/Sub Lite API を使用して更新できます。

Console

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [Lite トピック ID] をクリックします。

  3. [編集] をクリックします。

  4. パーティションの数を入力します。これは増やすことはできますが、減らすことはできません。

  5. パーティションあたりのストレージ保持期間を入力します。

  6. パブリッシュ スループットを選択します。

  7. サブスクライブ スループットを選択します。

  8. パーティションあたりのストレージを入力します。

gcloud

Lite トピックを更新するには、gcloud pubsub lite-topics update コマンドを使用します。

gcloud pubsub lite-topics update TOPIC_ID \
  --zone=ZONE \
  --partitions=NUMBER_OF_PARTITIONS \
  --per-partition-publish-mib=PUBLISHING_CAPACITY \
  --per-partition-subscribe-mib=SUBSCRIBING_CAPACITY \
  --per-partition-bytes=STORAGE_PER_PARTITION \
  --message-retention-period=MESSAGE_RETENTION_PERIOD

以下を置き換えます。

  • TOPIC_ID: Lite トピックの ID

  • ZONE: Lite トピックが含まれるゾーンの名前

  • NUMBER_OF_PARTITIONS: Lite トピックに構成するパーティションの数。

  • PUBLISHING_CAPACITY: 各パーティションのパブリッシュ スループット容量を表す整数

  • SUBSCRIBING_CAPACITY: 各パーティションのサブスクライブ スループット容量を表す整数

  • STORAGE_PER_PARTITION: 各パーティションのストレージ容量(30GiB など)

  • MESSAGE_RETENTION_PERIOD: Lite トピックがメッセージを保存する期間(1d2w など)

リクエストが成功すると、コマンドラインに Lite トピックが表示されます。

name: projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
partitionConfig:
  count: NUMBER_OF_PARTITIONS
  capacity:
    publishMibPerSec: PUBLISHING_CAPACITY
    subscribeMibPerSec: SUBSCRIBING_CAPACITY
retentionConfig:
  perPartitionBytes: STORAGE_PER_PARTITION
  period: MESSAGE_RETENTION_PERIOD

プロトコル

Lite トピックを更新するには、次のような PATCH リクエストを送信します。

PATCH https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID?updateMask=partitionConfig.capacity,retentionConfig.perPartitionBytes,retentionConfig.period,partitionConfig.count
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite トピックが含まれるゾーンのリージョン

  • PROJECT_NUMBER: Lite トピックのプロジェクトのプロジェクト番号

  • ZONE: Lite トピックが含まれるゾーンの名前

  • TOPIC_ID: Lite トピックの ID

リクエスト本文に次のフィールドを指定します。

{
  "partitionConfig": {
      "count": NUMBER_OF_PARTITIONS,
      "capacity": {
         "publishMibPerSec": PUBLISHING_CAPACITY,
         "subscribeMibPerSec": SUBSCRIBING_CAPACITY,
      }
   },
   "retentionConfig": {
       "perPartitionBytes": STORAGE_PER_PARTITION,
       "period": MESSAGE_RETENTION_PERIOD,
   },
}

以下を置き換えます。

  • PUBLISHING_CAPACITY: 各パーティションのパブリッシュ スループット容量を表す整数

  • SUBSCRIBING_CAPACITY: 各パーティションのサブスクライブ スループット容量を表す整数

  • STORAGE_PER_PARTITION: 各パーティションのストレージ容量(30GiB など)

  • MESSAGE_RETENTION_PERIOD: Lite トピックがメッセージを保存する期間(1d2w など)

  • NUMBER_OF_PARTITIONS: Lite トピックに構成するパーティションの数。

リクエストが成功すると、レスポンスは JSON 形式の Lite トピックになります。

{
  "name": projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID,
  "partitionConfig": {
      "count": NUMBER_OF_PARTITIONS,
      "capacity": {
         "publishMibPerSec": PUBLISHING_CAPACITY,
         "subscribeMibPerSec": SUBSCRIBING_CAPACITY,
      }
   },
   "retentionConfig": {
       "perPartitionBytes": STORAGE_PER_PARTITION,
       "period": MESSAGE_RETENTION_PERIOD,
   },
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"
	"time"

	"cloud.google.com/go/pubsublite"
)

func updateTopic(w io.Writer, projectID, region, zone, topicID string) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
	}
	defer client.Close()

	topicPath := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
	// For ranges of fields in TopicConfigToUpdate, see https://pkg.go.dev/cloud.google.com/go/pubsublite/#TopicConfigToUpdate
	config := pubsublite.TopicConfigToUpdate{
		Name:                       topicPath,
		PartitionCount:             3, // Partition count cannot decrease.
		PublishCapacityMiBPerSec:   8,
		SubscribeCapacityMiBPerSec: 16,
		PerPartitionBytes:          60 * 1024 * 1024 * 1024,
		RetentionDuration:          24 * time.Hour,
	}
	updatedCfg, err := client.UpdateTopic(ctx, config)
	if err != nil {
		return fmt.Errorf("client.UpdateTopic got err: %v", err)
	}
	fmt.Fprintf(w, "Updated topic: %#v\n", *updatedCfg)
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.Topic;
import com.google.cloud.pubsublite.proto.Topic.PartitionConfig;
import com.google.cloud.pubsublite.proto.Topic.RetentionConfig;
import com.google.protobuf.FieldMask;
import com.google.protobuf.util.Durations;
import java.util.Arrays;

public class UpdateTopicExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    updateTopicExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void updateTopicExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId) throws Exception {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    Iterable<String> iterablePaths =
        Arrays.asList(
            "partition_config.scale",
            "retention_config.per_partition_bytes",
            "retention_config.period");

    FieldMask fieldMask = FieldMask.newBuilder().addAllPaths(iterablePaths).build();

    Topic topic =
        Topic.newBuilder()
            .setPartitionConfig(
                PartitionConfig.newBuilder()
                    // Set publishing throughput to 4 times the standard partition
                    // throughput of 4 MiB per sec. This must be in the range [1,4]. A
                    // topic with `scale` of 2 and count of 10 is charged for 20 partitions.
                    .setScale(4)
                    .build())
            .setRetentionConfig(
                RetentionConfig.newBuilder()
                    // Set storage per partition to 200 GiB. This must be 30 GiB-10 TiB.
                    // If the number of bytes stored in any of the topic's partitions grows
                    // beyond this value, older messages will be dropped to make room for
                    // newer ones, regardless of the value of `period`.
                    // Be careful when decreasing storage per partition as it may cause
                    // lost messages.
                    .setPerPartitionBytes(200 * 1024 * 1024 * 1024L)
                    .setPeriod(Durations.fromDays(7)))
            .setName(topicPath.toString())
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Topic topicBeforeUpdate = adminClient.getTopic(topicPath).get();
      System.out.println("Before update: " + topicBeforeUpdate.getAllFields());

      Topic topicAfterUpdate = adminClient.updateTopic(topic, fieldMask).get();
      System.out.println("After update: " + topicAfterUpdate.getAllFields());
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient, Topic
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath
from google.protobuf.duration_pb2 import Duration
from google.protobuf.field_mask_pb2 import FieldMask

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)

# Defines which topic fields to update.
field_mask = FieldMask(
    paths=[
        "partition_config.scale",
        "retention_config.per_partition_bytes",
        "retention_config.period",
    ]
)

# Defines how to update the topic fields.
topic = Topic(
    name=str(topic_path),
    partition_config=Topic.PartitionConfig(
        # Set publishing throughput to 2x standard partition throughput of 4 MiB
        # per second. This must in the range [1,4]. A topic with `scale` of 2 and
        # `count` of 10 is charged for 20 partitions.
        scale=2,
    ),
    retention_config=Topic.RetentionConfig(
        # Set storage per partition to 100 GiB. This must be in the range 30 GiB-10TiB.
        # If the number of byptes stored in any of the topic's partitions grows beyond
        # this value, older messages will be dropped to make room for newer ones,
        # regardless of the value of `period`.
        # Be careful when decreasing storage per partition as it may cuase lost messages.
        per_partition_bytes=100 * 1024 * 1024 * 1024,
        # Allow messages to be stored for 14 days.
        period=Duration(seconds=60 * 60 * 24 * 14),
    ),
)

client = AdminClient(cloud_region)
try:
    response = client.update_topic(topic, field_mask)
    print(f"{response.name} updated successfully.")
except NotFound:
    print(f"{topic_path} not found.")

スループット容量のスケーリング

トピックのスループット容量は、垂直方向および水平方向にスケーリングできます。垂直方向のスケーリングでは、トピックのパーティションの容量を上下に調整します。Pub/Sub Lite サービスでは、Lite トピックの各パーティションに対して同じスループット容量をプロビジョニングします。たとえば、パブリッシュ スループット容量が 1 秒あたり 10 MiB の場合、各パーティションのパブリッシュ スループットは 1 秒あたり 10 MiB になります。

パーティションの数を増やして Lite トピックの容量を水平方向に増やすこともできます。トピック内のパーティションの数は減らすことはできません。

パーティションの数を変更すると、メッセージの相対的な順序は保持されません。トピック内のパーティション数を更新すると、注文キーからパーティションへのマッピングに使用されるハッシュ関数が変更されます。パーティション カウントの更新後にパブリッシュされた同じキーのメッセージが、更新の前にパブリッシュされたものとは異なるパーティションにマッピングされる可能性が高くなります。また、トピックのサイズ変更は、すべてのパブリッシャーに反映されるまでに時間がかかります。そのため、一部のパブリッシャーが新しいハッシュ関数を使用していて、一部のパブリッシャーがまだ古いハッシュ関数を使用している場合に少し時間がかかります。

ストレージ容量のスケーリング

Lite トピックのストレージ容量を増減することもできます。Lite トピックでは、各パーティションに同じ容量のストレージをプロビジョニングします。ストレージを 60 GiB に増やすと、各パーティションのストレージ容量が 60 GiB になります。

Lite トピックでストレージ容量を減らすと、Pub/Sub Lite サービスは最初に最も古いメッセージを削除します。

容量と料金の関係については、Pub/Sub Lite の料金をご覧ください。

Lite トピックの詳細情報の取得

Cloud Console、gcloud コマンドライン ツール、または Pub/Sub Lite API を使用して Lite トピックの詳細を取得できます。

Console

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [Lite トピック ID] をクリックします。

gcloud

Lite トピックの詳細を取得するには、gcloud pubsub lite-topics describe コマンドを使用します。

gcloud pubsub lite-topics describe TOPIC_ID \
--zone=ZONE

以下を置き換えます。

  • TOPIC_ID: Lite トピックの ID

  • ZONE: Lite トピックが含まれるゾーンの名前

リクエストが成功すると、コマンドラインに Lite トピックが表示されます。

name: projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
partitionConfig:
  count: NUMBER_OF_PARTITIONS
  capacity:
    publishMibPerSec: PUBLISHING_CAPACITY
    subscribeMibPerSec: SUBSCRIBING_CAPACITY
retentionConfig:
  perPartitionBytes: STORAGE_PER_PARTITION
  period: MESSAGE_RETENTION_PERIOD

プロトコル

Lite トピックの詳細を取得するには、次のような GET リクエストを送信します。

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite トピックが含まれるゾーンのリージョン

  • PROJECT_NUMBER: Lite トピックのプロジェクトのプロジェクト番号

  • ZONE: Lite トピックが含まれるゾーンの名前

  • TOPIC_ID: Lite トピックの ID

リクエストが成功すると、レスポンスは JSON 形式の Lite トピックになります。

{
  "name": projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID,
  "partitionConfig": {
      "count": NUMBER_OF_PARTITIONS,
      "capacity": {
         "publishMibPerSec": PUBLISHING_CAPACITY,
         "subscribeMibPerSec": SUBSCRIBING_CAPACITY,
      }
   },
   "retentionConfig": {
       "perPartitionBytes": STORAGE_PER_PARTITION,
       "period": MESSAGE_RETENTION_PERIOD,
   },
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

func getTopic(w io.Writer, projectID, region, zone, topicID string) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
	}
	defer client.Close()

	topicName := fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID)
	topic, err := client.Topic(ctx, topicName)
	if err != nil {
		return fmt.Errorf("client.Topic got err: %v", err)
	}
	fmt.Fprintf(w, "Got topic: %#v\n", *topic)
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.proto.Topic;

public class GetTopicExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    getTopicExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void getTopicExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId) throws Exception {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      Topic topic = adminClient.getTopic(topicPath).get();
      long numPartitions = adminClient.getTopicPartitionCount(topicPath).get();
      System.out.println(topic.getAllFields() + "\nhas " + numPartitions + " partition(s).");
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)

client = AdminClient(cloud_region)
try:
    response = client.get_topic(topic_path)
    num_partitions = client.get_topic_partition_count(topic_path)
    print(f"{response.name} has {num_partitions} partition(s).")
except NotFound:
    print(f"{topic_path} not found.")

Lite トピックの一覧表示

Cloud Console、gcloud コマンドライン ツール、Pub/Sub Lite API を使用して、プロジェクトの Lite トピックを一覧表示できます。

Console

プロジェクトの Lite トピックのリストを表示するには、[Lite トピック] ページに移動します。

gcloud

プロジェクト内の Lite トピックを一覧表示するには、gcloud pubsub lite-topics list コマンドを使用します。

gcloud pubsub lite-topics list \
  --zone=ZONE

ZONE は、Lite トピックが存在しているゾーンの名前に置き換えます。

リクエストが成功すると、コマンドラインに Lite トピックが表示されます。

---
name: projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
partitionConfig:
  count: NUMBER_OF_PARTITIONS
  capacity:
    publishMibPerSec: PUBLISHING_CAPACITY
    subscribeMibPerSec: SUBSCRIBING_CAPACITY
retentionConfig:
  perPartitionBytes: STORAGE_PER_PARTITION
  period: MESSAGE_RETENTION_PERIOND
---
name: projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
partitionConfig:
  count: NUMBER_OF_PARTITIONS
  capacity:
    publishMibPerSec: PUBLISHING_CAPACITY
    subscribeMibPerSec: SUBSCRIBING_CAPACITY
retentionConfig:
  perPartitionBytes: STORAGE_PER_PARTITION
  period: MESSAGE_RETENTION_PERIOND

プロトコル

プロジェクト内の Lite トピックを一覧表示するには、次のように GET リクエストを送信します。

GET https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/topics
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite トピックが含まれるゾーンのリージョン

  • PROJECT_NUMBER: Lite トピックのプロジェクトのプロジェクト番号

リクエストが成功すると、Lite トピックのリストが JSON 形式で返されます。

{
  "topics": [
      {
          "name": "projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID",
      },
      {
          "name": "projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID",
      }
  ]
}

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
	"google.golang.org/api/iterator"
)

func listTopics(w io.Writer, projectID, region, zone string) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
	}
	defer client.Close()

	parent := fmt.Sprintf("projects/%s/locations/%s", projectID, zone)
	topicIter := client.Topics(ctx, parent)
	for {
		topic, err := topicIter.Next()
		if err == iterator.Done {
			break
		}
		if err != nil {
			return fmt.Errorf("topicIter.Next got err: %v", err)
		}
		fmt.Fprintf(w, "Got topic: %#v\n", topic)
	}
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.LocationPath;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.proto.Topic;
import java.util.List;

public class ListTopicsExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    long projectNumber = Long.parseLong("123456789");

    listTopicsExample(cloudRegion, zoneId, projectNumber);
  }

  public static void listTopicsExample(String cloudRegion, char zoneId, long projectNumber)
      throws Exception {

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    LocationPath locationPath =
        LocationPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      List<Topic> topics = adminClient.listTopics(locationPath).get();
      for (Topic topic : topics) {
        System.out.println(topic.getAllFields());
      }
      System.out.println(topics.size() + " topic(s) listed.");
    }
  }
}

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, LocationPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
location_path = LocationPath(project_number, location)

client = AdminClient(cloud_region)
response = client.list_topics(location_path)

for topic in response:
    print(topic)

print(f"{len(response)} topic(s) listed in your project and location.")

Lite トピックの削除

Lite トピックは、Cloud Console、gcloud コマンドライン ツール、または Pub/Sub Lite API を使用して削除できます。

Console

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [Lite トピック ID] をクリックします。

  3. [Lite トピックの詳細] ページで [削除] をクリックします。

  4. 表示されるフィールドに、「delete」と入力して削除する Lite トピックを確定します。

  5. [削除] をクリックします。

gcloud

Lite トピックを削除するには、gcloud pubsub lite-topics delete コマンドを使用します。

  1. delete コマンドを実行します。

    gcloud pubsub lite-topics delete TOPIC_ID \
     --zone=ZONE
    

    以下を置き換えます。

    • TOPIC_ID: Lite トピックの ID

    • ZONE: Lite トピックが含まれるゾーンの名前

  2. 確定するには「Y」と入力します。

リクエストが成功した場合のレスポンスは、次のようになります。

Deleted topic [TOPIC_ID].

プロトコル

Lite トピックを削除するには、次のような DELETE リクエストを送信します。

DELETE https://REGION-pubsublite.googleapis.com/v1/admin/projects/PROJECT_NUMBER/locations/ZONE/topics/TOPIC_ID
Authorization: Bearer $(gcloud auth print-access-token)

以下を置き換えます。

  • REGION: Lite トピックが含まれるゾーンのリージョン

  • PROJECT_NUMBER: Lite トピックのプロジェクトのプロジェクト番号

  • ZONE: Lite トピックが含まれるゾーンの名前

  • TOPIC_ID: Lite トピックの ID

リクエストが成功した場合のレスポンスは空の JSON オブジェクトです。

Go

このサンプルを試す前に、クイックスタート: クライアント ライブラリの使用の Go の設定手順を実施してください。詳細については、Pub/Sub Go API のリファレンス ドキュメントをご覧ください。

import (
	"context"
	"fmt"
	"io"

	"cloud.google.com/go/pubsublite"
)

func deleteTopic(w io.Writer, projectID, region, zone, topicID string) error {
	// projectID := "my-project-id"
	// region := "us-central1"
	// zone := "us-central1-a"
	// topicID := "my-topic"
	ctx := context.Background()
	client, err := pubsublite.NewAdminClient(ctx, region)
	if err != nil {
		return fmt.Errorf("pubsublite.NewAdminClient: %v", err)
	}
	defer client.Close()

	err = client.DeleteTopic(ctx, fmt.Sprintf("projects/%s/locations/%s/topics/%s", projectID, zone, topicID))
	if err != nil {
		return fmt.Errorf("client.DeleteTopic got err: %v", err)
	}
	fmt.Fprint(w, "Deleted topic\n")
	return nil
}

Java

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Java の設定手順を実施してください。

import com.google.cloud.pubsublite.AdminClient;
import com.google.cloud.pubsublite.AdminClientSettings;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;

public class DeleteTopicExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");

    deleteTopicExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void deleteTopicExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId) throws Exception {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();

    AdminClientSettings adminClientSettings =
        AdminClientSettings.newBuilder().setRegion(CloudRegion.of(cloudRegion)).build();

    try (AdminClient adminClient = AdminClient.create(adminClientSettings)) {
      adminClient.deleteTopic(topicPath).get();
      System.out.println(topicPath.toString() + " deleted successfully.");
    }
  }

Python

このサンプルを実行する前に、Pub/Sub Lite クライアント ライブラリの Python の設定手順を実施してください。

from google.api_core.exceptions import NotFound
from google.cloud.pubsublite import AdminClient
from google.cloud.pubsublite.types import CloudRegion, CloudZone, TopicPath

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"

cloud_region = CloudRegion(cloud_region)
location = CloudZone(cloud_region, zone_id)
topic_path = TopicPath(project_number, location, topic_id)

client = AdminClient(cloud_region)
try:
    client.delete_topic(topic_path)
    print(f"{topic_path} deleted successfully.")
except NotFound:
    print(f"{topic_path} not found.")

Lite トピックを削除すると、そのトピックにメッセージをパブリッシュすることはできません。Lite トピックの Lite サブスクリプションは引き続き存在しますが、Lite サブスクリプションからメッセージを受信することはできません。