1 対多の Pub/Sub システムの構築

はじめに

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連の単純なアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。Pub/Sub にメッセージを送信するだけです。パブリッシャーが、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を把握する必要もありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

要件:

システムの概要

このチュートリアルでは、以下の図に示されているように、「Hello, World!」というメッセージを 1 対多の通信を使用して 2 つのサブスクライバーに送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築する手順は以下のとおりです。

  1. 必要な Pub/Sub トピックとサブスクリプションを作成します。
  2. アプリケーションの認証に使用するサービス アカウントを作成します。
  3. IAM 権限を設定します。
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

チュートリアルの設定

Google Cloud プロジェクト、Pub/Sub トピック、サブスクリプションを設定する

  1. Google Cloud Console にログインします。

    Google Cloud Console に移動

    Cloud を初めて使用する場合は、[有効化] をクリックしてから指示に従って Cloud アカウントを設定します。

  2. 既存のプロジェクトを選択するか、プロジェクトを新規作成します。Google Cloud を初めて使用する場合はデフォルトのプロジェクトが作成されます。

    Cloud Console の [ホーム] セクションで、プロジェクト ID をメモします。この値は、Cloud SDK の初期化プロセスで現在の Cloud Storage プロジェクトを設定するときに使用します。また、パブリッシャー アプリケーションとサブスクライバー アプリケーションを起動するときにはこの ID を Python スクリプトに渡します。

  3. Google Cloud Console の [Pub/Sub] セクションに移動します。

    [Pub/Sub] セクションに移動

    指示に従って API を有効にします。

  4. [トピックを作成] をクリックします。パブリッシャー アプリケーションがトピックにメッセージを送信します。hello_topic という名前を使用します。

  5. [トピックの詳細]ページで、[登録を作成]をクリックします。

    1. サブスクリプションに sub_one と名前を付けます。いかなるデフォルト設定も変更しないでください。ここでは pull サブスクリプションの種類である StreamingPull サブスクリプションを作成します。

    2. 同じ手順で sub_two という名前のサブスクリプションを作成し、hello_topic に関連付けます。

      トピックビューでトピック名をクリックすると、新しいサブスクリプションを表示したり、サブスクリプション ビューに変更したりできます。

この時点で、Pub/Sub 環境では、パブリッシュするアプリケーションとサブスクライブするアプリケーションの間のメッセージ フローを管理する準備が整います。

サービス アカウントの認証情報を作成する

  1. コンソールの [サービス アカウント] セクションに移動します。

    IAM サービス アカウントに移動

  2. プロジェクトを選択して [サービス アカウントの作成] をクリックします。

  3. サービス アカウント名(「pubsub-tutorial」など)を入力します。

  4. [作成] をクリック

  5. このチュートリアルでは、サービス アカウントにパブリッシュとサブスクライブを行う権限が必要です。プルダウンで [役割を選択] を選択し、[Pub/Sub パブリッシャー] の役割を追加します。

    文字列「pub」を使用して Pub/Sub の役割をフィルタしている [サービス アカウント権限] ダイアログ

  6. [別の役割を追加] をクリックして [Pub/Sub サブスクライバー] を追加します。

    [続行] ボタンをクリックする前の [サービス アカウント権限] ダイアログと Pub/Sub パブリッシャー、Pub/Sub サブスクライバー

  7. [続行] をクリックします。ユーザーにこのサービス アカウントへのアクセスを許可する必要はありません。

  8. [キーを作成] をクリックします。このキーはクライアント ライブラリが Pub/Sub API にアクセスする際に使用されます。

  9. [JSON] を選択し、[作成] をクリックします。

    キーはダウンロード フォルダに送信されます。このチュートリアルでは、この場所にキーを保持します。

  10. キーファイルの名前を ~/Downloads/key.json に変更します。

Cloud SDK のインストール

  1. Cloud SDK のインストールと初期化の手順に従って操作します。

    • Cloud SDK の初期化中にプロジェクト ID を入力するオプションを選択し、作成したプロジェクトの ID を入力するか、セットアップ セクションで ID を選択してください。

    • Cloud SDK をインストールして初期化したら、このチュートリアルに戻ることができます。他のコンポーネントのインストールや Cloud クライアント ライブラリのダウンロードは不要です。

    Cloud SDK をインストールした後は、gcloud コマンドライン ツールで Compute Engine の Pub/Sub オペレーションを実行できます。

  2. gcloud コマンドを使用する前に新しいターミナルを起動します。

    gcloud pubsub topics list
    gcloud pubsub subscriptions list
    

    gcloud config set project PROJECT_ID を使用すると、初期化中に設定したプロジェクトを他のプロジェクトに変更できます。

Python を取得して仮想環境を設定する

このチュートリアルは使用例を示すものであるため、仮想環境の設定セクションに示されている例に従う必要はありません。仮想環境をインストールしたら、このチュートリアルに戻ることができます。

パブリッシャーとサブスクライバーのコードを確認する

  1. このチュートリアルで必要な Pub/Sub Python ファイルを格納するプロジェクト フォルダを作成します。この場所に移動し、コードをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
    
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。各ターミナルでこのセクションのすべての操作を行います。便宜上、これらのターミナルを次のように呼称します。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. Python 仮想環境を作成して有効にします。

    最初のターミナルで次のコマンドを実行し、pyenv-qs という名前の仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate
    

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate
    

    他の 2 つのターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate
    

    PowerShell

    .\pyenv-qs\Scripts\activate
    

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ が表示されます。

    仮想環境で別のバージョンの Python を指定することもできます。

  3. pip を使用して Pub/Sub Python クライアント ライブラリをインストールします。

    pip install --upgrade google-cloud-pubsub

    JSON キーをサービス アカウントに関連付けます。主な Pub/Sub の役割は、サービス アカウントの認証情報の作成時に割り当てています。 Pub/Sub クライアント ライブラリは環境変数 GOOGLE_APPLICATION_CREDENTIALS にアクセスし、サービス アカウントに関連付けられたロールと権限が付与されます。

    Bash

    export GOOGLE_APPLICATION_CREDENTIALS=~/Downloads/key.json
    

    PowerShell

    $env:GOOGLE_APPLICATION_CREDENTIALS="$HOME\Downloads\key.json"
    
  4. 現在のプロジェクト ID で環境変数を設定します。この gcloud コマンドは、現在選択されているプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=gcloud config get-value project
    

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
    

    現在使用している GCP が次の変数として正しく登録されていることを確認します。

    Bash

    echo $PROJECT
    

    PowerShell

    $env:PROJECT
    
  5. プロジェクト フォルダに移動し、チュートリアルのサンプル フォルダに移動します。

    cd python-pubsub/samples/snippets/quickstart/
    

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を開きます。Pub/Sub は、ストリーミングを介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。サーバーはメッセージ ID も割り当てます。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。

  2. ローカル環境から ~/pubsub-quickstart ディレクトリを削除します。

  3. Google Cloud Console の [IAM と管理] セクションでチュートリアル プロジェクトをシャットダウンします。

  4. rm ~/Downloads/key.json でサービス アカウントの認証情報を削除します。

次のステップ

以下のことができます。

  • github でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧します。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認された後にメッセージを受信するようにサブスクリプションを設定できます。