クイックスタート: Apache Kafka API を使用して Pub/Sub Lite でメッセージをパブリッシュおよび受信する

Apache Kafka API を使用して Pub/Sub Lite でメッセージをパブリッシュおよび受信する

このページでは、Pub/Sub Lite Kafka Shim を使用して Pub/Sub Lite でメッセージをパブリッシュする方法と受信する方法について説明します。

Pub/Sub Lite Kafka Shim は、Apache Kafka Java クライアント ライブラリのユーザーが行う Pub/Sub Lite での作業をより容易にしてくれる Java ライブラリです。これには、Producer APIConsumer API を実装する必要があります。

Apache Kafka トピックなどの Pub/Sub Lite トピックは、数値オフセットを使用してコンシューマーの進行状況を追跡するパーティション分割ログであるために可能です。

2 つのシステムは類似していますが、実際にはいくつかの重要な相違点があります。

  • Pub/Sub Lite トピックは Kafka トピックと同等です。ただし、Lite トピックでは各トピック パーティションでスループットとストレージ容量が構成可能ですが、Kafka トピックの容量は Kafka クラスタ構成によって決定されます。
  • Pub/Sub Lite のサブスクリプションは、Kafka コンシューマー グループと同等です。Lite サブスクリプションは、サブスクライバーが読み取ることができる Lite トピック パーティションからのメッセージを表す、Google Cloud Platform 指定の 1 次リソースです。同様に、Kafka コンシューマー グループは、Kafka トピックのパーティションからデータを読み取るコンシューマーで構成されます。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Pub/Sub Lite API を有効にします。

    gcloud services enable pubsublite.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Pub/Sub Lite API を有効にします。

    gcloud services enable pubsublite.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsublite.admin

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

クライアント ライブラリをインストールする

次のサンプルは、pubsublite-kafka クライアント ライブラリのインストール方法を示しています。

Java

Maven を BOM なしで使用している場合は、以下を依存関係に追加します。

<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-kafka</artifactId>
  <version>1.1.1</version>
</dependency>

Gradle を使用している場合は、以下を依存関係に追加します。

implementation 'com.google.cloud:pubsublite-kafka:1.1.2'

sbt を使用している場合は、以下を依存関係に追加します。

libraryDependencies += "com.google.cloud" % "pubsublite-kafka" % "1.1.2"

Visual Studio Code、IntelliJ または Eclipse を使用している場合は、次の IDE プラグインでプロジェクトにクライアント ライブラリを追加できます。

プラグインでは、サービス アカウントのキー管理などの追加機能も提供されます。詳細は各プラグインのドキュメントをご覧ください。

Lite トピックと Lite サブスクリプションを作成する

次のコマンドを使用して、Lite トピックと Lite サブスクリプションを作成します。

gcloud pubsub lite-topics create LITE_TOPIC_NAME \
    --location=LITE_LOCATION \
    --partitions=1 \
    --per-partition-bytes=30GiB

gcloud pubsub lite-subscriptions create LITE_SUBSCRIPTION_NAME \
    --location=LITE_LOCATION \
    --topic=LITE_TOPIC_NAME \
    --starting-offset=end \
    --delivery-requirement=deliver-after-stored

以下を置き換えます。

  • LITE_TOPIC_NAME: 新しい Lite トピックの名前。
  • LITE_SUBSCRIPTION_NAME: 新しい Lite サブスクリプションの名前。
  • LITE_LOCATION: Lite トピックと Lite サブスクリプションを作成するロケーション。サポートされている Pub/Sub Lite ロケーションを選択します。また、リージョンのゾーンも指定します。例: us-central1-a

Pub/Sub Lite にメッセージをパブリッシュする

Lite トピックにデータを送信するための Pub/Sub Lite プロデューサーを作成します。このクラスは、KafkaProducer と同じインターフェースである org.apache.kafka.clients.Producer を実装します。

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.kafka.ProducerSettings;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

