オプションの Cloud Functions の関数のトリガーを使用して Pub/Sub に変更をストリーミングする


このチュートリアルでは、Bigtable change streams to Pub/Sub テンプレートの使用方法(トピックやテンプレートの構成方法など)について説明します。イベント ストリームによってトリガーされる Cloud Functions の関数を、任意のプログラミング言語で作成することもできます。

このチュートリアルは、Bigtable、コードの記述、イベント ストリーミング サービスに精通している技術ユーザーを対象としています。

目標

このチュートリアルでは、次の方法を説明します。

  • 変更ストリームを有効にして Bigtable テーブルを作成する。
  • Bigtable 変更ストリーム スキーマを使用して Pub/Sub トピックを作成する。
  • テンプレートを使用して、Dataflow の Pub/Sub パイプラインに Bigtable 変更ストリームをデプロイする。
  • Pub/Sub で直接、または Cloud Functions の関数のログでイベント ストリームを表示する。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

このドキュメントに記載されているタスクの完了後、作成したリソースを削除すると、それ以上の請求は発生しません。詳細については、クリーンアップをご覧ください。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Functions, and Cloud Storage API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Google Cloud プロジェクトで課金が有効になっていることを確認します

  7. Dataflow, Cloud Bigtable API, Cloud Bigtable Admin API, Pub/Sub, Cloud Functions, and Cloud Storage API を有効にします。

    API を有効にする

  8. Google Cloud コンソールで、「Cloud Shell をアクティブにする」をクリックします。

    Cloud Shell をアクティブにする

    Google Cloud コンソールの下部で Cloud Shell セッションが開始し、コマンドライン プロンプトが表示されます。Cloud Shell はシェル環境です。Google Cloud CLI がすでにインストールされており、現在のプロジェクトの値もすでに設定されています。セッションが初期化されるまで数秒かかることがあります。

  9. cbt CLI を更新してインストールします。
    gcloud components update
    gcloud components install cbt
    

Pub/Sub トピックを作成する

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. [トピックを作成] をクリックします。

  3. ID を bigtable-change-stream-topic に設定します。

  4. [スキーマを使用する] を選択します。

  5. [Pub/Sub スキーマを選択] プルダウンで、[新しいスキーマを作成] をクリックします。スキーマを定義する新しいタブが開きます。

    1. スキーマ ID を bigtable-change-stream-schema に設定します。
    2. スキーマタイプを Avro に設定します。
    3. スキーマ定義として次の内容を貼り付けます。 スキーマの詳細については、テンプレートのドキュメント ページをご覧ください。
      {
          "name" : "ChangelogEntryMessage",
          "type" : "record",
          "namespace" : "com.google.cloud.teleport.bigtable",
          "fields" : [
            { "name" : "rowKey", "type" : "bytes"},
            {
              "name" : "modType",
              "type" : {
                "name": "ModType",
                "type": "enum",
                "symbols": ["SET_CELL", "DELETE_FAMILY", "DELETE_CELLS", "UNKNOWN"]}
            },
            { "name": "isGC", "type": "boolean" },
            { "name": "tieBreaker", "type": "int"},
            { "name": "columnFamily", "type": "string"},
            { "name": "commitTimestamp", "type" : "long"},
            { "name" : "sourceInstance", "type" : "string"},
            { "name" : "sourceCluster", "type" : "string"},
            { "name" : "sourceTable", "type" : "string"},
            { "name": "column", "type" : ["null", "bytes"]},
            { "name": "timestamp", "type" : ["null", "long"]},
            { "name": "timestampFrom", "type" : ["null", "long"]},
            { "name": "timestampTo", "type" : ["null", "long"]},
            { "name" : "value", "type" : ["null", "bytes"]}
        ]
      }
    
    1. [作成] をクリックしてスキーマを作成します。
  6. [スキーマの作成] タブを閉じて、スキーマリストを更新し、新しく定義したスキーマを選択します。

  7. [作成] をクリックしてトピックを作成します。

省略可: Cloud Functions の関数を作成する

Pub/Sub ストリームを Cloud Functions の関数で処理することもできます。

  1. bigtable-change-stream-topic トピックの [詳細] ページで、[Cloud Function をトリガー] をクリックします。
  2. [関数名] フィールドに「bt-ps-tutorial-function」と入力します。
  3. [ソースコード] セクションで [ランタイム] プルダウンをクリックし、目的のランタイムとプログラミング言語を選択します。受信時に変更ストリームを出力する hello world が生成されます。Cloud Functions の関数の作成方法については、こちらのドキュメントをご覧ください。
  4. これ以外の項目はすべてデフォルト値のままにします。
  5. [関数をデプロイ] をクリックします。

