Dataflow SQL でストリーミング データを結合する


このチュートリアルでは、Dataflow SQL を使用して Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する方法について説明します。

目標

このチュートリアルの内容は次のとおりです。

  • Pub/Sub ストリーミング データと BigQuery テーブルデータを結合する Dataflow SQL クエリを作成します。
  • Dataflow SQL UI から Dataflow ジョブをデプロイします。

費用

このドキュメントでは、Google Cloud の次の課金対象のコンポーネントを使用します。

  • Dataflow
  • Cloud Storage
  • Pub/Sub
  • Data Catalog

料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを生成できます。 新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。

始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  5. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  6. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  7. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  8. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  9. Make sure that billing is enabled for your Google Cloud project.

  10. Enable the Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager and Data Catalog. APIs.

    Enable the APIs

  11. Create a service account:

    1. In the Google Cloud console, go to the Create service account page.

      Go to Create service account
    2. Select your project.
    3. In the Service account name field, enter a name. The Google Cloud console fills in the Service account ID field based on this name.

      In the Service account description field, enter a description. For example, Service account for quickstart.

    4. Click Create and continue.
    5. Grant the Project > Owner role to the service account.

      To grant the role, find the Select a role list, then select Project > Owner.

    6. Click Continue.
    7. Click Done to finish creating the service account.

      Do not close your browser window. You will use it in the next step.

  12. Create a service account key:

    1. In the Google Cloud console, click the email address for the service account that you created.
    2. Click Keys.
    3. Click Add key, and then click Create new key.
    4. Click Create. A JSON key file is downloaded to your computer.
    5. Click Close.
  13. Set the environment variable GOOGLE_APPLICATION_CREDENTIALS to the path of the JSON file that contains your credentials. This variable applies only to your current shell session, so if you open a new session, set the variable again.

  14. gcloud CLI をインストールして初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに project プロパティを設定する必要があります。
  15. Google Cloud コンソールで Dataflow SQL ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、Dataflow SQL ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
    Dataflow SQL ウェブ UI に移動

サンプルソースを作成する

サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。

  • transactions という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。
  • us_state_salesregions という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。

Pub/Sub トピックにスキーマを割り当てる

スキーマを割り当てると、Pub/Sub トピックデータに SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。

Pub/Sub トピックのサンプル transactions にスキーマを割り当てるには:

  1. テキスト ファイルを作成し、名前を transactions_schema.yaml にします。次のスキーマ テキストをコピーして transactions_schema.yaml に貼り付けます。

      - column: event_timestamp
        description: Pub/Sub event timestamp
        mode: REQUIRED
        type: TIMESTAMP
      - column: tr_time_str
        description: Transaction time string
        mode: NULLABLE
        type: STRING
      - column: first_name
        description: First name
        mode: NULLABLE
        type: STRING
      - column: last_name
        description: Last name
        mode: NULLABLE
        type: STRING
      - column: city
        description: City
        mode: NULLABLE
        type: STRING
      - column: state
        description: State
        mode: NULLABLE
        type: STRING
      - column: product
        description: Product
        mode: NULLABLE
        type: STRING
      - column: amount
        description: Amount of transaction
        mode: NULLABLE
        type: FLOAT
    
  2. Google Cloud CLI を使用してスキーマを割り当てます。

    a. 次のコマンドを使用して、gcloud CLI を更新します。gcloud CLI のバージョンは 242.0.0 以降にする必要があります。

      gcloud components update
    

    b. コマンドライン ウィンドウで次のコマンドを実行します。project-id はプロジェクト ID に置き換え、path-to-filetransactions_schema.yaml ファイルのパスに置き換えます。

      gcloud data-catalog entries update \
        --lookup-entry='pubsub.topic.`project-id`.transactions' \
        --schema-from-file=path-to-file/transactions_schema.yaml
    

    コマンドのパラメータと許可されるスキーマ ファイルの形式については、gcloud data-catalog entries update のドキュメント ページをご覧ください。

    c. transactions Pub/Sub トピックにスキーマが正常に割り当てられたことを確認します。project-id は、実際のプロジェクト ID に置き換えます。

      gcloud data-catalog entries lookup 'pubsub.topic.`project-id`.transactions'
    

Pub/Sub ソースを検索する

Dataflow SQL UI では、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できます。フルネームを覚える必要がありません。

このチュートリアルの例では、Dataflow SQL エディタに移動して、作成した transactions Pub/Sub トピックを検索します。

  1. SQL ワークスペースに移動します。

  2. [Dataflow SQL 編集者] パネルの検索バーで、projectid=project-id transactions を検索します。project-id は、実際のプロジェクト ID に置き換えます。

    Dataflow SQL ワークスペースの Data Catalog 検索パネル。

スキーマを表示する

  1. Dataflow SQL UI の [Dataflow SQL 編集者] パネルで [トランザクション] をクリックするか、「projectid=project-id system=cloud_pubsub」と入力して Pub/Sub トピックを検索し、トピックを選択します。
  2. [スキーマ] の下に、Pub/Sub トピックに割り当てられたスキーマが表示されます。

    フィールド名のリストとその説明を含む、トピックに割り当てられたスキーマ。

SQL クエリを作成する

Dataflow SQL UI で、Dataflow ジョブに実行する SQL クエリを作成します。

次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions)を使用して、sales_region フィールドを Pub/Sub イベント ストリーム(transactions)に新たに追加します。

次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id は実際のプロジェクト ID に置き換えます。

SELECT tr.*, sr.sales_region
FROM pubsub.topic.`project-id`.transactions as tr
  INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
  ON tr.state = sr.state_code

Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。

