Dataflow SQL によるストリーミング データの結合

このチュートリアルでは、Dataflow SQL を使用して Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する方法について説明します。

目標

このチュートリアルの内容は次のとおりです。

  • Pub/Sub ストリーミング データと BigQuery テーブルデータを結合する Dataflow SQL クエリを作成します。
  • Dataflow SQL UI から Dataflow ジョブをデプロイします。

費用

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Dataflow
  • Cloud Storage
  • Pub/Sub

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager API を有効にします。

    API を有効にする

  5. 認証の設定:
    1. Cloud Console で、[サービス アカウント キーの作成] ページに移動します。

      [サービス アカウント キーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [ロール] リストで、[プロジェクト] > [オーナー] を選択します。

      : [ロール] フィールドの設定により、リソースにアクセスするサービス アカウントが承認されます。このフィールドは、Cloud Console を使用して後から表示および変更できます。本番環境アプリケーションを開発している場合は、[プロジェクト > オーナー] よりも詳細な権限を指定します。詳しくは、サービス アカウントへのロールの付与をご覧ください。
    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

  7. Cloud SDK をインストールし、初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  8. Cloud Console の BigQuery ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、BigQuery ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    BigQuery ウェブ UI に移動

Dataflow SQL UI に切り替える

BigQuery ウェブ UI で、次の手順に沿って Dataflow UI に切り替えます。

  1. [展開] プルダウン メニューを開いて、[クエリの設定] を選択します。

  2. 右側に表示された [クエリの設定] メニューで、[Dataflow エンジン] を選択します。

  3. プロジェクトで Dataflow と Data Catalog API が有効になっていない場合、これらを有効にするように求められます。[API を有効にする] をクリックします。Dataflow API と Data Catalog API の有効化には数分かかる場合があります。

  4. API が有効になったら [保存] をクリックします。

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Pub/Sub ソースを検索する

Dataflow SQL UI では、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できます。フルネームを覚える必要がありません。

このチュートリアルの例では、作成した transactions Pub/Sub トピックを追加します。

  1. 左側のナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。

  2. 右側に表示される [Cloud Dataflow ソースを追加] パネルで、[Pub/Sub トピック] を選択します。検索ボックスで、「transactions」を検索します。トピックを選択して [追加] をクリックします。

Pub/Sub トピックにスキーマを割り当てる

スキーマを割り当てると、Pub/Sub トピックデータに SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。今後、他の形式(Avro など)のサポートが追加される予定です。

サンプルの Pub/Sub トピックDataflow ソースとして追加した後、次の手順を実行して Dataflow SQL UI のトピックにスキーマを割り当てます。

  1. [リソース] パネルでトピックを選択します。

  2. [スキーマ] タブで [スキーマを編集] をクリックします。右側に [スキーマ] サイドパネルが開きます。

  3. [テキストとして編集] ボタンを切り替え、次のインライン スキーマをエディタに貼り付けます。[送信] をクリックします。

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "type": "FLOAT64"
      }
    ]
    
  4. (省略可)[トピックをプレビュー] をクリックしてメッセージの内容を確認し、定義したスキーマと一致していることを確認します。

スキーマを表示する

  1. Dataflow SQL UI の左側のナビゲーション パネルで、[Cloud Dataflow のソース] をクリックします。
  2. [Pub/Sub トピック] をクリックします。
  3. [トランザクション] をクリックします。
  4. [スキーマ] の下に、transactions Pub/Sub トピックに割り当てられたスキーマが表示されます。
フィールド名のリストとその説明を含む、トピックに割り当てられたスキーマ。

SQL クエリを作成する

