1 対多の Pub/Sub システムの構築

コレクションでコンテンツを整理 必要に応じて、コンテンツの保存と分類を行います。

はじめに

このチュートリアルでは、同期 RPC ではなく、Pub/Sub を介してメッセージを送信することによって通信する、一連の単純なアプリケーションを設定する手順について説明します。アプリケーションを分離することにより、メッセージングには以下のようなメリットがあります。

  • アプリケーションの堅牢性が強化されます。
  • 開発をシンプルにできる可能性があります。

たとえば、呼び出し元(パブリッシャー)が受信者(サブスクライバー)を起動して稼働状態にする必要がなくなります。Pub/Sub にメッセージを送信するだけです。パブリッシャーが、メッセージを受信する必要があるサブスクライバー アプリケーションの種類と数を把握する必要もありません。そのため 1 つ以上のサブスクライバー アプリケーションが稼働しているときは、サービスは常にサブスクライバー アプリケーションにメッセージを配信できます。

要件:

システムの概要

このチュートリアルでは、以下の図に示されているように、「Hello, World!」というメッセージを 1 対多の通信を使用して 2 つのサブスクライバーに送信するパブリッシャー アプリケーションを起動します。

トピック、トピックに関連付けられたサブスクリプション、Cloud Pub/Sub との間でメッセージを送受信するパブリッシャー アプリケーションとサブスクライバー アプリケーションの図

2 つのサブスクライバー アプリケーションには同一のコードを使用しますが、起動するタイミングは異なります。このプロセスでは、Pub/Sub で非同期通信を有効にできます。このシステムを構築する手順は以下のとおりです。

  1. 必要な Pub/Sub トピックとサブスクリプションを作成します。
  2. アプリケーションの認証に使用するサービス アカウントを作成します。
  3. IAM 権限を設定します。
  4. 1 つのパブリッシャーと 2 つのサブスクライバーの合計 3 つの独立したアプリケーションを起動させます。

チュートリアルのセットアップ

Google Cloud プロジェクト、Pub/Sub トピック、サブスクリプションを設定する

  1. Google Cloud Console にログインします。

    Google Cloud Console に移動

    Cloud を初めて使用する場合は、[有効化] をクリックしてから指示に従って Cloud アカウントを設定します。

  2. 既存のプロジェクトを選択するか、プロジェクトを新規作成します。Google Cloud を初めて使用する場合はデフォルトのプロジェクトが作成されます。

    Google Cloud コンソールの [ホーム] セクションで、プロジェクト ID をメモします。この値は、gcloud CLI の初期化プロセスで現在の Cloud Storage プロジェクトを設定するときに使用します。また、パブリッシャー アプリケーションとサブスクライバー アプリケーションを起動するときにはこの ID を Python スクリプトに渡します。

  3. Google Cloud Console の [Pub/Sub] セクションに移動します。

    [Pub/Sub] セクションに移動

    指示に従って API を有効にします。

  4. [トピックを作成] をクリックします。パブリッシャー アプリケーションがトピックにメッセージを送信します。hello_topic という名前を使用します。

  5. [トピックの詳細]ページで、[登録を作成]をクリックします。

    1. サブスクリプションに sub_one と名前を付けます。いかなるデフォルト設定も変更しないでください。ここでは pull サブスクリプションの種類である StreamingPull サブスクリプションを作成します。

    2. 同じ手順で sub_two という名前のサブスクリプションを作成し、hello_topic に関連付けます。

      トピックビューでトピック名をクリックすると、新しいサブスクリプションを表示したり、サブスクリプション ビューに変更したりできます。

この時点で、Pub/Sub 環境では、パブリッシュするアプリケーションとサブスクライブするアプリケーションの間のメッセージ フローを管理する準備が整います。

サービス アカウントの認証情報を作成する

次の手順は、サービス アカウントを使用している場合にのみ行ってください。

  1. サービス アカウントの作成:

    1. Google Cloud コンソールで、[サービス アカウント] ページに移動します。

      [サービス アカウント] に移動

    2. プロジェクトを選択します。

    3. [サービス アカウントを作成] をクリックします。

    4. [サービス アカウント名] フィールドに名前を入力します(例: pubsub-tutorial)。Google Cloud コンソールは、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

    5. 省略可: [サービス アカウントの説明] 欄に、サービス アカウントの説明を入力します。

    6. [作成して続行] をクリックします。

    7. このチュートリアルでは、サービス アカウントにパブリッシュとサブスクライブを行う権限が必要です。プルダウンで [役割を選択] を選択し、[Pub/Sub パブリッシャー] の役割を追加します。

      文字列「pub」を使用して Pub/Sub の役割をフィルタしている [サービス アカウント権限] ダイアログ

    8. [別の役割を追加] をクリックして [Pub/Sub サブスクライバー] を追加します。

      [続行] ボタンをクリックする前の [サービス アカウント権限] ダイアログと Pub/Sub パブリッシャー、Pub/Sub サブスクライバー

    9. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  2. 作成したサービス アカウントの JSON キーをダウンロードします。クライアント ライブラリはそのキーを使用して Pub/Sub API にアクセスします。

    1. Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。

      キーはダウンロード フォルダに送信されます。このチュートリアルでは、この場所にキーを保持します。

    5. キーファイルの名前を ~/Downloads/key.json に変更します。

    6. [閉じる] をクリックします。

