Dataflow SQL によるストリーミング データの結合

このチュートリアルでは、Dataflow SQL を使用して Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する方法について説明します。

目標

このチュートリアルの内容は次のとおりです。

  • Pub/Sub ストリーミング データと BigQuery テーブルデータを結合する Dataflow SQL クエリを作成します。
  • Dataflow SQL UI から Dataflow ジョブをデプロイします。

費用

このチュートリアルでは、課金対象である次の Google Cloud コンポーネントを使用します。

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager、Data Catalog。 API を有効にします。

    API を有効にする

  5. サービス アカウントを作成します。

    1. Cloud Console で [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. [ロールを選択] フィールドをクリックします。

      [クイック アクセス] で [基本]、[オーナー] の順にクリックします。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  6. サービス アカウント キーを作成します。

    1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  7. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

  8. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  9. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  10. Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager、Data Catalog。 API を有効にします。

    API を有効にする

  11. サービス アカウントを作成します。

    1. Cloud Console で [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Cloud Console は、この名前に基づいて [サービス アカウント ID] フィールドに入力します。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. [ロールを選択] フィールドをクリックします。

      [クイック アクセス] で [基本]、[オーナー] の順にクリックします。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  12. サービス アカウント キーを作成します。

    1. Cloud Console で、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  13. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。

  14. Cloud SDK をインストールし、初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  15. Cloud Console で Dataflow SQL ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、Dataflow SQL ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    Dataflow SQL ウェブ UI に移動

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Pub/Sub トピックにスキーマを割り当てる

スキーマを割り当てると、Pub/Sub トピックデータに SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。

Pub/Sub トピックのサンプル transactions にスキーマを割り当てるには:

  1. テキスト ファイルを作成し、名前を transactions_schema.yaml にします。次のスキーマ テキストをコピーして transactions_schema.yaml に貼り付けます。
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: tr_time_str
    description: Transaction time string
    mode: NULLABLE
    type: STRING
  - column: first_name
    description: First name
    mode: NULLABLE
    type: STRING
  - column: last_name
    description: Last name
    mode: NULLABLE
    type: STRING
  - column: city
    description: City
    mode: NULLABLE
    type: STRING
  - column: state
    description: State
    mode: NULLABLE
    type: STRING
  - column: product
    description: Product
    mode: NULLABLE
    type: STRING
  - column: amount
    description: Amount of transaction
    mode: NULLABLE
    type: FLOAT
  1. gcloud コマンドライン ツールを使用してスキーマを割り当てます。

    a. 次のコマンドgcloud ツールを更新します。gcloud ツールのバージョンは 242.0.0 以降にする必要があります。

      gcloud components update
    

    b. コマンドライン ウィンドウで次のコマンドを実行します。project-id はプロジェクト ID に置き換え、path-to-filetransactions_schema.yaml ファイルのパスに置き換えます。

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    コマンドのパラメータと許可されるスキーマ ファイルの形式については、gcloud data-catalog entries update のドキュメント ページをご覧ください。

    c. transactions Pub/Sub トピックにスキーマが正常に割り当てられたことを確認します。project-id は、実際のプロジェクト ID に置き換えます。

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Pub/Sub ソースを検索する

Dataflow SQL UI では、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できます。フルネームを覚える必要がありません。

このチュートリアルの例では、作成した transactions Pub/Sub トピックを検索します。

  1. 左側のパネルで projectid=project-id transactions を検索します。project-id は、実際のプロジェクト ID に置き換えます。

    Dataflow SQL ワークスペースの Data Catalog 検索パネル。

スキーマを表示する

  1. Dataflow SQL UI の左側のエクスプローラ パネルで、[トランザクション] をクリックするか、「projectid=project-id system=cloud_pubsub」と入力して Pub/Sub トピックを検索し、トピックを選択します。
  2. [スキーマ] の下に、Pub/Sub トピックに割り当てられたスキーマが表示されます。

    フィールド名のリストとその説明を含む、トピックに割り当てられたスキーマ。

SQL クエリを作成する

Dataflow SQL UI で、Dataflow ジョブに実行する SQL クエリを作成します。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions)を使用して、sales_region フィールドを Pub/Sub イベント ストリーム(transactions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id は実際のプロジェクト ID に置き換えます。

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

エディタにチュートリアルのクエリが表示されている Dataflow SQL ワークスペース。

SQL クエリを実行する Dataflow ジョブを作成する

SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。

  1. クエリエディタの上にある [ジョブを作成] をクリックします。

  2. 右側に表示された [Dataflow ジョブの作成] パネルの [送信先] オプションで、[BigQuery] を選択します。次に、[データセット ID] で dataflow_sql_tutorial を選択し、[テーブル名] を sales に設定します。

    Dataflow SQL ジョブフォームを作成する。
  3. (省略可)Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプション パラメータのメニューを展開して、次のパイプライン オプションを手動で指定することもできます。

    • ワーカーの最大数
    • ゾーン
    • サービス アカウントのメール
    • マシンタイプ
    • 追加テスト
    • ワーカー IP アドレスの構成
    • ネットワーク
    • サブネットワーク
  4. [作成] をクリックします。数分後に Dataflow ジョブが開始します。

Dataflow ジョブを表示する

Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。新しいブラウザタブに表示された Dataflow ウェブ UI で、パイプラインの詳細を確認できます。

Dataflow ウェブ UI に表示された SQL クエリからのパイプライン。

ボックスをクリックすると、パイプラインで発生した変換の詳細が表示されます。たとえば、Run SQL Query というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

上の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

2 つの入力の結合が 25 秒で完了している書き込み出力。

ジョブ結果が含まれる出力テーブルを表示するには、BigQuery UI に移動してテーブルを表示します。左側のエクスプローラ パネルで、プロジェクトの下に表示されている、作成済みの dataflow_sql_tutorial データセットをクリックします。続いて、出力テーブルの sales をクリックします。出力テーブルの内容が [プレビュー] タブに表示されます。

sales のプレビュー テーブルには、tr_time_str、first_name、last_name、city、state、product、amount、sales_region の列があります。

以前に実行されたジョブを表示してクエリを編集する

Dataflow UI では、過去のジョブとクエリが [ジョブ] ページに保存されます。

ジョブ履歴リストを使用して、以前の SQL クエリを確認できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更できます。[ジョブ] ページを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリをコピーして、変更したクエリを使用して別のジョブを実行します。

  1. Dataflow の [ジョブ] ページで、編集するジョブをクリックします。

  2. [ジョブの詳細] ページの右側のパネルにある [パイプライン オプション] で SQL クエリを探します。queryString の行を見つけます。

    queryString という名前のジョブ パイプライン オプション。
  3. SQL クエリをクエリエディタにコピーして貼り付け、タンブリング ウィンドウを追加します。次のクエリをコピーする場合は、project-id を実際のプロジェクト ID に置き換えます。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. 変更したクエリを使用して新しいジョブを作成するには、[ジョブを作成] をクリックします。

クリーンアップ

このチュートリアルで使用したリソースについて、Cloud 請求先アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Dataflow ジョブを停止します。Cloud Console で Dataflow ウェブ UI に移動します。

    Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. ジョブの [ジョブの概要] パネルで [ジョブを停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] をクリックします。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。Cloud Console の BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. [エクスプローラ] パネルの [リソース] セクションで、作成した dataflow_sql_tutorial データセットをクリックします。

    2. 詳細パネルの右側で [データセットを削除] をクリックします。この操作を行うと、データセット、テーブル、すべてのデータが削除されます。

    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(dataflow_sql_tutorial)を入力して、[削除] をクリックします。

  4. Pub/Sub トピックを削除します。Cloud Console の Pub/Sub トピックページに移動します。

    Pub/Sub トピックページに移動

    1. transactions トピックの横にあるチェックボックスにチェックを入れます。

    2. [削除] をクリックして、トピックを永続的に削除します。

    3. Pub/Sub サブスクリプション ページに移動します。

    4. transactions の残っているサブスクリプションの横にあるチェックボックスにチェックを入れます。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    5. [削除] をクリックして、サブスクリプションを永続的に削除します。

  5. Cloud Storage で Dataflow ステージング バケットを削除します。Cloud Console の Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

    1. Dataflow ステージング バケットの隣にあるチェックボックスをオンにします。

    2. [削除] をクリックして、バケットを永続的に削除します。

次のステップ