オプションの Cloud Run 関数のトリガーを使用して Pub/Sub に変更をストリーミングする


このチュートリアルでは、Bigtable change streams to Pub/Sub テンプレートの使用方法(トピックやテンプレートの構成方法など)について説明します。イベント ストリームによってトリガーされる Cloud Run 関数を、任意のプログラミング言語で作成することもできます。

このチュートリアルは、Bigtable、コードの記述、イベント ストリーミング サービスに精通している技術ユーザーを対象としています。

目標

このチュートリアルでは、次の方法を説明します。

  • 変更ストリームを有効にして Bigtable テーブルを作成する。
  • Bigtable 変更ストリーム スキーマを使用して Pub/Sub トピックを作成する。
  • テンプレートを使用して、Dataflow の Pub/Sub パイプラインに Bigtable 変更ストリームをデプロイする。
  • Pub/Sub で直接、または Cloud Run 関数のログでイベント ストリームを表示する。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Run functions, and Cloud Storage APIs.

    Enable the APIs

  8. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  9. cbt CLI を更新してインストールします。
    gcloud components update
    gcloud components install cbt

Pub/Sub トピックの作成

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. ID を bigtable-change-stream-topic に設定します。

  4. [スキーマを使用する] を選択します。

  5. [Pub/Sub スキーマを選択] プルダウンで、[新しいスキーマを作成] をクリックします。スキーマを定義する新しいタブが開きます。

    1. スキーマ ID を bigtable-change-stream-schema に設定します。
    2. スキーマタイプを Avro に設定します。
    3. スキーマ定義として次の内容を貼り付けます。 スキーマの詳細については、テンプレートのドキュメント ページをご覧ください。
      {
          "name" : "ChangelogEntryMessage",
          "type" : "record",
          "namespace" : "com.google.cloud.teleport.bigtable",
          "fields" : [
            { "name" : "rowKey", "type" : "bytes"},
            {
              "name" : "modType",
              "type" : {
                "name": "ModType",
                "type": "enum",
                "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
            },
            { "name": "isGC", "type": "boolean" },
            { "name": "tieBreaker", "type": "int"},
            { "name": "columnFamily", "type": "string"},
            { "name": "commitTimestamp", "type" : "long"},
            { "name" : "sourceInstance", "type" : "string"},
            { "name" : "sourceCluster", "type" : "string"},
            { "name" : "sourceTable", "type" : "string"},
            { "name": "column", "type" : ["null", "bytes"]},
            { "name": "timestamp", "type" : ["null", "long"]},
            { "name": "timestampFrom", "type" : ["null", "long"]},
            { "name": "timestampTo", "type" : ["null", "long"]},
            { "name" : "value", "type" : ["null", "bytes"]}
        ]
      }
    
    1. [作成] をクリックしてスキーマを作成します。
  6. [スキーマの作成] タブを閉じて、スキーマリストを更新し、新しく定義したスキーマを選択します。

  7. [作成] をクリックしてトピックを作成します。

省略可: Cloud Run 関数を作成する

Pub/Sub ストリームを Cloud Run 関数で処理することもできます。

  1. bigtable-change-stream-topic トピックの [詳細] ページで、[Cloud Function をトリガー] をクリックします。
  2. [関数名] フィールドに「bt-ps-tutorial-function」と入力します。
  3. [ソースコード] セクションで [ランタイム] プルダウンをクリックし、目的のランタイムとプログラミング言語を選択します。受信時に変更ストリームを出力する hello world が生成されます。Cloud Run 関数の作成方法については、こちらのドキュメントをご覧ください。
  4. これ以外の項目はすべてデフォルト値のままにします。
  5. [関数をデプロイ] をクリックします。

変更ストリームを有効にしてテーブルを作成する

  1. Google Cloud コンソールで、Bigtable の [インスタンス] ページに移動します。

    [インスタンス] に移動

  2. このチュートリアルで使用しているインスタンスの ID をクリックします。

    使用可能なインスタンスがない場合は、近くのリージョンにデフォルトの構成でインスタンスを作成します。

  3. 左側のナビゲーション パネルで [テーブル] をクリックします。

  4. [テーブルの作成] をクリックします。

  5. テーブルに change-streams-pubsub-tutorial という名前を付けます。

  6. cf という名前の列ファミリーを追加します。

  7. [Enable change stream] を選択します。

  8. [作成] をクリックします。

データ パイプラインを初期化して変更ストリームを取得する

  1. Bigtable の [テーブル] ページで、テーブル change-streams-pubsub-tutorial を見つけます。
  2. [変更ストリーム] 列で、[接続] をクリックします。
  3. ダイアログで [Pub/Sub] を選択します。
  4. [Dataflow ジョブを作成] をクリックします。
  5. Dataflow の [ジョブの作成] ページで、出力 Pub/Sub トピック名を bigtable-change-stream-topic に設定します。
  6. Bigtable アプリケーション プロファイル ID を default に設定します。
  7. [ジョブを実行] をクリックします。
  8. ジョブのステータスが「開始中」または「実行中」になったら処理を続行します。ジョブがキューに追加されてから約 5 分かかります。

Bigtable にデータを書き込む

  1. Cloud Shell で数行を Bigtable に書き込み、変更ログが一部のデータを Pub/Sub ストリームに書き込めるようにします。ジョブの作成後にデータを書き込んでいれば変更が表示されます。ジョブ ステータスが「running」になるのを待つ必要はありません。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user123 cf:col1=abc
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user546 cf:col1=def
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user789 cf:col1=ghi
    

Pub/Sub で変更ログを表示する

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] に移動

  2. トピック bigtable-change-stream-topic に対して自動的に作成されたサブスクリプションをクリックします。bigtable-change-stream-topic-sub という名前になります。

  3. [メッセージ] タブに移動します。

  4. [PULL] をクリックします。

  5. メッセージのリストを探索し、書き込んだデータを表示します。

    Pub/Sub の変更ログ メッセージ

