このチュートリアルでは、Dataflow SQL を使用して Pub/Sub のデータ ストリームを BigQuery テーブルのデータと結合する方法について説明します。
目標
このチュートリアルの内容は次のとおりです。
- Pub/Sub ストリーミング データと BigQuery テーブルデータを結合する Dataflow SQL クエリを作成します。
- Dataflow SQL UI から Dataflow ジョブをデプロイします。
費用
このチュートリアルでは、Google Cloud の課金対象となる以下のコンポーネントを使用します。
- Dataflow
- Cloud Storage
- Pub/Sub
料金計算ツールを使うと、予想使用量に基づいて費用の見積もりを出すことができます。新しい Google Cloud ユーザーは無料トライアルをご利用いただける場合があります。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
- Cloud Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager API を有効にします。
- 認証の設定:
-
Cloud Console で、[サービス アカウント キーの作成] ページに移動します。
[サービス アカウント キーの作成] ページに移動 - [サービス アカウント] リストから [新しいサービス アカウント] を選択します。
- [サービス アカウント名] フィールドに名前を入力します。
[ロール] リストから、プロジェクト > オーナー
- [作成] をクリックします。キーが含まれている JSON ファイルがパソコンにダウンロードされます。
-
-
環境変数
GOOGLE_APPLICATION_CREDENTIALS
を、サービス アカウント キーが含まれる JSON ファイルのパスに設定します。 この変数は現在のシェル セッションにのみ適用されるため、新しいセッションを開く場合は、変数を再度設定します。 - Cloud SDK をインストールし、初期化します。インストール オプションを、次の中から選択します。場合によっては、このチュートリアルで使用するプロジェクトに
project
プロパティを設定する必要があります。 - Cloud Console の BigQuery ウェブ UI に移動します。最後にアクセスしたプロジェクトが開きます。別のプロジェクトに切り替えるには、BigQuery ウェブ UI の先頭に表示されているプロジェクト名をクリックし、使用するプロジェクトを検索します。
BigQuery ウェブ UI に移動
Dataflow SQL UI に切り替える
BigQuery ウェブ UI で、次の手順に沿って Dataflow UI に切り替えます。
[展開] プルダウン メニューを開いて、[クエリの設定] を選択します。
右側に表示された [クエリの設定] メニューで、[Dataflow エンジン] を選択します。
プロジェクトで Dataflow と Data Catalog API が有効になっていない場合、これらを有効にするように求められます。[API を有効にする] をクリックします。Dataflow API と Data Catalog API の有効化には数分かかる場合があります。
API が有効になったら [保存] をクリックします。
サンプルソースを作成する
サンプルで使用するソースを作成します。このチュートリアルのサンプルでは、次のソースを使用します。
transactions
という名前の Pub/Sub トピック - Pub/Sub トピックのサブスクリプションを介して送信されるトランザクション データのストリーム。各トランザクションのデータには、購入した商品、販売価格、購入場所などの情報が含まれています。Pub/Sub トピックを作成したら、トピックにメッセージを公開するスクリプトを作成します。このチュートリアルの後半で、このスクリプトを実行します。us_state_salesregions
という名前の BigQuery テーブル - 州と販売地域をマッピングするテーブル。このテーブルを作成する前に、BigQuery データセットを作成する必要があります。
Pub/Sub ソースを検索する
Dataflow SQL UI では、アクセス権を持つプロジェクトの Pub/Sub データソース オブジェクトを検索できます。フルネームを覚える必要がありません。
このチュートリアルの例では、作成した transactions
Pub/Sub トピックを追加します。
左側のナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。
右側に表示される [Cloud Dataflow ソースを追加] パネルで、[Pub/Sub トピック] を選択します。検索ボックスで、「
transactions
」を検索します。トピックを選択して [追加] をクリックします。
Pub/Sub トピックにスキーマを割り当てる
スキーマを割り当てると、Pub/Sub トピックデータに SQL クエリを実行できます。現在、Dataflow SQL では、Pub/Sub トピックのメッセージは JSON 形式でシリアル化されます。今後、他の形式(Avro など)のサポートが追加される予定です。
サンプルの Pub/Sub トピックを Dataflow ソースとして追加した後、次の手順を実行して Dataflow SQL UI のトピックにスキーマを割り当てます。
[リソース] パネルでトピックを選択します。
[スキーマ] タブで [スキーマを編集] をクリックします。右側に [スキーマ] サイドパネルが開きます。
[テキストとして編集] ボタンを切り替え、次のインライン スキーマをエディタに貼り付けます。[送信] をクリックします。
[ { "description": "Pub/Sub event timestamp", "name": "event_timestamp", "mode": "REQUIRED", "type": "TIMESTAMP" }, { "description": "Transaction time string", "name": "tr_time_str", "type": "STRING" }, { "description": "First name", "name": "first_name", "type": "STRING" }, { "description": "Last name", "name": "last_name", "type": "STRING" }, { "description": "City", "name": "city", "type": "STRING" }, { "description": "State", "name": "state", "type": "STRING" }, { "description": "Product", "name": "product", "type": "STRING" }, { "description": "Amount of transaction", "name": "amount", "type": "FLOAT64" } ]
(省略可)[トピックをプレビュー] をクリックしてメッセージの内容を確認し、定義したスキーマと一致していることを確認します。
スキーマを表示する
- Dataflow SQL UI の左側のナビゲーション パネルで、[Cloud Dataflow のソース] をクリックします。
- [Pub/Sub トピック] をクリックします。
- [トランザクション] をクリックします。
- [スキーマ] の下に、
transactions
Pub/Sub トピックに割り当てられたスキーマが表示されます。

