1 対多の Pub/Sub システムの構築

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連のアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。Pub/Sub にメッセージを送信するだけです。パブリッシャーが、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を把握する必要もありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

システムの概要

このチュートリアルでは、次の図に示すように、1 対多の通信を使用して 2 つのサブスクライバーに「Hello, World!」メッセージを送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築する手順は、次のとおりです。

  1. アプリケーションが認証に使用する IAM サービス アカウントを作成します。
  2. IAM 権限を設定します。
  3. Pub/Sub トピックとサブスクリプションを作成する
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud CLI をインストールします。
  3. gcloud CLI を初期化するには:

    gcloud init
  4. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  5. Google Cloud プロジェクトで課金が有効になっていることを確認します

  6. Pub/Sub API を有効にします。

    gcloud services enable pubsub.googleapis.com
  7. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  8. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。
  9. Google Cloud CLI をインストールします。
  10. gcloud CLI を初期化するには:

    gcloud init
  11. Google Cloud プロジェクトを作成または選択します

    • Google Cloud プロジェクトを作成します。

      gcloud projects create PROJECT_ID

      PROJECT_ID は、作成する Google Cloud プロジェクトの名前に置き換えます。

    • 作成した Google Cloud プロジェクトを選択します。

      gcloud config set project PROJECT_ID

      PROJECT_ID は、実際の Google Cloud プロジェクト名に置き換えます。

  12. Google Cloud プロジェクトで課金が有効になっていることを確認します

  13. Pub/Sub API を有効にします。

    gcloud services enable pubsub.googleapis.com
  14. Google アカウントのローカル認証情報を作成します。

    gcloud auth application-default login
  15. Google アカウントにロールを付与します。次の IAM ロールごとに次のコマンドを 1 回実行します。 roles/pubsub.publisher, roles/pubsub.subscriber

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
    • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
    • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
    • ROLE は、個々のロールに置き換えます。

Python のインストール

このチュートリアルでは、Pub/Sub クライアント ライブラリを使用します。これには、Python 3.7 以降が必要です。Python のインストールの手順を完了します。

Pub/Sub プロジェクトを設定する

アプリケーションのパブリッシュとサブスクライブの間のメッセージ フローを管理するには、トピックと 2 つの異なるサブスクリプションを作成します。

Pub/Sub トピックの作成

ID hello_topic を含むトピックを作成します。

gcloud pubsub topics create hello_topic

Pub/Sub サブスクリプションの作成

2 つのサブスクリプションを作成し、トピックに関連付けます。

これらのサブスクリプションは、pull サブスクリプションの一種である ストリーミング Pull サブスクリプションです。

サブスクリプション 1

ID sub_one を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_one --topic=hello_topic

サブスクリプション 2

ID sub_two を含むサブスクリプションを作成し、hello_topic に添付します。

gcloud pubsub subscriptions create sub_two --topic=hello_topic

1 対多のシステムを構築する

パブリッシャーとサブスクライバーのコードをダウンロードする

  1. このチュートリアルで必要な Pub/Sub Python ファイルをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
    
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。便宜上、このチュートリアルでは次のターミナルを呼び出します。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. publisher ターミナルで、pyenv-qs という名前の Python 仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate
    

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate
    

    sub_one ターミナルと sub_two ターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate
    

    PowerShell

    .\pyenv-qs\Scripts\activate
    

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ という値が表示されます。

  3. publisher ターミナルで、pip を使用して Pub/Sub Python クライアント ライブラリをインストールします。

    python -m pip install --upgrade google-cloud-pubsub
  4. 3 つのターミナルすべてで、現在のプロジェクト ID を使用して環境変数を設定します。この gcloud コマンドは、選択したプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=`gcloud config get-value project`
    

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
    
  5. 3 つすべてのターミナルで、サンプルコードを含むプロジェクトパスに変更します。

    cd python-pubsub/samples/snippets/quickstart/
    

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を確立します。Pub/Sub は、その接続を介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。このサーバーはメッセージ ID の割り当ても行います。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。
  2. サンプルコードのディレクトリをローカル環境から削除します。
  3. トピックを削除します。

    gcloud pubsub topics delete hello_topic
    
  4. サブスクリプションを削除します。

    gcloud pubsub subscriptions delete sub_one
    
    gcloud pubsub subscriptions delete sub_two
    
  5. Google Cloud Console の [IAM と管理] セクションでチュートリアル プロジェクトを終了します。

  6. 作成した認証情報を取り消して、ローカル認証情報ファイルを削除します。

    gcloud auth application-default revoke
  7. (省略可)gcloud CLI から認証情報を取り消します。

    gcloud auth revoke

次のステップ

以下のことができます。

  • GitHub でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧できます。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認応答された後、そのメッセージを受信するようにサブスクリプションを設定できます。

  • 他の言語のクライアント ライブラリを使用します。