public class ProducerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic.
    String topicId = "your-topic-id";
    // Using the project number is required for constructing a Pub/Sub Lite
    // topic path that the Kafka producer can use.
    long projectNumber = Long.parseLong("123456789");

    producerExample(cloudRegion, zoneId, projectNumber, topicId);
  }

  public static void producerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId)
      throws InterruptedException, ExecutionException {
    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(CloudZone.of(CloudRegion.of(cloudRegion), zoneId))
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    ProducerSettings producerSettings =
        ProducerSettings.newBuilder().setTopicPath(topicPath).build();

    List<Future<RecordMetadata>> futures = new ArrayList<>();
    try (Producer<byte[], byte[]> producer = producerSettings.instantiate()) {
      for (long i = 0L; i < 10L; i++) {
        String key = "demo";
        Future<RecordMetadata> future =
            producer.send(
                new ProducerRecord(
                    topicPath.toString(), key.getBytes(), ("message-" + i).getBytes()));
        futures.add(future);
      }
      for (Future<RecordMetadata> future : futures) {
        RecordMetadata meta = future.get();
        System.out.println(meta.offset());
      }
    }
    System.out.printf("Published 10 messages to %s%n", topicPath.toString());
  }
}

Pub/Sub Lite からのメッセージの受信

Lite サブスクリプションからデータを受け取るための Pub/Sub Lite コンシューマを作成します。このクラスは、KafkaConsumer と同じインターフェースである org.apache.kafka.clients.consumer.Consumer を実装します。

import com.google.cloud.pubsublite.CloudRegion;
import com.google.cloud.pubsublite.CloudZone;
import com.google.cloud.pubsublite.ProjectNumber;
import com.google.cloud.pubsublite.SubscriptionName;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicName;
import com.google.cloud.pubsublite.TopicPath;
import com.google.cloud.pubsublite.cloudpubsub.FlowControlSettings;
import com.google.cloud.pubsublite.kafka.ConsumerSettings;
import java.time.Duration;
import java.util.Arrays;
import java.util.Base64;
import java.util.HashSet;
import java.util.Set;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;

public class ConsumerExample {

  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String cloudRegion = "your-cloud-region";
    char zoneId = 'b';
    // Use an existing Pub/Sub Lite topic and subscription.
    String topicId = "your-topic-id";
    String subscriptionId = "your-subscription-id";
    // Using the project number here is required for constructing a Pub/Sub Lite
    // topic path that the Kafka consumer can use.
    long projectNumber = Long.parseLong("123456789");

    consumerExample(cloudRegion, zoneId, projectNumber, topicId, subscriptionId);
  }

  public static void consumerExample(
      String cloudRegion, char zoneId, long projectNumber, String topicId, String subscriptionId) {

    CloudZone location = CloudZone.of(CloudRegion.of(cloudRegion), zoneId);

    TopicPath topicPath =
        TopicPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(TopicName.of(topicId))
            .build();

    SubscriptionPath subscription =
        SubscriptionPath.newBuilder()
            .setLocation(location)
            .setProject(ProjectNumber.of(projectNumber))
            .setName(SubscriptionName.of(subscriptionId))
            .build();

    FlowControlSettings flowControlSettings =
        FlowControlSettings.builder()
            // 50 MiB. Must be greater than the allowed size of the largest message (1 MiB).
            .setBytesOutstanding(50 * 1024 * 1024L)
            // 10,000 outstanding messages. Must be >0.
            .setMessagesOutstanding(10000L)
            .build();

    ConsumerSettings settings =
        ConsumerSettings.newBuilder()
            .setSubscriptionPath(subscription)
            .setPerPartitionFlowControlSettings(flowControlSettings)
            .setAutocommit(true)
            .build();

    Set<ConsumerRecord<byte[], byte[]>> hashSet = new HashSet<>();
    try (Consumer<byte[], byte[]> consumer = settings.instantiate()) {
      // The consumer can only subscribe to the topic that it is associated to.
      // If this is the only subscriber for this subscription, it will take up
      // to 90s for the subscriber to warm up.
      consumer.subscribe(Arrays.asList(topicPath.toString()));
      while (true) {
        ConsumerRecords<byte[], byte[]> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<byte[], byte[]> record : records) {
          long offset = record.offset();
          String value = Base64.getEncoder().encodeToString(record.value());
          hashSet.add(record);
          System.out.printf("Received %s: %s%n", offset, value);
        }
        // Early exit. Remove entirely to keep the consumer alive indefinitely.
        if (hashSet.size() >= 10) {
          System.out.println("Received 10 messages.");
          break;
        }
      }
    }
  }
}

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースと一緒に Cloud プロジェクトを削除します。

  1. Lite トピックとサブスクリプションを削除します。

    gcloud pubsub lite-topics delete LITE_TOPIC_NAME
    gcloud pubsub lite-subscriptions delete LITE_SUBSCRIPTION_NAME
    

  2. サービス アカウントを削除します。
    gcloud iam service-accounts delete SERVICE_ACCOUNT_EMAIL
  3. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  4. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