SQL クエリを作成する
Dataflow SQL UI で、Dataflow ジョブに実行する SQL クエリを作成します。
次の SQL クエリはデータ拡充クエリです。州と販売地域をマッピングする BigQuery テーブル(us_state_salesregions
)を使用して、sales_region
フィールドを Pub/Sub イベント ストリーム(transactions
)に新たに追加します。
次の SQL クエリをコピーして、クエリエディタに貼り付けます。project-id は実際のプロジェクト ID に置き換えます。
SELECT tr.*, sr.sales_region FROM pubsub.topic.`project-id`.transactions as tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr ON tr.state = sr.state_code
Dataflow SQL UI でクエリを入力すると、クエリ検証ツールによりクエリ構文が検証されます。クエリが有効な場合、緑色のチェックマーク アイコンが表示されます。クエリが無効な場合は、赤色の感嘆符アイコンが表示されます。クエリ構文が無効な場合は、検証ツールアイコンをクリックすると、修正が必要な箇所に関する情報が表示されます。
次のスクリーンショットでは、クエリエディタに有効なクエリが入力されています。緑色のチェックマークが表示されています。

SQL クエリを実行する Dataflow ジョブを作成する
SQL クエリを実行するには、Dataflow SQL UI から Dataflow ジョブを作成します。
クエリエディタで [Dataflow ジョブを作成] をクリックします。
右側に表示された [Dataflow ジョブを作成] パネルで、デフォルトのテーブル名を
dfsqltable_sales
に変更します。(省略可)Dataflow では、Dataflow SQL ジョブに最適な設定が自動的に選択されますが、オプション パラメータのメニューを展開して、次のパイプライン オプションを手動で指定することもできます。
- ワーカーの最大数
- ゾーン
- サービス アカウントのメール
- マシンタイプ
- 追加テスト
- ワーカー IP アドレスの構成
- ネットワーク
- サブネットワーク
[作成] をクリックします。数分後に Dataflow ジョブが開始します。
UI に [クエリ結果] パネルが表示されます。後でジョブの [クエリ結果] パネルに戻るには、[ジョブの履歴] パネルでジョブを検索し、[クエリをエディタで開く] ボタンを使用します。詳しくは、Dataflow ジョブと出力を表示するをご覧ください。
[ジョブ情報] でジョブ ID のリンクをクリックします。これで、Dataflow ウェブ UI で新しいブラウザタブが開き、Dataflow ジョブの詳細ページが表示されます。
Dataflow ジョブと出力を表示する
Dataflow は、SQL クエリを Apache Beam パイプラインに変換します。新しいブラウザタブに表示された Dataflow ウェブ UI で、パイプラインの詳細を確認できます。

ボックスをクリックすると、パイプラインで発生した変換の詳細が表示されます。たとえば、Run SQL Query というラベルのボックスをクリックすると、バックグラウンドで実行されたオペレーションが表示されます。
上の 2 つのボックスは、結合した 2 つの入力を表しています。1 つは Pub/Sub トピックの transactions
で、もう 1 つは BigQuery テーブルの us_state_salesregions
です。

