エミュレータを使用したローカルでのアプリのテスト

アプリケーションをローカルで開発してテストするには、Pub/Sub エミュレータを使用できます。これにより、本番環境の Pub/Sub サービスのローカル エミュレーションを可能にします。Pub/Sub エミュレータは Google Cloud CLI を使用して実行します。

エミュレータに対してアプリケーションを実行するには、まずエミュレータを起動し、環境変数を設定します。アプリケーションは、本番環境の Pub/Sub サービスではなくエミュレータと通信する必要があります。作成されたリソースとエミュレータに公開されたメッセージは、エミュレータ セッションの存続期間にわたって持続されます。

始める前に

Pub/Sub エミュレータを使用する前に、次の前提条件を満たす必要があります。

エミュレータをインストールする

コマンド プロンプトからエミュレータをインストールします。

gcloud components install pubsub-emulator
gcloud components update

エミュレータをコンテナ イメージとしてインストールする

エミュレータをコンテナとしてインストールし、実行するには、gCloud Docker イメージをダウンロードしてインストールします。

エミュレータの開始

エミュレータを起動するには、コマンド プロンプトから pubsub start を実行します。コマンドを実行する前に、PUBSUB_PROJECT_ID を有効なGoogle Cloud プロジェクト ID 文字列に置き換えます。Pub/Sub エミュレータはローカルで実行されるため、文字列は実際の Google Cloud プロジェクトを表す必要はありません。

gcloud beta emulators pubsub start --project=PUBSUB_PROJECT_ID [options]

フラグの全一覧については、gcloud beta emulators pubsub start をご覧ください。

エミュレータの起動後、次のようなメッセージが表示されます。

...
[pubsub] This is the Pub/Sub fake.
[pubsub] Implementation may be incomplete or differ from the real system.
...
[pubsub] INFO: Server started, listening on 8085

このメッセージは、Pub/Sub サーバーが、 Google Cloud エンドポイントではなく、ローカルマシン上のエミュレータ エンドポイントで実行されていることを示します。以下を含むすべてのオペレーションはローカルで行われます。

  • トピックまたはサブスクリプションの作成
  • パブリッシュ
  • 登録

環境変数を設定する

エミュレータを起動した後、アプリケーションが Pub/Sub ではなくエミュレータに接続するように環境変数を設定する必要があります。これらの環境変数は、アプリケーションの実行に使用するマシンに設定します。

環境変数はエミュレータを開始するたびに設定する必要があります。環境変数は、動的に割り当てられるポート番号に依存しています。この番号は、エミュレータを再起動するたびに変わる可能性があります。

変数の自動設定

アプリケーションとエミュレータが同じマシン上で動作している場合は、次のようにして環境変数を自動的に設定できます。

Linux / macOS

コマンド代入を使用して env-init を実行します。

$(gcloud beta emulators pubsub env-init)

Windows

env-init からの出力を使用してバッチファイルを作成、実行します。

gcloud beta emulators pubsub env-init > set_vars.cmd && set_vars.cmd

これで、アプリケーションが Pub/Sub エミュレータに接続します。

変数の手動設定

アプリケーションとエミュレータが別々のマシン上で動作している場合は、次のようにして環境変数を手動で設定します。

  1. env-init コマンドを実行します。

     gcloud beta emulators pubsub env-init

  2. アプリケーションを実行するマシンで、env-init コマンドの出力の指示に従って PUBSUB_EMULATOR_HOST 環境変数と値を設定します。この設定により、アプリケーションがエミュレータに接続します。必要に応じて、エミュレータに使用するプロジェクトに PUBSUB_PROJECT_ID 環境変数を設定できます。

    Linux / macOS
    export PUBSUB_EMULATOR_HOST=[::1]:8432
    export PUBSUB_PROJECT_ID=my-project-id
    Windows
    set PUBSUB_EMULATOR_HOST=[::1]:8432
    set PUBSUB_PROJECT_ID=my-project-id

これで、アプリケーションが Pub/Sub エミュレータに接続します。

注: Python App Engine スタンダード ローカル開発サーバーを使用している場合は、次のようにコマンドラインでこの環境変数を渡す必要があります。

dev_appserver.py app.yaml --env_var PUBSUB_EMULATOR_HOST=${PUBSUB_EMULATOR_HOST}

dev_appserver.py[PATH_TO_CLOUD_SDK]/google-cloud-sdk/bin/dev_appserver.py に含まれています。

エミュレータの使用

エミュレータを使用するには、Cloud クライアント ライブラリを使用してアプリケーションを構築する必要があります。エミュレータでは、 Google Cloud コンソールまたは gcloud pubsub コマンドはサポートされていません。

次の例では、エミュレータと Python Cloud クライアント ライブラリを使用してさまざまなオペレーションを実行するアプリケーションの使用を示しています。これらのオペレーションの例として、トピックの作成、メッセージのパブリッシュ、メッセージの読み取りの方法が含まれます。