変更ストリームを有効にしてテーブルを作成する

  1. Google Cloud コンソールで、Bigtable の [インスタンス] ページに移動します。

    [インスタンス] に移動

  2. このチュートリアルで使用しているインスタンスの ID をクリックします。

    使用可能なインスタンスがない場合は、近くのリージョンにデフォルトの構成でインスタンスを作成します。

  3. 左側のナビゲーション パネルで [テーブル] をクリックします。

  4. [テーブルの作成] をクリックします。

  5. テーブルに change-streams-pubsub-tutorial という名前を付けます。

  6. cf という名前の列ファミリーを追加します。

  7. [変更ストリームを有効にする] を選択します。

  8. [作成] をクリックします。

データ パイプラインを初期化して変更ストリームを取得する

  1. Bigtable の [テーブル] ページで、テーブル change-streams-pubsub-tutorial を見つけます。
  2. [変更ストリーム] 列で、[接続] をクリックします。
  3. ダイアログで [Pub/Sub] を選択します。
  4. [Dataflow ジョブを作成] をクリックします。
  5. Dataflow の [ジョブの作成] ページで、出力 Pub/Sub トピック名を bigtable-change-stream-topic に設定します。
  6. Bigtable アプリケーション プロファイル ID を default に設定します。
  7. [ジョブを実行] をクリックします。
  8. ジョブのステータスが「開始中」または「実行中」になったら処理を続行します。ジョブがキューに追加されてから約 5 分かかります。

Bigtable にデータを書き込む

  1. Cloud Shell で数行を Bigtable に書き込み、変更ログが一部のデータを Pub/Sub ストリームに書き込めるようにします。ジョブの作成後にデータを書き込んでいれば変更が表示されます。ジョブ ステータスが「running」になるのを待つ必要はありません。

    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user123 cf:col1=abc
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user546 cf:col1=def
    cbt -instance=BIGTABLE_INSTANCE_ID -project=YOUR_PROJECT_ID \
        set change-streams-pubsub-tutorial user789 cf:col1=ghi
    

Pub/Sub で変更ログを表示する

  1. Google Cloud コンソールで、Pub/Sub の [サブスクリプション] ページに移動します。

    [サブスクリプション] に移動

  2. トピック bigtable-change-stream-topic に対して自動的に作成されたサブスクリプションをクリックします。bigtable-change-stream-topic-sub という名前になります。

  3. [メッセージ] タブに移動します。

  4. [Pull] をクリックします。

  5. メッセージのリストを探索し、書き込んだデータを表示します。

    Pub/Sub の変更ログ メッセージ

省略可: Cloud Functions のログで変更を確認する

Cloud Functions の関数を作成した場合は、ログで変更を確認できます。

  1. Google Cloud コンソールで、[Cloud Functions] に移動します。

    Cloud Functions に移動

  2. 関数 bt-ps-tutorial-function をクリックします。

  3. [ログ] タブに移動します。

  4. ログが表示されるように、[重大度] が「情報」以上に設定されていることを確認します。

  5. ログを調べて、書き込んだデータを表示します。

出力は次のようになります。

Pub/Sub message: {"rowKey":"user789","modType":"SET_CELL","isGC":false,"tieBreaker":0,"columnFamily":"cf","commitTimestamp":1695653833064548,"sourceInstance":"YOUR-INSTANCE","sourceCluster":"YOUR-INSTANCE-c1","sourceTable":"change-streams-pubsub-tutorial","column":{"bytes":"col1"},"timestamp":{"long":1695653832278000},"timestampFrom":null,"timestampTo":null,"value":{"bytes":"ghi"}}

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、リソースを含むプロジェクトを削除するか、プロジェクトを維持して個々のリソースを削除します。

Bigtable テーブルを削除する

  1. Google Cloud コンソールで、Bigtable の [インスタンス] ページに移動します。

    [インスタンス] に移動

  2. このチュートリアルで使用しているインスタンスの ID をクリックします。

  3. 左側のナビゲーション パネルで [テーブル] をクリックします。

  4. change-streams-pubsub-tutorial テーブルを見つけます。

  5. [編集] をクリックします。

  6. [変更ストリームを有効にする] をオフにします。

  7. [保存] をクリックします。

  8. テーブルのオーバーフロー メニューを開きます。

  9. [削除] をクリックし、テーブル名を入力して確定します。

変更ストリーム パイプラインを停止する

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

    [ジョブ] に移動

  2. ジョブリストからストリーミング ジョブを選択します。

  3. ナビゲーションで、[停止] をクリックします。

  4. [ジョブの停止] ダイアログでパイプラインをキャンセルし、[ジョブの停止] をクリックします。

Pub/Sub トピックとサブスクリプションを削除する

  1. Google Cloud コンソールで、Pub/Sub の [トピック] ページに移動します。

    [トピック] に移動

  2. bigtable-change-stream-topic」トピックを選択します。

  3. [削除] をクリックして、確定します。

  4. サイドバーの [サブスクリプション] をクリックします。

  5. bigtable-change-stream-topic-sub サブスクリプションを選択します。

  6. [削除] をクリックして、確定します。

Cloud Functions の関数を削除する

  1. Google Cloud コンソールで、[Cloud Functions] に移動します。

    Cloud Functions に移動

  2. bt-ps-tutorial-function 関数を選択します。

  3. [削除] をクリックして、確定します。

次のステップ