テンプレートを使用したクイックスタート

このページでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、このページでは例として Pub/Sub Topic to BigQuery テンプレートを使用します。

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Cloud Storage、 Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API を有効にします。

    API を有効にする

  5. Cloud Storage バケットを作成する:
    1. Cloud Console で、Cloud Storage ブラウザページに移動します。

      [ブラウザ] に移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケット名] に、一意のバケット名を入力します。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、次を選択します。 Standard.
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。

BigQuery データセットとテーブルを作成する

Cloud Console で Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットテーブルを作成します。

この例で、データセットの名前は taxiridesテーブルの名前は realtime です。 このデータセットとテーブルを作成するには、以下の手順を行います。

  1. Cloud Console で、[BigQuery] ページに移動します。
    BigQuery に移動
  2. [エクスプローラ] パネルで、データセットを作成するプロジェクトを選択します。
  3. 詳細パネルで [ データセットを作成] をクリックします。
  4. [データセットを作成] ページで次の操作を行います。
    1. [データセット ID] に taxirides を入力します。
    2. [データのロケーション] で [米国(US)] を選択します。現在、一般公開データセットは US マルチ リージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。
  5. その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. データセットの横にある ビュー アクション アイコンをクリックし、[開く] をクリックします。
  8. 詳細パネルで [ テーブルを作成] をクリックします。
  9. [テーブルの作成] ページで次の操作を行います。
    1. [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
    2. [送信先] セクションの [テーブル名] に「realtime」と入力します。
    3. [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. [パーティショニングとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
  10. その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。

パイプラインの実行

Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。

  1. Cloud Console で、Dataflow [ジョブ] ページに移動します。
    Dataflow ジョブに移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] を入力します。
  4. [Dataflow テンプレート] では、[Pub/Sub Topic to BigQuery] テンプレートを選択します。
  5. [Input Pub/Sub topic] に「projects/pubsub-public-data/topics/taxirides-realtime」と入力します。パイプラインは入力トピックから受信データを取得します。

    これは、NYC Taxi & Limousine Commission のオープン データセットをベースにした公開トピックで、Google Maps Directions API を使用してルーティング情報を追加して拡大し、タイムスタンプを付け加えてリアルタイムのシナリオをシミュレートします。JSON 形式のこのトピックのメッセージ例を次に示します。

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. [BigQuery 出力テーブル] に「PROJECT_ID:taxirides.realtime」を入力します。PROJECT_ID を、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。
  7. [一時的なロケーション] に「gs://BUCKET_NAME/temp/」と入力します。BUCKET_NAME を、使用する Cloud Storage バケットの名前に置き換えます。temp は、ステージングされるパイプライン ジョブなど、一時ファイルを保存するためのフォルダです。
  8. [ジョブを実行] をクリックします。
  9. BigQuery に書き込まれたデータを表示します。BigQuery ページに移動します。
    BigQuery に移動

    クエリは、標準 SQL を使用して送信できます。たとえば次のクエリでは、過去 24 時間以内に追加されたすべての行が選択されます。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。

クリーンアップ

このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

  1. Cloud Console で、Dataflow [ジョブ] ページに移動します。
    Dataflow ジョブに移動
    1. ジョブリストからストリーミング ジョブを選択します。
    2. ナビゲーションで、[停止] をクリックします。
    3. [ジョブの停止] ダイアログで、パイプラインの処理として [キャンセル] または [ドレイン] を選択し、[ジョブの停止] をクリックします。
  2. Cloud Console で、[BigQuery] ページに移動します。
    BigQuery に移動
    1. [エクスプローラ] パネルで、プロジェクトを展開します。
    2. データセットの横にあるビュー アクション アイコンをクリックし、[開く] をクリックします。
    3. 詳細パネルで、[データセットを削除] をクリックします。
    4. [データセットの削除] ダイアログで、データセットの名前(taxirides)を入力し、[削除] をクリックして確定します。

次のステップ