エミュレータの環境変数を設定したマシンで、次の手順を行います。

  1. Python リポジトリ全体のクローンを作成することによって、GitHub から Pub/Sub Python のサンプルを取得します。

  2. クローン作成されたリポジトリで、samples/snippets ディレクトリに移動します。残りのステップをこのディレクトリで行います。

  3. サンプルの実行に必要な依存環境を samples/snippets ディレクトリからインストールします。

    pip install -r requirements.txt
    
  4. トピックを作成します。

     python publisher.py PUBSUB_PROJECT_ID create TOPIC_ID
    
  5. (省略可)エミュレータで push サブスクリプションをテストするためのローカル push エンドポイントがない場合は、次の手順を実行して http://[::1]:3000/messages に作成します。

    1. JSON サーバーをインストールします。
      npm install -g json-server
      
    2. JSON サーバーを開始します。
      json-server --port 3000 --watch db.json
      
      ここで、db.json には以下のスターター コードが含まれています。
      {
         "messages": []
      }
      
    3. 次のステップの PUSH_ENDPOINT のために、http://[::1]:3000/messages をメモしてください。
  6. トピックへのサブスクリプションを作成します。

    • pull サブスクリプションを作成します。

      python subscriber.py PUBSUB_PROJECT_ID create TOPIC_ID SUBSCRIPTION_ID
      
    • push サブスクリプションを作成します。

      python subscriber.py PUBSUB_PROJECT_ID create-push TOPIC_ID SUBSCRIPTION_ID \
      PUSH_ENDPOINT
      
  7. トピックにメッセージをパブリッシュします。

     python publisher.py PUBSUB_PROJECT_ID publish TOPIC_ID
    
  8. トピックにパブリッシュされているメッセージを読み取ります。

    • pull サブスクリプションからメッセージを取得します。

      python subscriber.py PUBSUB_PROJECT_ID receive SUBSCRIPTION_ID
      
    • ローカル push エンドポイントに配信されたメッセージを確認します。たとえば、メッセージは次のようになります。

      {
        "messages": [
            {
                "subscription": "projects/PUBSUB_PROJECT_ID/subscriptions/SUBSCRIPTION_ID",
                "message": {
                    "data": "TWVzc2FnZSBudW1iZXIgMQ==",
                    "messageId": "10",
                    "attributes": {}
                },
                "id": 1
            },
            ...
        ]
      }
      

環境変数へのアクセス

Java と C# 以外の言語では、環境変数の設定で説明されているように PUBSUB_EMULATOR_HOST を設定すると、Pub/Sub クライアント ライブラリは、Pub/Sub ではなくローカル インスタンスで実行される API を自動的に呼び出します。

ただし C# と Java のクライアント ライブラリの場合には、エミュレータを使用するようにコードを変更する必要があります。

C#

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある C# 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub C# API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