次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

エディタにチュートリアルのクエリが表示されている Dataflow SQL ワークスペース。

SQL クエリを実行する Dataflow ジョブを作成する

SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。

  1. クエリエディタで、[ジョブを作成] をクリックします。

  2. 表示された [Dataflow ジョブの作成] パネルで、次の操作を行います。

    • [Destination] で、[BigQuery] を選択します。
    • データセット ID に [dataflow_sql_tutorial] を選択します。
    • テーブル名に「sales」と入力します。
    Dataflow SQL ジョブフォームを作成する。
  3. (省略可)Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプション パラメータのメニューを開いて、次のパイプライン オプションを手動で指定することもできます。

    • ワーカーの最大数
    • ゾーン
    • サービス アカウントのメール
    • マシンタイプ
    • 追加テスト
    • ワーカー IP アドレスの構成
    • ネットワーク
    • サブネットワーク
  4. [作成] をクリックします。Dataflow ジョブの実行が開始されるまでに数分かかります。

Dataflow ジョブを表示する

Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。[ジョブを表示] をクリックして Dataflow ウェブ UI を開き、パイプラインのグラフィカル表現を表示します。

Dataflow ウェブ UI に表示された SQL クエリからのパイプライン。

パイプライン内で発生している変換の詳細を確認するには、それぞれのボックスをクリックします。たとえば、「Run SQL Query」というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。

最初の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions で、もう 1 つは BigQuery テーブルの us_state_salesregions です。

2 つの入力の結合が 25 秒で完了している書き込み出力。

ジョブの結果が含まれる出力テーブルを表示するには、BigQuery UI に移動します。[エクスプローラ] パネルのプロジェクトで、作成した dataflow_sql_tutorial データセットをクリックします。出力テーブル sales をクリックします。出力テーブルの内容が [プレビュー] タブに表示されます。

sales のプレビュー テーブルには、tr_time_str、first_name、last_name、city、state、product、amount、sales_region の列があります。

以前に実行されたジョブを表示してクエリを編集する

Dataflow UI では、過去のジョブとクエリが Dataflow の [ジョブ] ページに保存されます。

ジョブ履歴リストを使用して、以前の SQL クエリを確認できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更できます。[ジョブ] ページを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリをコピーして、変更したクエリを使用して別のジョブを実行します。

  1. Dataflow の [ジョブ] ページで、編集するジョブをクリックします。

  2. [ジョブの詳細] ページの [ジョブ情報] パネルにある [パイプライン オプション] で SQL クエリを見つけます。queryString の行を見つけます。

    queryString という名前のジョブ パイプライン オプション。
  3. 次の SQL クエリをコピーして SQL ワークスペースの [Dataflow SQL 編集者] に貼り付け、タンブリング ウィンドウを追加します。project-id は、実際のプロジェクト ID に置き換えます。

     SELECT
       sr.sales_region,
       TUMBLE_START("INTERVAL 15 SECOND") AS period_start,
       SUM(tr.amount) as amount
     FROM pubsub.topic.`project-id`.transactions AS tr
       INNER JOIN bigquery.table.`project-id`.dataflow_sql_tutorial.us_state_salesregions AS sr
       ON tr.state = sr.state_code
     GROUP BY
       sr.sales_region,
       TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
    
  4. 変更したクエリを使用して新しいジョブを作成するには、[ジョブを作成] をクリックします。

クリーンアップ

このチュートリアルで使用したリソースについて、Cloud 請求先アカウントに課金されないようにする手順は次のとおりです。

  1. transactions_injector.py 公開スクリプトが実行されている場合は、それを停止します。

  2. 実行中の Dataflow ジョブを停止します。Google Cloud コンソールで Dataflow ウェブ UI に移動します。

    Dataflow ウェブ UI に移動

    このチュートリアルで作成したジョブごとに、次の操作を行います。

    1. ジョブの名前をクリックします。

    2. [ジョブの詳細] ページで、[停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。

    3. [キャンセル] を選択します。

    4. [ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。

  3. BigQuery データセットを削除します。Google Cloud コンソールで BigQuery ウェブ UI に移動します。

    BigQuery ウェブ UI に移動

    1. [エクスプローラ] パネルの [リソース] セクションで、作成した dataflow_sql_tutorial データセットをクリックします。

    2. 詳細パネルで [削除] をクリックします。確認ダイアログが表示されます。

    3. [データセットの削除] ダイアログ ボックスで「delete」と入力して削除コマンドを確認し、[削除] をクリックします。

  4. Pub/Sub トピックを削除します。Google Cloud コンソールで Pub/Sub の [トピック] ページに移動します。

    Pub/Sub の [トピック] ページに移動

    1. transactions」トピックを選択します。

    2. [削除] をクリックして、トピックを完全に削除します。確認ダイアログが表示されます。

    3. [トピックの削除] ダイアログ ボックスで「delete」を入力して削除コマンドを確認し、[削除] をクリックします。

    4. Pub/Sub サブスクリプション ページに移動します。

    5. transactions の残りのサブスクリプションを選択します。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。

    6. [削除] をクリックして、サブスクリプションを完全に削除します。確認ダイアログで [削除] をクリックします。

  5. Cloud Storage で Dataflow ステージング バケットを削除します。Google Cloud コンソールで Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

    1. Dataflow ステージング バケットを選択します。

    2. [削除] をクリックして、バケットを完全に削除します。確認ダイアログが表示されます。

    3. [バケットの削除] ダイアログ ボックスで「DELETE」と入力して削除コマンドを確認し、[削除] をクリックします。

次のステップ