Dataflow SQL の使用

このチュートリアルでは、SQL と Dataflow SQL UI を使用して Dataflow ジョブを実行する方法について説明します。サンプルを使いながら、Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する手順を解説します。

目標

このチュートリアルの内容は次のとおりです。

  • SQL を使用して、Pub/Sub ストリーミング データを BigQuery テーブルデータと結合します。
  • Dataflow SQL UI から Dataflow ジョブをデプロイします。

費用

このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。

  • Dataflow
  • クラウド ストレージ
  • Pub/Sub

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを作成できます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。 プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API を有効にします。

    API を有効にする

  5. 認証の設定
    1. Cloud Console で、[サービス アカウント キーの作成] ページに移動します。

      [サービス アカウントキーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [役割] リストで、[プロジェクト] > [オーナー] を選択します。

      : [役割] フィールドの設定により、リソースにアクセスするサービス アカウントが承認されます。このフィールドは、Cloud Console を使用して後から表示および変更できます。本番環境アプリケーションを開発している場合は、[プロジェクト] > [オーナー] よりも詳細な権限を指定します。詳しくはサービス アカウントへの役割の付与をご覧ください。
    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定してください。

  7. Cloud SDK をインストールし、初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  8. Cloud Console の BigQuery ウェブ UI に移動します。 最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、BigQuery ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    BigQuery ウェブ UI に移動

Dataflow SQL UI に切り替える

BigQuery ウェブ UI で、次の手順に沿って Dataflow UI に切り替えます。

  1. [展開] プルダウン メニューを開いて、[クエリの設定] を選択します。

  2. 右側の[クエリ設定]メニューで[Dataflow エンジン]を選択します。

  3. プロジェクトで Dataflow API と Data Catalog API が有効になっていない場合は、有効にするように求められます。[API を有効にする] をクリックします。Dataflow API と Data Catalog API の有効化には、数分かかることがあります。

  4. API が有効になったら [保存] をクリックします。

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを経由して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Pub/Sub ソースを検索する

Dataflow SQL UI は、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できるので、フルネームを覚える必要がありません。

このチュートリアルの例では、作成した transactions Pub/Sub トピックを追加します。

  1. 左側のナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。

  2. 右側に [Cloud Dataflow ソースを追加] パネルが表示されたら、[Pub/Sub topics] を選択します。検索ボックスで、「transactions」 を検索します。 トピックを選択して [追加] をクリックします。

スキーマを Pub/Sub トピックに割り当てる

スキーマを割り当てると、Pub/Sub トピックのデータに対して SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。今後、他の形式(Avro など)のサポートが追加される予定です。

例の Pub/Sub トピックDataflow ソースとして追加した後、次の手順を実行して Dataflow SQL UI のトピックにスキーマを割り当てます。

  1. リソース パネルでトピックを選択します。

  2. スキーマタブで、スキーマの編集をクリックします。右側のスキーマサイドパネルが開きます。

    スキーマを追加または編集するサイドパネル

  3. テキストとして編集ボタンを切り替え、次のインライン スキーマをエディタに貼り付けます。[ 送信] をクリックします。

    [
          {
              "description": "Pub/Sub event timestamp",
              "name": "event_timestamp",
              "mode": "REQUIRED",
              "type": "TIMESTAMP"
          },
          {
              "description": "Transaction time string",
              "name": "tr_time_str",
              "type": "STRING"
          },
          {
              "description": "First name",
              "name": "first_name",
              "type": "STRING"
          },
          {
              "description": "Last name",
              "name": "last_name",
              "type": "STRING"
          },
          {
              "description": "City",
              "name": "city",
              "type": "STRING"
          },
          {
              "description": "State",
              "name": "state",
              "type": "STRING"
          },
          {
              "description": "Product",
              "name": "product",
              "type": "STRING"
          },
          {
              "description": "Amount of transaction",
              "name": "amount",
              "type": "FLOAT64"
          }
        ]
        
  4. (オプション)プレビュー トピックをクリックしてメッセージの内容を確認し、定義したスキーマと一致することを確認します。

    プレビュー トピックボタンが開きます

スキーマを表示する

  1. Dataflow SQL UI の左側のナビゲーション パネルで、Cloud Dataflow ソースをクリックします。
  2. Pub/Sub トピックをクリックします。
  3. [トランザクション] をクリックします。
  4. スキーマの下で、transactions Pub/Sub トピックに割り当てられたスキーマを表示できます。

SQL クエリを作成する

Dataflow SQL UI を使用すると、SQL クエリを作成して Dataflow ジョブを実行できます。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(sales_region)を使用して、transactions フィールドを Pub/Sub イベント ストリーム(us_state_salesregions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。 [PROJECT-ID] は実際のプロジェクト ID に置き換えてください。

    SELECT tr.*, sr.sales_region
    FROM pubsub.topic.`project-id`.transactions as tr
      INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
      ON tr.state = sr.state_code
    

Dataflow SQL UI にクエリを入力する際、クエリの検証ツールクエリ構文を検証します。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。検証ツールに緑色のチェックマークが表示されています。

エディタにクエリを入力します。

Dataflow ジョブを作成して SQL クエリを実行する

SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。

  1. クエリ エディタの下にある、[Dataflow ジョブを作成] をクリックします。

  2. 右側で開く Dataflow ジョブ作成パネルで、デフォルトのテーブル名dfsqltable_sales に変更します。

  3. オプション: Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプションのパラメータのメニューを展開して、手動で次のパイプライン オプションを指定できます。

    • ワーカーの最大数
    • ゾーン
    • サービス アカウントのメール
    • マシンタイプ
    • 追加テスト
    • ワーカー IP アドレスの構成
    • ネットワーク
    • サブネットワーク
  4. [作成] をクリックします。Dataflow ジョブの実行には数分かかります。

  5. UI に [クエリ結果] パネルが表示されます。後でジョブのクエリ結果に戻るには、[ジョブの履歴]パネルでジョブを探し、Dataflow ジョブと出力の表示 に示されているように、[エディタでクエリを開く] ボタンを使用します。

  6. [ジョブ情報] でジョブ ID のリンクをクリックします。これで、Dataflow ウェブ UI の Dataflow ジョブの詳細ページの新しいブラウザタブが開きます。

Dataflow ジョブと出力の表示

Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。新しいブラウザタブで開いた Dataflow ウェブ UI では、パイプラインをグラフィカルに表示できます。

ボックスをクリックすると、パイプラインで発生した変換の詳細が表示されます。たとえば、[SQL クエリを実行] というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

上の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

ジョブ結果が含まれる出力テーブルを表示するには、Dataflow SQL UI のブラウザタブに戻ります。左側のナビゲーション パネルで、プロジェクトの下に表示されている、作成した dataflow_sql_dataset データセットをクリックします。続いて、出力テーブルの dfsqltable_sales をクリックします。[プレビュー] タブに出力テーブルの内容が表示されます。

以前に実行されたジョブを表示してクエリを編集する

Dataflow SQL UI は、過去のジョブとクエリをジョブ履歴パネルに保存します。ジョブは、ジョブを開始した日付順に表示されます。ジョブリストでは、実行中のジョブがある日付が先に表示され、その後に、実行中のジョブがない日付が続きます。

以前の SQL クエリを編集し、新しい Dataflow ジョブを実行するには、ジョブ履歴リストを使用します。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更する場合、ジョブの履歴パネルを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリを変更して別のジョブとして実行します。

  1. 左側のナビゲーション パネルで、[ジョブの履歴] をクリックします。

  2. [ジョブの履歴] で Cloud Dataflow をクリックします。プロジェクトで実行した以前のジョブがすべて表示されます。

  3. 編集するジョブをクリックします。[クエリエディタで開く] をクリックします。

  4. [クエリエディタ] で SQL クエリを編集し、タンブリング ウィンドウを追加します。次のクエリをコピーする場合は、project-id を実際のプロジェクト ID に置き換えます。

         SELECT
           sr.sales_region,
           TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
           SUM(tr.amount) as amount
         FROM pubsub.topic.`project-id`.transactions AS tr
           INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
           ON tr.state = sr.state_code
         GROUP BY
           sr.sales_region,
           TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
        
  5. クエリエディタで [Cloud Dataflow ジョブを作成] をクリックし、変更したクエリを使用して新しいジョブを作成します。

クリーンアップ

このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Dataflow ジョブを停止します。Cloud Console で Dataflow ウェブ UI に移動します。

    Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. ジョブの [ジョブの概要] パネルで [ジョブを停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] をクリックします。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。Cloud Console の BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. ナビゲーション パネルの [リソース] セクションで、作成した dataflow_sql_dataset データセットをクリックします。

    2. 詳細パネルの右側で [データセットを削除] をクリックします。この操作を行うと、データセット、テーブル、すべてのデータが削除されます。

    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(dataflow_sql_dataset)を入力し、[削除] をクリックして確定します。

  4. Pub/Sub トピックを削除します。Cloud Console の Pub/Sub トピックページに移動します。

    Pub/Sub トピックページに移動

    1. transactions トピックの横にあるチェックボックスにチェックを入れます。

    2. [削除] をクリックして、トピックを永続的に削除します。

    3. Pub/Sub サブスクリプション ページに移動します。

    4. transactions の残っているサブスクリプションの横にあるチェックボックスにチェックを入れます。 実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    5. [削除] をクリックして、サブスクリプションを永続的に削除します。

  5. Cloud Storage で Dataflow ステージング バケットを削除します。Cloud Console で Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

    1. Dataflow ステージング バケットの横にあるチェックボックスをオンにします。

    2. [削除] をクリックして、バケットを永続的に削除します。

次のステップ