テンプレートを使用したクイックスタート

このドキュメントでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、例として Pub/Sub Topic to BigQuery テンプレートを使用します。

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。


このタスクを Google Cloud コンソールで直接行う際の順を追ったガイダンスについては、[ガイドを表示] をクリックしてください。

ガイドを表示


以降のセクションでは、[ガイドを表示] をクリックした場合と同じ手順について説明します。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Google Cloud プロジェクトで課金が有効になっていることを確認します

  7. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  8. Cloud Storage バケットを作成します。
    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] ページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケット名] に、一意のバケット名を入力します。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、次を選択します。 Standard.
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。
  9. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。

      ID を調べる方法については、プロジェクトの識別をご覧ください。

BigQuery データセットとテーブルを作成する

Google Cloud コンソールで Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。

この例で、データセットの名前は taxirides、テーブルの名前は realtime です。このデータセットとテーブルを作成するには、以下の手順を行います。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。
    [BigQuery] に移動
  2. [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [アクションを表示] をクリックしてから、[データセットを作成] をクリックします。
  3. [データセットを作成] パネルで、次の操作を行います。
    1. [データセット ID] に「taxirides」と入力します。
    2. [データのロケーション] で [米国(US)] を選択します。一般公開データセットは US マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。
    3. その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
  4. [エクスプローラ] パネルで、プロジェクトを展開します。
  5. taxirides データセットの隣にある [アクションを表示] をクリックし、[開く] をクリックします。
  6. 詳細パネルで [ テーブルを作成] をクリックします。
  7. [テーブルの作成] パネルで、次の操作を行います。
    1. [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
    2. [送信先] セクションの [テーブル名] に「realtime」と入力します。
    3. [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. [パーティショニングとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
  8. その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。

パイプラインを実行する

Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] として「taxi-data」と入力します。
  4. [Dataflow テンプレート] で、[Pub/Sub Topic to BigQuery] テンプレートを選択します。
  5. [Input Pub/Sub topic] に、次のように入力します。
    projects/pubsub-public-data/topics/taxirides-realtime

    一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. [BigQuery 出力テーブル] に、次のテキストを入力します。
    PROJECT_ID:taxirides.realtime

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。

  7. [一時的なロケーション] に、次のように入力します。
    gs://BUCKET_NAME/temp/

    BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。temp フォルダには、ステージングされたパイプライン ジョブなどの一時ファイルが格納されます。

  8. [ジョブを実行] をクリックします。

結果を表示する

realtime テーブルに書き込まれたデータを表示する方法は次のとおりです。

  1. Google Cloud コンソールで [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. 次のクエリをクエリエディタに貼り付けます。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。

  3. [実行] をクリックします。

    クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を行います。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. ジョブリストからストリーミング ジョブを選択します。
  3. ナビゲーションで、[停止] をクリックします。
  4. [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
  5. Google Cloud コンソールで [BigQuery] ページに移動します。
    [BigQuery] に移動
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. 削除するデータセットの横にある [アクションを表示] をクリックし、[開く] をクリックします。
  8. 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
  9. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  10. 削除するバケットのチェックボックスをクリックします。
  11. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

次のステップ