クイックスタート

このページでは、次の方法について説明します。

  • Google Cloud Console を使用して Lite トピックと Lite サブスクリプションを作成します。
  • Java 用の Pub/Sub Lite クライアント ライブラリを使用してメッセージを送受信します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    [プロジェクトの選択] ページに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Pub/Sub Lite API を有効にします。

    API を有効にする

  5. 認証の設定:
    1. Cloud Console で、[サービス アカウント キーの作成] ページに移動します。

      [サービス アカウント キーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [ロール] リストから、プロジェクト > オーナー

    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

クライアント ライブラリをインストールする

Java

Maven を使用している場合は、次のものを pom.xml ファイルに追加します。BOM の詳細については、Google Cloud Platform ライブラリ BOM をご覧ください。

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsublite</artifactId>
  <version>0.6.5</version>
</dependency>
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>google-cloud-pubsub</artifactId>
  <version>1.110.0</version>
</dependency>

Gradle を使用している場合は、次のものを依存関係に追加します。

compile 'com.google.cloud:google-cloud-pubsublite:0.7.0'

sbt を使用している場合は、次のものを依存関係に追加します。

libraryDependencies += "com.google.cloud" % "google-cloud-pubsublite" % "0.7.0"

IntelliJ または Eclipse を使用している場合は、次の IDE プラグインを使用してプロジェクトにクライアント ライブラリを追加できます。

プラグインでは、サービス アカウントのキー管理などの追加機能も提供されます。詳細は各プラグインのドキュメントをご覧ください。

Python

pip install --upgrade google-cloud-pubsublite

Lite トピックの作成

Cloud Console を使用して Lite トピックを作成するには、次の手順に従います。

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [Lite トピックを作成] をクリックします。

  3. リージョンとリージョン内のゾーンを選択します。

  4. [名前] セクションに、Lite トピック ID として「your-lite-topic」と入力します。Lite トピック名には、Lite トピック ID、ゾーン、プロジェクト番号が含まれます。

  5. [作成] をクリックします。

Lite サブスクリプションの作成

Cloud Console を使用して Lite サブスクリプションを作成するには、次の手順に従います。

  1. Cloud Console で、[Lite サブスクリプション] ページに移動します。

    [Lite サブスクリプション] ページに移動

  2. [Lite サブスクリプションを作成] をクリックします。

  3. [Lite サブスクリプション ID] フィールドに「your-lite-subscription」と入力します。

  4. メッセージを受信する Lite トピックを選択します。

  5. [配信要件] セクションで、[保存後にメッセージを配信] を選択します。

  6. [作成] をクリックします。

Lite サブスクリプションは Lite トピックと同じゾーンに存在しています。

メッセージを送信する

次のパブリッシャー アプリケーションを使用して Lite トピックにメッセージを送信します。

Java

import com.google.api.core.ApiFuture;
import com.google.api.core.ApiFutures;
import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.PublishMetadata;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.Publisher;
import com.google.cloud.pubsublite.cloudpubsub.PublisherSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;

public class PublisherExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing topic for the publish example to work.
    String topicId = "your-topic-id";
    long projectNumber = Long.parseLong("123456789");
    int messageCount = 100;

    publisherExample(cloudRegion, zoneId, projectNumber, topicId, messageCount);
  }

  // Publish messages to a topic.
  public static void publisherExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, int messageCount)
      throws ApiException, ExecutionException, InterruptedException {

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setProject(ProjectNumber.of(projectNumber))
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setName(TopicName.of(topicId))
            .build();
    Publisher publisher = null;
    List<ApiFuture<String>> futures = new ArrayList<>();

    try {
      PublisherSettings publisherSettings =
          PublisherSettings.newBuilder().setTopicPath(topicPath).build();

      publisher = Publisher.create(publisherSettings);

      // Start the publisher. Upon successful starting, its state will become RUNNING.
      publisher.startAsync().awaitRunning();

      for (int i = 0; i < messageCount; i++) {
        String message = "message-" + i;

        // Convert the message to a byte string.
        ByteString data = ByteString.copyFromUtf8(message);
        PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

        // Publish a message. Messages are automatically batched.
        ApiFuture<String> future = publisher.publish(pubsubMessage);
        futures.add(future);
      }
    } finally {
      ArrayList<PublishMetadata> metadata = new ArrayList<>();
      List<String> ackIds = ApiFutures.allAsList(futures).get();
      for (String id : ackIds) {
        // Decoded metadata contains partition and offset.
        metadata.add(PublishMetadata.decode(id));
      }
      System.out.println(metadata + "\nPublished " + ackIds.size() + " messages.");

      if (publisher != null) {
        // Shut down the publisher.
        publisher.stopAsync().awaitTerminated();
        System.out.println("Publisher is shut down.");
      }
    }
  }
}

Python

from google.cloud.pubsublite.cloudpubsub import PublisherClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    PublishMetadata,
    TopicPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# topic_id = "your-topic-id"
# num_messages = 100

