テンプレートを使用したクイックスタート
このドキュメントでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、例として Pub/Sub Topic to BigQuery テンプレートを使用します。
Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。
このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。
以降のセクションでは、[ガイドを表示] をクリックした場合と同じ手順について説明します。
始める前に
- Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。
- Cloud Storage バケットを作成します。
- Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。
- [バケットを作成] をクリックします。
- [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
- [作成] をクリックします。
- 次のものをコピーします。これらは以後のセクションで使用されます。
- Cloud Storage バケット名。
- Google Cloud プロジェクト ID。
ID を調べる方法については、プロジェクトの識別をご覧ください。
BigQuery データセットとテーブルを作成する
Google Cloud コンソールで Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。
この例で、データセットの名前は taxirides
、テーブルの名前は realtime
です。このデータセットとテーブルを作成するには、以下の手順を行います。
- Google Cloud コンソールで [BigQuery] ページに移動します。
[BigQuery] に移動 - [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [ アクションを表示] をクリックしてから、[データセットを作成] をクリックします。
- [データセットを作成] パネルで、次の操作を行います。
- [データセット ID] に「
taxirides
」と入力します。 - [データのロケーション] で [米国(US)] を選択します。一般公開データセットは
US
マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。 - その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
- [
エクスプローラ ] パネルで、プロジェクトを展開します。 taxirides
データセットの隣にある [ アクションを表示] をクリックし、[開く] をクリックします。- 詳細パネルで [ テーブルを作成] をクリックします。
- [テーブルの作成] パネルで、次の操作を行います。
- [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
- [送信先] セクションの [テーブル名] に「
realtime
」と入力します。 - [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- [パーティショニングとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
- その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。
パイプラインを実行する
Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。
- Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動 - [
テンプレートからジョブを作成 ] をクリックします。 - Dataflow ジョブの [ジョブ名] として「
taxi-data
」と入力します。 - [Dataflow テンプレート] で、[Pub/Sub Topic to BigQuery] テンプレートを選択します。
- [Input Pub/Sub topic] に、次のように入力します。
projects/pubsub-public-data/topics/taxirides-realtime
一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。
{ "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e", "point_idx": 217, "latitude": 40.75399, "longitude": -73.96302, "timestamp": "2021-03-08T02:29:09.66644-05:00", "meter_reading": 6.293821, "meter_increment": 0.029003782, "ride_status": "enroute", "passenger_count": 1 }
- [BigQuery 出力テーブル] に、次のテキストを入力します。
PROJECT_ID:taxirides.realtime
PROJECT_ID
は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。 - [一時的なロケーション] に、次のように入力します。
gs://BUCKET_NAME/temp/
BUCKET_NAME
を Cloud Storage バケットの名前に置き換えます。temp
フォルダには、ステージングされたパイプライン ジョブなどの一時ファイルが格納されます。 - [ジョブを実行] をクリックします。
結果を表示する
realtime
テーブルに書き込まれたデータを表示する方法は次のとおりです。
Google Cloud コンソールで [BigQuery] ページに移動します。
次のクエリをクエリエディタに貼り付けます。
SELECT * FROM `PROJECT_ID.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000
PROJECT_ID
は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。[実行] をクリックします。
クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。
クリーンアップ
このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
プロジェクトの削除
課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。- Google Cloud コンソールで、[リソースの管理] ページに移動します。
- プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
- ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。
個々のリソースの削除
このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。
- Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
[ジョブ] に移動 - ジョブリストからストリーミング ジョブを選択します。
- ナビゲーションで、[停止] をクリックします。
- [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
- Google Cloud コンソールで [BigQuery] ページに移動します。
[BigQuery] に移動 - [エクスプローラ] パネルで、プロジェクトを展開します。
- 削除するデータセットの横にある [ アクションを表示] をクリックし、[開く] をクリックします。
- 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
- Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。
- 削除するバケットのチェックボックスをクリックします。
- バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。