このページでは、Dataflow SQL の gcloud
コマンドライン ツールを使用して Dataflow ジョブを作成する方法について説明します。Dataflow ジョブは、Dataflow SQL クエリの結果を BigQuery データセットのテーブルに書き込みます。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
- Dataflow, Compute Engine, Logging, Cloud Storage, Cloud Storage JSON, BigQuery, Pub/Sub, Resource Manager, and Data Catalog API を有効にします。
- Cloud SDK をインストールし、初期化します。
BigQuery データセットを作成する
taxirides
という名前の BigQuery データセットを作成します。
bq mk taxirides
Pub/Sub トピックのクエリを行う
一般公開の Pub/Sub トピック taxirides-realtime
に乗客数を 10 秒ごとにクエリします。
gcloud beta dataflow sql query \
--job-name=dataflow-sql-quickstart \
--region=us-central1 \
--bigquery-dataset=taxirides \
--bigquery-table=passengers_per_minute \
'SELECT
TUMBLE_START("INTERVAL 10 SECOND") as period_start,
SUM(passenger_count) AS pickup_count,
FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
WHERE
ride_status = "pickup"
GROUP BY
TUMBLE(event_timestamp, "INTERVAL 10 SECOND")'
クエリ結果を表示する
Dataflow ジョブが実行されていることを確認します。
Dataflow モニタリング インターフェースに移動します。
ジョブのリストで、[dataflow-sql-quickstart]をクリックします。
[ジョブ情報] パネルで、[ジョブのステータス] 項目が [実行中] に設定されていることを確認します。
ジョブの開始には数分かかることがあります。ジョブが開始されるまで、[ジョブ ステータス] は [キューに格納済み] に設定されています。
[ジョブグラフ] タブで、各ステップが少なくとも 1 秒実行されていることを確認します。
ジョブの開始後、ステップの実行が開始されるまでに数分かかることがあります。
passengers_per_minute
テーブルから最も混雑した時間帯を返します。bq query \ 'SELECT * FROM taxirides.passengers_per_minute ORDER BY pickup_count DESC LIMIT 5'
クリーンアップ
このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
taxirides
データセットを削除します。bq rm
コマンドを実行します。bq rm taxirides
確定するには「
y
」と入力します。
Dataflow ジョブをキャンセルします。
Dataflow モニタリング インターフェースに移動します。
ジョブのリストで、[dataflow-sql-quickstart]をクリックします。
[停止] > [キャンセル] > [ジョブの停止] をクリックします。
次のステップ
- Dataflow SQL の使用について学習する。
- Dataflow SQL によるストリーミング データの結合チュートリアルを読む。
- Dataflow SQL クエリでのデータソースとデータ デスティネーションの使用について読む。
- Dataflow SQL の
gcloud
コマンドライン ツールについて調べる。