location = CloudZone(CloudRegion(cloud_region), zone_id)
topic_path = TopicPath(project_number, location, topic_id)

# PublisherClient() must be used in a `with` block or have __enter__() called before use.
with PublisherClient() as publisher_client:
    data = "Hello world!"
    api_future = publisher_client.publish(topic_path, data.encode("utf-8"))
    # result() blocks. To resolve API futures asynchronously, use add_done_callback().
    message_id = api_future.result()
    publish_metadata = PublishMetadata.decode(message_id)
    print(
        f"Published a message to partition {publish_metadata.partition.value} and offset {publish_metadata.cursor.offset}."
    )

パブリッシャーは 100 件のメッセージを Lite トピックに送信し、Pub/Sub Lite サービスが受信したメッセージの数を出力します。

メッセージの受信

次のサブスクライバー アプリケーションを使用して、Lite サブスクリプションからメッセージを受信します。

Java

import com.google.api.gax.rpc.ApiException;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.cloudpubsub.Subscriber;
import com.google.cloud.pubsublite.cloudpubsub.SubscriberSettings;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscriberExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Choose an existing subscription for the subscribe example to work.
    String subscriptionId = "your-subscription-id";
    long projectNumber = Long.parseLong("123456789");

    subscriberExample(cloudRegion, zoneId, projectNumber, subscriptionId);
  }

  public static void subscriberExample(
      String cloudRegion, char zoneId, long projectNumber, String subscriptionId)
      throws ApiException {

    SubscriptionPath subscriptionPath =
        SubscriptionPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    // The message stream is paused based on the maximum size or number of messages that the
    // subscriber has already received, whichever condition is met first.
    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(10 * 1024 * 1024L)
            // 1,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(1000L)
            .build();

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          System.out.println("Id : " + message.getMessageId());
          System.out.println("Data : " + message.getData().toStringUtf8());
          consumer.ack();
        };

    SubscriberSettings subscriberSettings =
        SubscriberSettings.newBuilder()
            .setSubscriptionPath(subscriptionPath)
            .setReceiver(receiver)
            // Flow control settings are set at the partition level.
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .build();

    Subscriber subscriber = Subscriber.create(subscriberSettings);

    // Start the subscriber. Upon successful starting, its state will become RUNNING.
    subscriber.startAsync().awaitRunning();

    System.out.println("Listening to messages on " + subscriptionPath.toString() + "...");

    try {
      System.out.println(subscriber.state());
      // Wait 90 seconds for the subscriber to reach TERMINATED state. If it encounters
      // unrecoverable errors before then, its state will change to FAILED and an
      // IllegalStateException will be thrown.
      subscriber.awaitTerminated(90, TimeUnit.SECONDS);
    } catch (TimeoutException t) {
      // Shut down the subscriber. This will change the state of the subscriber to TERMINATED.
      subscriber.stopAsync().awaitTerminated();
      System.out.println("Subscriber is shut down: " + subscriber.state());
    }
  }
}

Python

from concurrent.futures._base import TimeoutError
from google.cloud.pubsublite.cloudpubsub import SubscriberClient
from google.cloud.pubsublite.types import (
    CloudRegion,
    CloudZone,
    FlowControlSettings,
    SubscriptionPath,
)

# TODO(developer):
# project_number = 1122334455
# cloud_region = "us-central1"
# zone_id = "a"
# subscription_id = "your-subscription-id"
# timeout = 90

location = CloudZone(CloudRegion(cloud_region), zone_id)
subscription_path = SubscriptionPath(project_number, location, subscription_id)
# Configure when to pause the message stream for more incoming messages based on the
# maximum size or number of messages that a single-partition subscriber has received,
# whichever condition is met first.
per_partition_flow_control_settings = FlowControlSettings(
    # 1,000 outstanding messages. Must be >0.
    messages_outstanding=1000,
    # 10 MiB. Must be greater than the allowed size of the largest message (1 MiB).
    bytes_outstanding=10 * 1024 * 1024,
)

def callback(message):
    message_data = message.data.decode("utf-8")
    print(f"Received {message_data} of ordering key {message.ordering_key}.")
    message.ack()

# SubscriberClient() must be used in a `with` block or have __enter__() called before use.
with SubscriberClient() as subscriber_client:

    streaming_pull_future = subscriber_client.subscribe(
        subscription_path,
        callback=callback,
        per_partition_flow_control_settings=per_partition_flow_control_settings,
    )

    print(f"Listening for messages on {str(subscription_path)}...")

    try:
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError or KeyboardInterrupt:
        streaming_pull_future.cancel()
        assert streaming_pull_future.done()

サブスクライバーはメッセージを受信すると、メッセージ ID とメッセージ データを出力します。

クリーンアップ

このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. Cloud Console で、[Lite トピック] ページに移動します。

    [Lite トピック] ページに移動

  2. [your-lite-topic] をクリックします。

  3. [Lite トピックの詳細] ページで [削除] をクリックします。

  4. 表示されるフィールドに、「delete」と入力して削除する Lite トピックを確定します。

  5. [削除] をクリックします。

次のステップ