サービス アカウントに依存しない認証情報を提供するその他の方法については、アプリケーションのデフォルト認証情報の認証情報を提供するをご覧ください。

Google アカウントにロールを付与するには、適用する IAM のロールごとに次のコマンドを 1 回実行します。

gcloud projects add-iam-policy-binding PROJECT_ID --member="user:EMAIL_ADDRESS" --role=ROLE
  • PROJECT_ID は、実際のプロジェクト ID に置き換えます。
  • EMAIL_ADDRESS は実際のメールアドレスに置き換えます。
  • ROLE は、個々のロールに置き換えます。

Google Cloud CLI をインストールする

  1. Google Cloud CLI のインストールと初期化の手順に沿って操作します。

    • gcloud CLI の初期化中にプロジェクト ID を入力するオプションを選択し、作成したプロジェクトの ID を入力するか、セットアップ セクションで ID を選択してください。

    • gcloud CLI をインストールして初期化したら、このチュートリアルに戻ることができます。他のコンポーネントのインストールや Cloud クライアント ライブラリのダウンロードは不要です。

    gcloud CLI をインストールした後は、Google Cloud CLI を使用して Compute Engine で Pub/Sub オペレーションを実行できます。

  2. gcloud コマンドを使用する前に新しいターミナルを起動します。

    gcloud pubsub topics list
    gcloud pubsub subscriptions list
    

    gcloud config set project PROJECT_ID を使用すると、初期化中に設定したプロジェクトを他のプロジェクトに変更できます。

Python を取得して仮想環境を設定する

このチュートリアルは使用例を示すものであるため、仮想環境の設定セクションに示されている例に従う必要はありません。仮想環境をインストールしたら、このチュートリアルに戻ることができます。

パブリッシャーとサブスクライバーのコードを確認する

  1. このチュートリアルで必要な Pub/Sub Python ファイルを格納するプロジェクト フォルダを作成します。この場所に移動し、コードをダウンロードします。

     git clone https://github.com/googleapis/python-pubsub.git
    
  2. 次の手順に進む前に、開いているターミナルをすべて閉じます。

3 つのターミナルを設定する

  1. 各チュートリアル アプリケーション(1 つのパブリッシャーと 2 つのサブスクライバー)につき 1 つのターミナルを起動します。各ターミナルでこのセクションのすべての操作を行います。便宜上、これらのターミナルを次のように呼称します。

    • publisher ターミナル
    • sub_one ターミナル
    • sub_two ターミナル
  2. Python 仮想環境を作成して有効にします。

    最初のターミナルで次のコマンドを実行し、pyenv-qs という名前の仮想環境を作成して有効にします。

    Bash

    python -m venv pyenv-qs
    source pyenv-qs/bin/activate
    

    PowerShell

    py -m venv pyenv-qs
    .\pyenv-qs\Scripts\activate
    

    他の 2 つのターミナルで、次のコマンドを実行します。

    Bash

    source pyenv-qs/bin/activate
    

    PowerShell

    .\pyenv-qs\Scripts\activate
    

    activate コマンドを実行すると、コマンド プロンプトに (pyenv-qs) $ が表示されます。

    仮想環境で別のバージョンの Python を指定することもできます。

  3. pip を使用して、Pub/Sub Python クライアント ライブラリをインストールします。

    python -m pip install --user --upgrade google-cloud-pubsub

    JSON キーをサービス アカウントに関連付けます。主な Pub/Sub の役割は、サービス アカウントの認証情報の作成時に割り当てています。 Pub/Sub クライアント ライブラリは環境変数 GOOGLE_APPLICATION_CREDENTIALS にアクセスし、サービス アカウントに関連付けられたロールと権限が付与されます。

    Bash

    export GOOGLE_APPLICATION_CREDENTIALS=~/Downloads/key.json
    

    PowerShell

    $env:GOOGLE_APPLICATION_CREDENTIALS="$HOME\Downloads\key.json"
    
  4. 現在のプロジェクト ID で環境変数を設定します。この gcloud コマンドは、現在選択されているプロジェクト ID を判別し変数として設定します。

    Bash

    export PROJECT=`gcloud config get-value project`
    

    PowerShell

    $env:PROJECT=$(gcloud config get-value project)
    

    現在使用している Google Cloud が次の変数として正しく登録されていることを確認します。

    Bash

    echo $PROJECT
    

    PowerShell

    $env:PROJECT
    
  5. プロジェクト フォルダに移動し、チュートリアルのサンプル フォルダに移動します。

    cd python-pubsub/samples/snippets/quickstart/
    