using Google.Api.Gax;
using Google.Cloud.PubSub.V1;
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public class EmulatorSupportSample
{
    public async Task WithEmulatorAsync(string projectId, string topicId, string subscriptionId)
    {
        // Use EmulatorDetection.EmulatorOrProduction to create service clients that will
        // that will connect to the PubSub emulator if the PUBSUB_EMULATOR_HOST environment
        // variable is set, but will otherwise connect to the production environment.

        // Create the PublisherServiceApiClient using the PublisherServiceApiClientBuilder
        // and setting the EmulatorDection property.
        PublisherServiceApiClient publisherService = await new PublisherServiceApiClientBuilder
        {
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();

        // Use the client as you'd normally do, to create a topic in this example.
        TopicName topicName = new TopicName(projectId, topicId);
        publisherService.CreateTopic(topicName);

        // Create the SubscriberServiceApiClient using the SubscriberServiceApiClientBuilder
        // and setting the EmulatorDection property.
        SubscriberServiceApiClient subscriberService = await new SubscriberServiceApiClientBuilder
        {
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();

        // Use the client as you'd normally do, to create a subscription in this example.
        SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
        subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);

        // Create the PublisherClient using PublisherClientBuilder to set the EmulatorDetection property.
        PublisherClient publisher = await new PublisherClientBuilder
        {
            TopicName = topicName,
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();
        // Use the client as you'd normally do, to send a message in this example.
        await publisher.PublishAsync("Hello, Pubsub");
        await publisher.ShutdownAsync(TimeSpan.FromSeconds(15));

        // Create the SubscriberClient using SubscriberClientBuild to set the EmulatorDetection property.
        SubscriberClient subscriber = await new SubscriberClientBuilder
        {
            SubscriptionName = subscriptionName,
            EmulatorDetection = EmulatorDetection.EmulatorOrProduction
        }.BuildAsync();
        List<PubsubMessage> receivedMessages = new List<PubsubMessage>();

        // Use the client as you'd normally do, to listen for messages in this example.
        await subscriber.StartAsync((msg, cancellationToken) =>
        {
            receivedMessages.Add(msg);
            Console.WriteLine($"Received message {msg.MessageId} published at {msg.PublishTime.ToDateTime()}");
            Console.WriteLine($"Text: '{msg.Data.ToStringUtf8()}'");
            // In this example we stop the subscriber when the message is received.
            // You may leave the subscriber running, and it will continue to received published messages
            // if any.
            // This is non-blocking, and the returned Task may be awaited.
            subscriber.StopAsync(TimeSpan.FromSeconds(15));
            // Return Reply.Ack to indicate this message has been handled.
            return Task.FromResult(SubscriberClient.Reply.Ack);
        });
    }
}

Java

このサンプルを試す前に、Pub/Sub クイックスタート: クライアント ライブラリの使用にある Java 向けの手順に従って設定を行ってください。 詳細については、Pub/Sub Java API のリファレンス ドキュメントをご覧ください。

Pub/Sub に対する認証を行うには、アプリケーションのデフォルト認証情報を設定します。詳細については、ローカル開発環境の認証を設定するをご覧ください。


import com.google.api.core.ApiFuture;
import com.google.api.gax.core.CredentialsProvider;
import com.google.api.gax.core.NoCredentialsProvider;
import com.google.api.gax.grpc.GrpcTransportChannel;
import com.google.api.gax.rpc.FixedTransportChannelProvider;
import com.google.api.gax.rpc.TransportChannelProvider;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.TopicAdminClient;
import com.google.cloud.pubsub.v1.TopicAdminSettings;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.PubsubMessage;
import com.google.pubsub.v1.Topic;
import com.google.pubsub.v1.TopicName;
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;

public class UsePubSubEmulatorExample {
  public static void main(String... args) throws Exception {
    String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
    ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
    try {
      TransportChannelProvider channelProvider =
          FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
      CredentialsProvider credentialsProvider = NoCredentialsProvider.create();

      // Set the channel and credentials provider when creating a `TopicAdminClient`.
      // Can be done similarly for a `SubscriptionAdminClient`.
      TopicAdminClient topicAdminClient =
          TopicAdminClient.create(
              TopicAdminSettings.newBuilder()
                  .setTransportChannelProvider(channelProvider)
                  .setCredentialsProvider(credentialsProvider)
                  .build());

      TopicName topicName = TopicName.of("my-project-id", "my-topic-id");
      Topic topic = topicAdminClient.createTopic(topicName);
      System.out.println("Created topic: " + topic.getName());

      // Set the channel and credentials provider when creating a `Publisher`.
      // Can be done similarly for a `Subscriber`.
      Publisher publisher =
          Publisher.newBuilder(topicName)
              .setChannelProvider(channelProvider)
              .setCredentialsProvider(credentialsProvider)
              .build();

      String message = "Hello World!";
      ByteString data = ByteString.copyFromUtf8(message);
      PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();

      ApiFuture<String> messageIdFuture = publisher.publish(pubsubMessage);
      String messageId = messageIdFuture.get();
      System.out.println("Published message ID: " + messageId);
    } finally {
      channel.shutdown();
    }
  }
}

エミュレータの停止

エミュレータを停止するには、Ctrl+C キーを押します。

エミュレータを停止した後、次のコマンドを実行して PUBSUB_EMULATOR_HOST 環境変数を削除し、アプリケーションが Pub/Sub に接続するようにします。

Linux / macOS
unset PUBSUB_EMULATOR_HOST
Windows
set PUBSUB_EMULATOR_HOST=

エミュレータのコマンドライン引数

Cloud Pub/Sub エミュレータのコマンドライン引数の詳細については、gcloud beta emulators pubsub をご覧ください。

サポートされている機能

エミュレータは、次の Pub/Sub 機能をサポートしています。

  • メッセージの公開
  • push サブスクリプションと pull サブスクリプションからのメッセージの受信
  • メッセージの順序指定
  • メッセージの再生
  • デッドレター トピックへのメッセージの転送
  • メッセージ配信のポリシーの再試行
  • Avro のスキーマのサポート

既知の制限事項

  • UpdateTopicUpdateSnapshot の RPC はサポートされていません。
  • IAM オペレーションはサポートされていません。
  • 構成可能なメッセージ保持はサポートされていません。すべてのメッセージが無期限に保持されます。
  • サブスクリプションの有効期限はサポートされていません。サブスクリプションの有効期限はありません。
  • フィルタリングはサポートされていません。
  • プロトコル バッファのスキーマ サポート。
  • BigQuery サブスクリプションを作成することはできますが、BigQuery にメッセージは送信されません。
  • 順序指定サブスクリプションのタイムスタンプまでシークすることはできません。

問題を報告するには、公開 Issue Tracker を送信してください。

次のステップ

  • minikube で Pub/Sub エミュレータを使用する方法については、このブログ投稿をご覧ください。