ジョブ結果が含まれる出力テーブルを表示するには、Dataflow SQL UI のブラウザタブに戻ります。左側のナビゲーション パネルで、プロジェクトの下に表示されている、作成した dataflow_sql_dataset
データセットをクリックします。続いて、出力テーブルの dfsqltable_sales
をクリックします。[プレビュー] タブに出力テーブルの内容が表示されます。

以前に実行されたジョブを表示してクエリを編集する
Dataflow SQL UI では、以前に実行されたジョブとクエリを [ジョブの履歴] パネルで確認できます。ジョブは、ジョブを開始した日付順に表示されます。ジョブリストでは、実行中のジョブがある日付が先に表示され、その後に、実行中のジョブがない日付が続きます。
ジョブ履歴リストを使用して以前の SQL クエリを編集し、新しい Dataflow ジョブを実行できます。たとえば、販売地域別の販売数を 15 秒ごとに集計するようにクエリを変更できます。ジョブの履歴パネルを使用して、このチュートリアルで開始した実行中のジョブにアクセスし、SQL クエリを変更して別のジョブとして実行します。
左側のナビゲーション パネルで、[ジョブの履歴] をクリックします。
[ジョブの履歴] で Cloud Dataflow をクリックします。プロジェクトで実行した以前のジョブがすべて表示されます。
編集するジョブをクリックします。[クエリエディタで開く] をクリックします。
[クエリエディタ] で SQL クエリを編集し、タンブリング ウィンドウを追加します。次のクエリをコピーする場合は、project-id を実際のプロジェクト ID に置き換えます。
SELECT sr.sales_region, TUMBLE_START("INTERVAL 15 SECOND") AS period_start, SUM(tr.amount) as amount FROM pubsub.topic.`project-id`.transactions AS tr INNER JOIN bigquery.table.`project-id`.dataflow_sql_dataset.us_state_salesregions AS sr ON tr.state = sr.state_code GROUP BY sr.sales_region, TUMBLE(tr.event_timestamp, "INTERVAL 15 SECOND")
クエリエディタで [Cloud Dataflow ジョブを作成] をクリックし、変更したクエリを使用して新しいジョブを作成します。
クリーンアップ
このチュートリアルで使用したリソースについて、Google Cloud アカウントに課金されないようにする手順は次のとおりです。
transactions_injector.py
公開スクリプトが実行されている場合は、それを停止します。実行中の Dataflow ジョブを停止します。Cloud Console で Dataflow ウェブ UI に移動します。
このチュートリアルで作成したジョブごとに、次の操作を行います。
ジョブの名前をクリックします。
ジョブの [ジョブの概要] パネルで [ジョブを停止] をクリックします。[ジョブの停止] ダイアログが開き、ジョブの停止方法に関する選択肢が表示されます。
[キャンセル] をクリックします。
[ジョブを停止] をクリックします。サービスはすべてのデータの取り込みと処理をできる限り早く停止します。[キャンセル] は処理を即時に停止するため、処理中のデータが失われる可能性があります。ジョブの停止には数分かかることがあります。
BigQuery データセットを削除します。Cloud Console の BigQuery ウェブ UI に移動します。
ナビゲーション パネルの [リソース] セクションで、作成した dataflow_sql_dataset データセットをクリックします。
詳細パネルの右側で [データセットを削除] をクリックします。この操作を行うと、データセット、テーブル、すべてのデータが削除されます。
[データセットの削除] ダイアログ ボックスでデータセットの名前(
dataflow_sql_dataset
)を入力して、[削除] をクリックします。
Pub/Sub トピックを削除します。Cloud Console の Pub/Sub トピックページに移動します。
transactions
トピックの横にあるチェックボックスにチェックを入れます。[削除] をクリックして、トピックを永続的に削除します。
Pub/Sub サブスクリプション ページに移動します。
transactions
の残っているサブスクリプションの横にあるチェックボックスにチェックを入れます。実行中のジョブがない場合、サブスクリプションが表示されない可能性があります。[削除] をクリックして、サブスクリプションを永続的に削除します。
Cloud Storage で Dataflow ステージング バケットを削除します。Cloud Console の Cloud Storage ブラウザに移動します。
Dataflow ステージング バケットの隣にあるチェックボックスをオンにします。
[削除] をクリックして、バケットを永続的に削除します。
次のステップ
- Dataflow SQL の概要を確認する。
- ストリーミング パイプラインの基本について学ぶ。
- データのソースと宛先の使用について読む。
- Dataflow SQL リファレンスを確認する。
- Cloud Next 2019 で行われたストリーミング分析のデモを見る。