Dataflow SQL UI で、Dataflow ジョブに実行する SQL クエリを作成します。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions)を使用して、sales_region フィールドを Pub/Sub イベント ストリーム(transactions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id は実際のプロジェクト ID に置き換えます。

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

SQL クエリを実行する Dataflow ジョブを作成する

SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。

  1. クエリエディタで [Dataflow ジョブを作成] をクリックします。

  2. 右側に表示された [Dataflow ジョブを作成] パネルで、デフォルトのテーブル名dfsqltable_sales に変更します。

  3. (省略可)Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプション パラメータのメニューを展開して、次のパイプライン オプションを手動で指定することもできます。

    • ワーカーの最大数
    • ゾーン
    • サービス アカウントのメール
    • マシンタイプ
    • 追加テスト
    • ワーカー IP アドレスの構成
    • ネットワーク
    • サブネットワーク
  4. [作成] をクリックします。数分後に Dataflow ジョブが開始します。

  5. UI に [クエリ結果] パネルが表示されます。後でジョブの [クエリ結果] パネルに戻るには、[ジョブの履歴] パネルでジョブを検索し、[クエリをエディタで開く] ボタンを使用します。詳しくは、Dataflow ジョブと出力を表示するをご覧ください。

  6. [ジョブ情報] でジョブ ID のリンクをクリックします。これで、Dataflow ウェブ UI で新しいブラウザタブが開き、Dataflow ジョブの詳細ページが表示されます。

Dataflow ジョブと出力を表示する

Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。新しいブラウザタブに表示された Dataflow ウェブ UI で、パイプラインの詳細を確認できます。

システム レイテンシの同時スパイクが 30 秒未満であり、データの更新の遅延が 30 秒を超えることを示すジョブ パイプラインとサマリー グラフ。

ボックスをクリックすると、パイプラインで発生した変換の詳細が表示されます。たとえば、Run SQL Query というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

上の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

2 つの入力の結合書き込みは 25 秒以内に完了します。

ジョブ結果が含まれる出力テーブルを表示するには、Dataflow SQL UI のブラウザタブに戻ります。左側のナビゲーション パネルで、プロジェクトの下に表示されている、作成した dataflow_sql_dataset データセットをクリックします。続いて、出力テーブルの dfsqltable_sales をクリックします。[プレビュー] タブに出力テーブルの内容が表示されます。

dfsqltable_sales のプレビュー テーブルには、tr_time_str、first_name、last_name、city、state、product、amount、sales_region の列があります。

以前に実行されたジョブを表示してクエリを編集する

Dataflow SQL UI では、以前に実行されたジョブとクエリを [ジョブの履歴] パネルで確認できます。ジョブは、ジョブを開始した日付順に表示されます。ジョブリストでは、実行中のジョブがある日付が先に表示され、その後に、実行中のジョブがない日付が続きます。

ジョブ履歴リストを使用して以前の SQL クエリを編集し、新しい Dataflow ジョブを実行できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更できます。ジョブの履歴パネルを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリを変更して別のジョブとして実行します。

  1. 左側のナビゲーション パネルで、[ジョブの履歴] をクリックします。

  2. [ジョブの履歴] で Cloud Dataflow をクリックします。プロジェクトで実行した以前のジョブがすべて表示されます。

    ジョブの履歴。ジョブが実行された日時とステータス アイコンが表示されます。
  3. 編集するジョブをクリックします。[クエリエディタで開く] をクリックします。

  4. [クエリエディタ] で SQL クエリを編集し、タンブリング ウィンドウを追加します。次のクエリをコピーする場合は、project-id を実際のプロジェクト ID に置き換えます。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. クエリエディタで [Cloud Dataflow ジョブを作成] をクリックし、変更したクエリを使用して新しいジョブを作成します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Dataflow ジョブを停止します。Cloud Console で Dataflow ウェブ UI に移動します。

    Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. ジョブの [ジョブの概要] パネルで [ジョブを停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] をクリックします。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。Cloud Console の BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. ナビゲーション パネルの [リソース] セクションで、作成した dataflow_sql_dataset データセットをクリックします。

    2. 詳細パネルの右側で [データセットを削除] をクリックします。この操作を行うと、データセット、テーブル、すべてのデータが削除されます。

    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(dataflow_sql_dataset)を入力して、[削除] をクリックします。

  4. Pub/Sub トピックを削除します。Cloud Console の Pub/Sub トピックページに移動します。

    Pub/Sub トピックページに移動

    1. transactions トピックの横にあるチェックボックスにチェックを入れます。

    2. [削除] をクリックして、トピックを永続的に削除します。

    3. Pub/Sub サブスクリプション ページに移動します。

    4. transactions の残っているサブスクリプションの横にあるチェックボックスにチェックを入れます。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    5. [削除] をクリックして、サブスクリプションを永続的に削除します。

  5. Cloud Storage で Dataflow ステージング バケットを削除します。Cloud Console の Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

    1. Dataflow ステージング バケットの隣にあるチェックボックスをオンにします。

    2. [削除] をクリックして、バケットを永続的に削除します。

次のステップ