テンプレートを使用したクイックスタート

このページでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、このページでは例として Pub/Sub Topic to BigQuery テンプレートを使用します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. Cloud Console のプロジェクト セレクタページで、Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、 Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API を有効にします。

    API を有効にする

  5. Cloud Storage バケットを作成する:
    1. Cloud Console で、[Cloud Storage ブラウザ] ページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ダイアログ内で、以下の属性を指定します。
      • 名前: 一意のバケット名。バケットの名前空間はグローバルであり、一般公開されるため、バケット名に機密情報を含めないでください。
      • デフォルトのストレージ クラス: Standard
      • バケットデータが保存されるロケーション
    4. [作成] をクリックします。

BigQuery データセットとテーブルを作成する

自身の Cloud Pub/Sub トピックに適したスキーマを使用して、Google Cloud Shell または GCP Console で BigQuery データセットテーブルを作成します。

この例で、データセットの名前は taxiridesテーブルの名前は realtime です。

Cloud Shell の使用

Cloud Shell を使用してデータセットとテーブルを作成します。

  1. 次のコマンドを実行して、データセットを作成します。
    bq mk taxirides
    出力は次のようになります。
    Dataset “myprojectid:taxirides” successfully created
  2. 次のコマンドを実行して、テーブルを作成します。
    bq mk \
    --time_partitioning_field timestamp \
    --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
    timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
    passenger_count:integer -t taxirides.realtime
    出力は次のようになります。
    Table “myprojectid:taxirides.realtime” successfully created

    テーブルは、クエリ費用を削減してパフォーマンスを向上させるために分割されます。

Google Cloud Platform Console の使用

Google Cloud Console を使用して、データセットとテーブルを作成します。

  1. BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
  2. ナビゲーションでプロジェクト名の横にある下矢印アイコンをクリックし、[Create new dataset] をクリックします。データセット ID として「taxirides」を入力します。

    BigQuery UI のデータセット作成ボタン。

    データセット ID は、プロジェクトごとに一意です。疑問符アイコンをクリックすると、ID の制限について確認できます。

  3. その他のデフォルト設定はそのままにして、[OK] をクリックします。
  4. ナビゲーションで、作成したデータセット ID の上にポインタを置きます。ID の横にある下矢印アイコンをクリックし、[Create new table] をクリックします。
  5. [Source Data] の横にある [Create empty table] オプションを選択します。
  6. [Destination Table] で taxirides を選択し、「realtime」と入力します。
  7. [Schema] で [Edit as Text] を選択し、次のように入力します。
    ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
    meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
  8. [Options] の [Partitioning Type] フィールドで [Day] オプションを選択します。
  9. [Options] の [Partitioning Field] セレクタで [timestamp] 列を選択します。
  10. [テーブルを作成] ボタンをクリックします。
  11. BigQuery の設定

パイプラインの実行

Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。

  1. Dataflow ウェブ UI に移動に移動します。
    Cloud Dataflow ウェブ UI に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] を入力します。
  4. [Dataflow テンプレート] で、[Cloud Pub/Sub Topic to BigQuery] テンプレートを選択します。
  5. [Pub/Sub 入力トピック] に、「projects/pubsub-public-data/topics/taxirides-realtime」を入力します。パイプラインは入力トピックから受信データを取得します。
  6. [BigQuery 出力テーブル] に「<myprojectid>:taxirides.realtime」を入力します。
  7. [一時的なロケーション] に「gs://<mybucket>/tmp/」を入力します。これは、ステージング済みパイプライン ジョブと同様に、一時ファイルを保存するためのサブフォルダです。
  8. [ジョブを実行] ボタンをクリックします。
  9. Cloud Dataflow でのジョブの作成
  10. BigQuery に書き込まれたデータを表示します。BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
    クエリは、標準 SQL を使用して送信できます。たとえば次のクエリでは、過去 24 時間以内に追加されたすべての行が選択されます。
    SELECT * FROM `myprojectid.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

クリーンアップ

このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. Dataflow ウェブ UI に移動に移動します。
    Cloud Dataflow ウェブ UI に移動
    1. ストリーミング ジョブは、Google Cloud Console のジョブリストから選択する必要があります。
    2. ナビゲーションで、[キャンセル] をクリックします。
    3. [キャンセル] ダイアログ ボックスで、パイプラインの処理として [キャンセル] または [ドレイン] のいずれかを選択します。
  2. BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
    1. ナビゲーションで、作成した taxirides データセットの上にカーソルを合わせます。
    2. ナビゲーションのデータセット名の横にある下矢印アイコンをクリックし、[データセットの削除] をクリックします。
    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(taxirides)を入力し、[OK] をクリックして確定します。

次のステップ