このページでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、このページでは例として Pub/Sub Topic to BigQuery テンプレートを使用します。
始める前に
- Google アカウントにログインします。
Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。
-
Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。
-
Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する。
- Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、 Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API を有効にします。
- Cloud Storage バケットを作成する:
- Cloud Console の [Cloud Storage ブラウザ] ページに移動します。
- [バケットを作成] をクリックします。
- [バケットの作成] ダイアログ内で、以下の属性を指定します。
- 名前: 一意のバケット名。バケットの名前空間はグローバルであり、一般公開されるため、バケット名に機密情報を含めないでください。
- デフォルトのストレージ クラス: Standard
- バケットデータが保存されるロケーション
- [作成] をクリックします。
トピックの作成
- ウェブ UI の Pub/Sub トピックページに移動します。
[Pub/Sub トピック] ページに移動 - add [トピックを作成] をクリックします。
- [トピック ID] フィールドに、一意のトピック名を入力します(例:
taxirides-realtime
)。 - [保存] をクリックします。
BigQuery データセットとテーブルを作成する
自身の Cloud Pub/Sub トピックに適したスキーマを使用して、Google Cloud Shell または GCP Console で BigQuery データセットとテーブルを作成します。
この例で、データセットの名前は taxirides
、テーブルの名前は realtime
です。
Cloud Shell の使用
Cloud Shell を使用してデータセットとテーブルを作成します。
- 次のコマンドを実行して、データセットを作成します。
bq mk taxirides
出力は次のようになります。Dataset “myprojectid:taxirides” successfully created
- 次のコマンドを実行して、テーブルを作成します。
bq mk \ --time_partitioning_field timestamp \ --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\ timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\ passenger_count:integer -t taxirides.realtime
出力は次のようになります。Table “myprojectid:taxirides.realtime” successfully created
テーブルは、クエリ費用を削減してパフォーマンスを向上させるために分割されます。
Google Cloud Platform Console の使用
Google Cloud Console を使用して、データセットとテーブルを作成します。
- BigQuery ウェブ UI に移動します。
BigQuery ウェブ UI に移動 - ナビゲーションでプロジェクト名の横にある下矢印アイコンをクリックし、[Create new dataset] をクリックします。データセット ID として「
taxirides
」を入力します。データセット ID は、プロジェクトごとに一意です。疑問符アイコンをクリックすると、ID の制限について確認できます。
- その他のデフォルト設定はそのままにして、[OK] をクリックします。
- ナビゲーションで、作成したデータセット ID の上にポインタを置きます。ID の横にある下矢印アイコンをクリックし、[Create new table] をクリックします。
- [Source Data] の横にある [Create empty table] オプションを選択します。
- [Destination Table] で
taxirides
を選択し、「realtime
」と入力します。 - [Schema] で [Edit as Text] を選択し、次のように入力します。
ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp, meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
- [Options] の [Partitioning Type] フィールドで [Day] オプションを選択します。
- [Options] の [Partitioning Field] セレクタで [timestamp] 列を選択します。
- [テーブルを作成] ボタンをクリックします。

パイプラインの実行
Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。
- Dataflow ウェブ UI に移動に移動します。
Cloud Dataflow ウェブ UI に移動 - [テンプレートからジョブを作成] をクリックします。
- Dataflow ジョブの [ジョブ名] を入力します。
- [Dataflow テンプレート] で、[Cloud Pub/Sub Topic to BigQuery] テンプレートを選択します。
- [Pub/Sub 入力トピック] に、「
projects/pubsub-public-data/topics/taxirides-realtime
」を入力します。パイプラインは入力トピックから受信データを取得します。 - [BigQuery 出力テーブル] に「
myprojectid:taxirides.realtime
」を入力します。 - [一時的なロケーション] に「
gs://mybucket/temp/
」を入力します。これは、ステージング済みパイプライン ジョブと同様に、一時ファイルを保存するためのサブフォルダです。 - [ジョブを実行] ボタンをクリックします。
- BigQuery に書き込まれたデータを表示します。BigQuery ウェブ UI に移動します。
BigQuery ウェブ UI に移動
クエリは、標準 SQL を使用して送信できます。たとえば次のクエリでは、過去 24 時間以内に追加されたすべての行が選択されます。SELECT * FROM `myprojectid.taxirides.realtime` WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY) LIMIT 1000

クリーンアップ
このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。
- Dataflow ウェブ UI に移動に移動します。
Cloud Dataflow ウェブ UI に移動 - BigQuery ウェブ UI に移動します。
BigQuery ウェブ UI に移動- ナビゲーションで、作成した taxirides データセットの上にカーソルを合わせます。
- ナビゲーションのデータセット名の横にある下矢印アイコンをクリックし、[データセットの削除] をクリックします。
- [データセットの削除] ダイアログ ボックスでデータセットの名前(taxirides)を入力し、[OK] をクリックして確定します。