省略可: Cloud Run 関数のログで変更を確認する

Cloud Run 関数を作成した場合は、ログで変更を確認できます。

  1. Google Cloud コンソールで、[Cloud Run 関数] に移動します。

    Cloud Run 関数に移動します

  2. 関数 bt-ps-tutorial-function をクリックします。

  3. [ログ] タブに移動します。

  4. ログが表示されるように、[重大度] が「情報」以上に設定されていることを確認します。

  5. ログを調べて、書き込んだデータを表示します。

出力は次のようになります。

Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

Bigtable テーブルを削除する

  1. Google Cloud コンソールで、Bigtable の [インスタンス] ページに移動します。

    [インスタンス] に移動

  2. このチュートリアルで使用しているインスタンスの ID をクリックします。

  3. 左側のナビゲーション パネルで [テーブル] をクリックします。

  4. change-streams-pubsub-tutorial テーブルを見つけます。

  5. [編集] をクリックします。

  6. [変更ストリームを有効にする] をオフにします。

  7. [保存] をクリックします。

  8. テーブルのオーバーフロー メニューを開きます。

  9. [削除] をクリックし、テーブル名を入力して確定します。

変更ストリーム パイプラインを停止する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    ジョブに移動

  2. ジョブリストからストリーミング ジョブを選択します。

  3. ナビゲーションで、[停止] をクリックします。

  4. [ジョブの停止] ダイアログでパイプラインをキャンセルし、[ジョブの停止] をクリックします。

Pub/Sub トピックとサブスクリプションを削除する

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. bigtable-change-stream-topic」トピックを選択します。

  3. [削除] をクリックして、確定します。

  4. サイドバーの [サブスクリプション] をクリックします。

  5. bigtable-change-stream-topic-sub サブスクリプションを選択します。

  6. [削除] をクリックして、確定します。

Cloud Run 関数を削除する

  1. Google Cloud コンソールで、[Cloud Run 関数] に移動します。

    Cloud Run 関数に移動します

  2. bt-ps-tutorial-function 関数を選択します。

  3. [削除] をクリックして、確定します。

次のステップ