Dataflow SQL によるストリーミング データの結合


このチュートリアルでは、Dataflow SQL を使用して Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する方法について説明します。

目標

このチュートリアルの内容は次のとおりです。

  • Pub/Sub ストリーミング データと BigQuery テーブルデータを結合する Dataflow SQL クエリを作成します。
  • Dataflow SQL UI から Dataflow ジョブをデプロイします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager、Data Catalog。 API を有効にします。

    API を有効にする

  5. サービス アカウントを作成します。

    1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. サービス アカウントに Project > Owner ロールを付与します。

      ロールを付与するには、[ロールを選択] リストで [Project > Owner] を選択します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  6. サービス アカウント キーを作成します。

    1. Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  7. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、認証情報を含む JSON ファイルのパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定する必要があります。

  8. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  9. Google Cloud プロジェクトで課金が有効になっていることを確認します

  10. Cloud Dataflow、Compute Engine、Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager、Data Catalog。 API を有効にします。

    API を有効にする

  11. サービス アカウントを作成します。

    1. Google Cloud コンソールで [サービス アカウントの作成] ページに移動します。

      [サービス アカウントの作成] に移動
    2. プロジェクトを選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。Google Cloud コンソールでは、この名前に基づいて [サービス アカウント ID] フィールドに値が設定されます。

      [サービス アカウントの説明] フィールドに説明を入力します。例: Service account for quickstart

    4. [作成して続行] をクリックします。
    5. サービス アカウントに Project > Owner ロールを付与します。

      ロールを付与するには、[ロールを選択] リストで [Project > Owner] を選択します。

    6. [続行] をクリックします。
    7. [完了] をクリックして、サービス アカウントの作成を完了します。

      ブラウザ ウィンドウは閉じないでください。次のステップでこれを使用します。

  12. サービス アカウント キーを作成します。

    1. Google Cloud コンソールで、作成したサービス アカウントのメールアドレスをクリックします。
    2. [キー] をクリックします。
    3. [鍵を追加]、[新しい鍵を作成] の順にクリックします。
    4. [作成] をクリックします。JSON キーファイルがパソコンにダウンロードされます。
    5. [閉じる] をクリックします。
  13. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、認証情報を含む JSON ファイルのパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定する必要があります。

  14. gcloud CLI をインストールして初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  15. Google Cloud コンソールで Dataflow SQL ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、Dataflow SQL ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    Dataflow SQL ウェブ UI に移動

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Pub/Sub トピックにスキーマを割り当てる

スキーマを割り当てると、Pub/Sub トピックデータに SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。

Pub/Sub トピックのサンプル transactions にスキーマを割り当てるには:

  1. テキスト ファイルを作成し、名前を transactions_schema.yaml にします。次のスキーマ テキストをコピーして transactions_schema.yaml に貼り付けます。

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Google Cloud CLI を使用してスキーマを割り当てます。

    a. 次のコマンドを使用して、gcloud CLI を更新します。gcloud CLI のバージョンは 242.0.0 以降にする必要があります。

      gcloud components update
    

    b. コマンドライン ウィンドウで次のコマンドを実行します。project-id はプロジェクト ID に置き換え、path-to-filetransactions_schema.yaml ファイルのパスに置き換えます。

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    コマンドのパラメータと許可されるスキーマ ファイルの形式については、gcloud data-catalog entries update のドキュメント ページをご覧ください。

    c. transactions Pub/Sub トピックにスキーマが正常に割り当てられたことを確認します。project-id は、実際のプロジェクト ID に置き換えます。

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Pub/Sub ソースを検索する

Dataflow SQL UI では、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できます。フルネームを覚える必要がありません。

このチュートリアルの例では、Dataflow SQL エディタに移動して、作成した transactions Pub/Sub トピックを検索します。

  1. SQL ワークスペースに移動します。

  2. [Dataflow SQL 編集者] パネルの検索バーで、projectid=project-id transactions を検索します。project-id は、実際のプロジェクト ID に置き換えます。

    Dataflow SQL ワークスペースの Data Catalog 検索パネル。

スキーマを表示する

  1. Dataflow SQL UI の [Dataflow SQL 編集者] パネルで [トランザクション] をクリックするか、「projectid=project-id system=cloud_pubsub」と入力して Pub/Sub トピックを検索し、トピックを選択します。
  2. [スキーマ] の下に、Pub/Sub トピックに割り当てられたスキーマが表示されます。

    フィールド名のリストとその説明を含む、トピックに割り当てられたスキーマ。

SQL クエリを作成する