アプリケーションを起動してメッセージ フローを確認する

サブスクライバー 1 のアプリケーションを起動する

sub_one ターミナルでサブスクライバー 1 を起動します。

Bash

python sub.py $PROJECT sub_one

PowerShell

py sub.py $env:PROJECT sub_one

起動すると、このアプリケーションはサーバーとの双方向ストリーミング接続を確立します。Pub/Sub は、その接続を介してメッセージを配信します。

サブスクライバー 1 アプリケーションは sub_one サブスクリプションでメッセージのリッスンを開始します。

パブリッシャー アプリケーションを起動する

publisher ターミナルでパブリッシャー アプリケーションを起動します。

Bash

python pub.py $PROJECT hello_topic

PowerShell

py pub.py $env:PROJECT hello_topic

パブリッシャー アプリケーションが起動すると、Pub/Sub システムが次の処理を行います。

  • パブリッシャー アプリケーションが「Hello, World!」というメッセージを Pub/Sub に送信します。既存のサブスクリプションにはこれは認識されません。このサーバーはメッセージ ID の割り当ても行います。

  • サブスクライバー 1 アプリケーションが「Hello World」というメッセージを受信して出力し、確認応答を Pub/Sub に送信します。

  • パブリッシャー アプリケーションが確認応答を出力します。この確認応答により、メッセージの処理に成功したことと、このサブスクライバーや他の sub_one サブスクライバーに再送信する必要がないことが Pub/Sub に通知されます。

Pub/Sub によりメッセージが sub_one から削除されます。

パブリッシャー アプリケーションはメッセージをパブリッシュし、メッセージ ID を割り当てます。サブスクライバー 1 アプリケーションは「Hello World」メッセージを受信し、確認応答を送信します。

サブスクライバー 2 アプリケーションを起動する

sub_two ターミナルでサブスクライバー 2 を起動します。

Bash

python sub.py $PROJECT sub_two

PowerShell

py sub.py $env:PROJECT sub_two

このサブスクライバーは、sub_two サブスクリプションに配信されたメッセージを受信します。サブスクライバー 2 では sub.py スクリプトが再利用されます。ただし、サブスクライバー 2 はパブリッシャーがトピックとサブスクリプションにメッセージを送信するまで起動されません。パブリッシャーサブスクライバー 2 を直接呼び出した場合、パブリッシャー アプリケーションはサブスクライバー 2 が起動するまで待機するかタイムアウトする必要があります。Pub/Sub は、サブスクライバー 2 へのメッセージを効率的に保存して管理します。

サブスクライバー 2 がリッスンを開始し、リッスンのため待機していたメッセージを sub_two で受信します。

これで、Pub/Sub を使用して開発を行う準備が整いました。

いかがでしたか

Cloud Pub/Sub のサポートページにその他の参考資料やリンクがありますので、こちらもご利用ください。

クリーンアップ

  1. 実行中のすべてのアプリケーションを停止します。

  2. ローカル環境から ~/pubsub-quickstart ディレクトリを削除します。

  3. Google Cloud Console の [IAM と管理] セクションでチュートリアル プロジェクトを終了します。

  4. rm ~/Downloads/key.json でサービス アカウントの認証情報を削除します。

次のステップ

以下のことができます。

  • github でチュートリアルの pub.py コードと sub.py コードを確認し、Pub/Sub の他のサンプルを閲覧できます。演習として、1 秒間隔で現地時間をパブリッシュするバージョンの pub.py を作成します。

  • メッセージのバッチ処理の方法を学びます。

  • push サブスクリプションを使用して、App Engine エンドポイントCloud Functions をトリガーするメッセージを受信します。

  • 再生を使用して、確認応答済みのメッセージを取得します。デフォルトでは、Pub/Sub はサブスクリプションから確認応答済みのメッセージを削除します。たとえば、このチュートリアルでは sub.py を再実行して、「Hello, World!」というメッセージを再度受信することはできません。再生機能を使用すると、メッセージが確認応答された後、そのメッセージを受信するようにサブスクリプションを設定できます。

  • 他の言語のクライアント ライブラリを使ってみる。