Cloud Dataflow SQL の使用

このチュートリアルでは、SQL と Cloud Dataflow SQL UI を使用して Cloud Dataflow ジョブを実行する方法について説明します。サンプルを使いながら、Cloud Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する手順を解説します。

目標

このチュートリアルの内容は次のとおりです。

  • SQL を使用して、Cloud Pub/Sub ストリーミング データを BigQuery テーブルデータと結合します。
  • Cloud Dataflow SQL UI から Cloud Dataflow ジョブをデプロイします。

料金

このチュートリアルでは、Google Cloud Platform の課金対象となる以下のコンポーネントを使用します。

  • Cloud Dataflow
  • Cloud Storage
  • Cloud Pub/Sub

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。 初めて GCP を使用される場合は、無料トライアルをご利用いただけます。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP Console のプロジェクト セレクタのページで、GCP プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager 必要な API を有効にします。

    API を有効にする

  5. 認証情報の設定:
    1. GCP Console で [サービス アカウントキーの作成] ページに移動します。

      [サービス アカウントキーの作成] ページに移動
    2. [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
    3. [サービス アカウント名] フィールドに名前を入力します。
    4. [役割] リストで、[プロジェクト] > [オーナー] を選択します。

      : [役割] フィールドの設定により、リソースにアクセスするサービス アカウントが承認されます。このフィールドは、後から GCP Console で表示または変更できます。本番環境アプリケーションを開発している場合は、[プロジェクト] > [オーナー] よりも詳細な権限を指定します。詳しくはサービス アカウントへの役割の付与をご覧ください。
    5. [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
  6. 環境変数 GOOGLE_APPLICATION_CREDENTIALS をサービス アカウント キーが含まれる JSON ファイルのファイルパスに設定します。この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定してください。

  7. Cloud SDK をインストールし、初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  8. GCP Console の BigQuery ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、BigQuery ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    BigQuery ウェブ UI に移動

Cloud Dataflow SQL UI に切り替える

BigQuery ウェブ UI で、次の手順に沿って Cloud Dataflow UI に切り替えます。

  1. [展開] プルダウン メニューを開いて、[クエリの設定] を選択します。

  2. 右側に表示された [クエリの設定] メニューで、[Cloud Dataflow エンジン] を選択します。

  3. プロジェクトで Cloud Dataflow と Data Catalog API が有効になっていない場合、これらを有効にするように求められます。[API を有効にする] をクリックします。Cloud Dataflow API と Data Catalog API の有効化には数分かかる場合があります。

  4. API が有効になったら [保存] をクリックします。

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Cloud Pub/Sub トピック - Cloud Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Cloud Pub/Sub トピックを作成した後、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Cloud Pub/Sub トピックにスキーマを割り当てる

スキーマを割り当てると、Cloud Pub/Sub トピックデータに SQL クエリを実行できます。現在、Cloud Dataflow SQL では、Cloud Pub/Sub トピック内のメッセージが JSON 形式でシリアル化されていることを前提としています。今後、他の形式(Avro など)のサポートが追加される予定です。

Cloud Pub/Sub トピックのサンプル transactions にスキーマを割り当てるには:

  1. テキスト ファイルを作成し、名前を transactions_schema.yaml にします。次のスキーマ テキストをコピーして transactions_schema.yaml に貼り付けます。
  - column: event_timestamp
    description: Pub/Sub event timestamp
    mode: REQUIRED
    type: TIMESTAMP
  - column: attributes
    description: Pub/Sub message attributes
    mode: NULLABLE
    type: MAP<STRING,STRING>
  - column: payload
    description: Pub/Sub message payload
    mode: NULLABLE
    type: STRUCT
    subcolumns:
    - column: tr_time_str
      description: Transaction time string
      mode: NULLABLE
      type: STRING
    - column: first_name
      description: First name
      mode: NULLABLE
      type: STRING
    - column: last_name
      description: Last name
      mode: NULLABLE
      type: STRING
    - column: city
      description: City
      mode: NULLABLE
      type: STRING
    - column: state
      description: State
      mode: NULLABLE
      type: STRING
    - column: product
      description: Product
      mode: NULLABLE
      type: STRING
    - column: amount
      description: Amount of transaction
      mode: NULLABLE
      type: FLOAT
  1. gcloud コマンドライン ツールを使用してスキーマを割り当てます。

    a. 次のコマンドgcloud ツールを更新します。gcloud ツールのバージョンは 242.0.0 以降にする必要があります。

      gcloud components update
    

    b. コマンドライン ウィンドウで次のコマンドを実行します。プロジェクト ID を実際のプロジェクト ID に、path-to-filetransactions_schema.yaml ファイルのパスに置き換えます。

      gcloud beta data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    コマンドのパラメータと使用可能なスキーマ ファイルのフォーマットについては、gcloud beta data-catalog entries update のドキュメント ページをご覧ください。

    c. transactions Cloud Pub/Sub トピックにスキーマが正常に割り当てられたことを確認します。project-id を、実際のプロジェクト ID に置き換えます。

      gcloud beta data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Cloud Pub/Sub ソースを検索する

Cloud Dataflow SQL UI では、アクセス権があるプロジェクトの Cloud Pub/Sub データソース オブジェクトを検索できます。オブジェクトの完全な名前を覚えておく必要はありません。

このチュートリアルのサンプルでは、作成した transactions Cloud Pub/Sub トピックを追加します。

  1. 左側のナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。

  2. 右側に表示される [Cloud Dataflow ソースを追加] パネルで、[Cloud Pub/Sub トピック] を選択します。検索ボックスで、「transactions」 を検索します。トピックを選択して [追加] をクリックします。

スキーマを表示する

  1. Cloud Dataflow SQL UI の左側のナビゲーション パネルで、[Cloud Dataflow のソース] をクリックします。
  2. [Cloud Pub/Sub トピック] をクリックします。
  3. [トランザクション] をクリックします。
  4. [スキーマ] に、transactions Cloud Pub/Sub トピックに割り当てられたスキーマが表示されます。

SQL クエリを作成する

Cloud Dataflow SQL UI で、Cloud Dataflow ジョブに実行する SQL クエリを作成します。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions)を使用して、sales_region フィールドを Cloud Pub/Sub イベント ストリーム(transactions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id を実際のプロジェクト ID に置き換えます。

SELECT tr.payload.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
  ON tr.payload.state = sr.state_code

Cloud Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

エディタにクエリを入力します。

SQL クエリを実行する Cloud Dataflow ジョブを作成する

SQL クエリを実行するには、Cloud Dataflow SQL UI から Cloud Dataflow ジョブを作成します。

  1. クエリエディタの下で、[Cloud Dataflow ジョブを作成] をクリックします。

  2. 右側に表示された [Cloud Dataflow ジョブを作成] パネルで、デフォルトのテーブル名を dfsqltable_sales に変更します。

  3. [作成] をクリックします。数分後に Cloud Dataflow ジョブが開始します。

  4. UI に [クエリ結果] パネルが表示されます。後でジョブの [クエリ結果] パネルに戻るには、[ジョブの履歴] パネルでジョブを検索し、[クエリをエディタで開く] ボタンを使用します。詳しくは、Cloud Dataflow ジョブと出力を表示するをご覧ください。

  5. [ジョブ情報] でジョブ ID のリンクをクリックします。新しいブラウザタブが開き、Cloud Dataflow ウェブ UI の Cloud Dataflow ジョブの詳細ページが表示されます。

Cloud Dataflow ジョブと出力を表示する

Cloud Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。新しいブラウザタブに表示された Cloud Dataflow ウェブ UI で、パイプラインの詳細を確認できます。

ボックスをクリックすると、パイプラインで発生した変換の詳細が表示されます。たとえば、[SQL クエリを実行] というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

上の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Cloud Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

ジョブの結果を含む出力テーブルを表示するには、Cloud Dataflow SQL UI のブラウザタブに戻ります。左側のナビゲーション パネルで、プロジェクトの下に表示されている、作成した dataflow_sql_dataset データセットをクリックします。続いて、出力テーブルの dfsqltable_sales をクリックします。[プレビュー] タブに出力テーブルの内容が表示されます。

以前に実行されたジョブを表示してクエリを編集する

Cloud Dataflow SQL UI では、以前に実行されたジョブとクエリを [ジョブの履歴] パネルで確認できます。ジョブは、ジョブを開始した日付順に表示されます。ジョブリストでは、実行中のジョブがある日付が先に表示され、その後に、実行中のジョブがない日付が続きます。

ジョブ履歴のリストを使用すると、以前の SQL クエリを編集して新しい Cloud Dataflow ジョブを実行できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更する場合、ジョブの履歴パネルを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリを変更して別のジョブとして実行します。

  1. 左側のナビゲーション パネルで、[ジョブの履歴] をクリックします。

  2. [ジョブの履歴] で Cloud Dataflow をクリックします。プロジェクトで実行した以前のジョブがすべて表示されます。

  3. 編集するジョブをクリックします。[クエリエディタで開く] をクリックします。

  4. [クエリエディタ] で SQL クエリを編集し、タンブリング ウィンドウを追加します。次のクエリをコピーする場合は、project-id を実際のプロジェクト ID に置き換えます。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.payload.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr
       ON tr.payload.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  5. クエリエディタで [Cloud Dataflow ジョブを作成] をクリックし、変更したクエリを使用して新しいジョブを作成します。

クリーンアップ

このチュートリアルで使用するリソースについて、Google Cloud Platform アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Cloud Dataflow ジョブを停止します。GCP Console で Cloud Dataflow ウェブ UI に移動します。

    Cloud Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. ジョブの [ジョブの概要] パネルで [ジョブを停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] をクリックします。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。GCP Console の BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. ナビゲーション パネルの [リソース] セクションで、作成した dataflow_sql_dataset データセットをクリックします。

    2. 詳細パネルの右側で [データセットを削除] をクリックします。この操作を行うと、データセット、テーブル、すべてのデータが削除されます。

    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(dataflow_sql_dataset)を入力して、[削除] をクリックします。

  4. Cloud Pub/Sub トピックを削除します。GCP Console の Cloud Pub/Sub トピックページに移動します。

    Cloud Pub/Sub トピックページに移動

    1. transactions トピックの横にあるチェックボックスにチェックを入れます。

    2. [削除] をクリックして、トピックを永続的に削除します。

    3. Cloud Pub/Sub のサブスクリプション ページに移動します。

    4. transactions の残っているサブスクリプションの横にあるチェックボックスにチェックを入れます。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    5. [削除] をクリックして、サブスクリプションを永続的に削除します。

  5. Cloud Storage で Cloud Dataflow ステージング バケットを削除します。GCP Console の Cloud Storage ブラウザに移動します。

    Cloud Storage ブラウザに移動

    1. Cloud Dataflow ステージング バケットの隣にあるチェックボックスをオンにします。

    2. [削除] をクリックして、バケットを永続的に削除します。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。