Dataflow SQL UI で、Dataflow ジョブに実行する SQL クエリを作成します。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions)を使用して、sales_region フィールドを Pub/Sub イベント ストリーム(transactions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id は実際のプロジェクト ID に置き換えます。

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

エディタにチュートリアルのクエリが表示されている Dataflow SQL ワークスペース。

SQL クエリを実行する Dataflow ジョブを作成する

SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。

  1. クエリエディタで、[ジョブを作成] をクリックします。

  2. 表示された [Dataflow ジョブの作成] パネルで、次の操作を行います。

    • [Destination] で、[BigQuery] を選択します。
    • データセット ID に [dataflow_sql_tutorial] を選択します。
    • テーブル名に「sales」と入力します。
    Dataflow SQL ジョブフォームを作成する。
  3. (省略可)Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプション パラメータのメニューを開いて、次のパイプライン オプションを手動で指定することもできます。

    • ワーカーの最大数
    • ゾーン
    • サービス アカウントのメール
    • マシンタイプ
    • 追加テスト
    • ワーカー IP アドレスの構成
    • ネットワーク
    • サブネットワーク
  4. [作成] をクリックします。Dataflow ジョブの実行が開始されるまでに数分かかります。

Dataflow ジョブを表示する

Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。[ジョブを表示] をクリックして Dataflow ウェブ UI を開き、パイプラインのグラフィカル表現を表示します。

Dataflow ウェブ UI に表示された SQL クエリからのパイプライン。

パイプライン内で発生している変換の詳細を確認するには、それぞれのボックスをクリックします。たとえば、「Run SQL Query」というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

最初の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

2 つの入力の結合が 25 秒で完了している書き込み出力。

ジョブの結果が含まれる出力テーブルを表示するには、BigQuery UI に移動します。[エクスプローラ] パネルのプロジェクトで、作成した dataflow_sql_tutorial データセットをクリックします。出力テーブル sales をクリックします。出力テーブルの内容が [プレビュー] タブに表示されます。

sales のプレビュー テーブルには、tr_time_str、first_name、last_name、city、state、product、amount、sales_region の列があります。

以前に実行されたジョブを表示してクエリを編集する

Dataflow UI では、過去のジョブとクエリが Dataflow の [ジョブ] ページに保存されます。

ジョブ履歴リストを使用して、以前の SQL クエリを確認できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更できます。[ジョブ] ページを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリをコピーして、変更したクエリを使用して別のジョブを実行します。

  1. Dataflow の [ジョブ] ページで、編集するジョブをクリックします。

  2. [ジョブの詳細] ページの [ジョブ情報] パネルにある [パイプライン オプション] で SQL クエリを見つけます。queryString の行を見つけます。

    queryString という名前のジョブ パイプライン オプション。
  3. 次の SQL クエリをコピーして SQL ワークスペースの [Dataflow SQL 編集者] に貼り付け、タンブリング ウィンドウを追加します。project-id は、実際のプロジェクト ID に置き換えます。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. 変更したクエリを使用して新しいジョブを作成するには、[ジョブを作成] をクリックします。

クリーンアップ

このチュートリアルで使用したリソースについて、Cloud 請求先アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Dataflow ジョブを停止します。Google Cloud コンソールで Dataflow ウェブ UI に移動します。

    Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. [ジョブの詳細] ページで、[停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] を選択します。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。Google Cloud コンソールで BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. [エクスプローラ] パネルの [リソース] セクションで、作成した dataflow_sql_tutorial データセットをクリックします。

    2. 詳細パネルで [削除] をクリックします。確認ダイアログが表示されます。

    3. [データセットの削除] ダイアログ ボックスで「delete」と入力して削除コマンドを確認し、[削除] をクリックします。

  4. Pub/Sub トピックを削除します。Google Cloud コンソールで Pub/Sub の [トピック] ページに移動します。

    Pub/Sub の [トピック] ページに移動

    1. transactions」トピックを選択します。

    2. [削除] をクリックして、トピックを完全に削除します。確認ダイアログが表示されます。

    3. [トピックの削除] ダイアログ ボックスで「delete」を入力して削除コマンドを確認し、[削除] をクリックします。

    4. Pub/Sub サブスクリプション ページに移動します。

    5. transactions の残りのサブスクリプションを選択します。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    6. [削除] をクリックして、サブスクリプションを完全に削除します。確認ダイアログで [削除] をクリックします。

  5. Cloud Storage で Dataflow ステージング バケットを削除します。Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

    1. Dataflow ステージング バケットを選択します。

    2. [削除] をクリックして、バケットを完全に削除します。確認ダイアログが表示されます。

    3. [バケットの削除] ダイアログ ボックスで「DELETE」と入力して削除コマンドを確認し、[削除] をクリックします。

次のステップ