テンプレートを使用したクイックスタート

このクイックスタートでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、例として Pub/Sub Topic to BigQuery テンプレートを使用します。

Pub/Sub Topic to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、それらを BigQuery テーブルに書き込むストリーミング パイプラインです。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Cloud プロジェクトに対して課金が有効になっていることを確認します。プロジェクトに対して課金が有効になっていることを確認する方法を学習する

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, and Cloud Resource Manager API を有効にします。

    API を有効にする

  5. Cloud Storage バケットを作成する:
    1. Cloud Console で、Cloud Storage ブラウザページに移動します。

      [ブラウザ] に移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケット名] に、一意のバケット名を入力します。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、次を選択します。 Standard
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。
  6. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。ID を調べる方法については、プロジェクトの識別をご覧ください。

BigQuery データセットとテーブルを作成する

Cloud Console で Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。

この例で、データセットの名前は taxirides、テーブルの名前は realtime です。このデータセットとテーブルを作成するには、以下の手順を行います。

  1. Cloud Console で、BigQuery ページに移動します。
    BigQuery に移動
  2. [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [ 操作を表示] をクリックしてから、[開く] をクリックします。
  3. 詳細パネルで [ データセットを作成] をクリックします。
  4. [データセットを作成] パネルで、次の操作を行います。
    1. [データセット ID] に「taxirides」を入力します。
    2. [データのロケーション] で [米国(US)] を選択します。一般公開データセットは US マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。
  5. その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. taxirides データセットの隣にある [ アクションを表示] をクリックし、[開く] をクリックします。
  8. 詳細パネルで [ テーブルを作成] をクリックします。
  9. [テーブルの作成] パネルで、次の操作を行います。
    1. [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
    2. [送信先] セクションの [テーブル名] に「realtime」と入力します。
    3. [スキーマ] セクションで [テキストとして編集] トグルをクリックし、次のスキーマ定義をボックスに貼り付けます。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. [パーティショニングとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
  10. その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。

パイプラインの実行

Google が提供する Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。

  1. Cloud Console で、Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] を入力します。
  4. [Dataflow テンプレート] では、[Pub/Sub Topic to BigQuery] テンプレートを選択します。
  5. [Input Pub/Sub topic] に、次のように入力します。
    projects/pubsub-public-data/topics/taxirides-realtime

    一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  6. [BigQuery 出力テーブル] に、次のテキストを入力します。
    PROJECT_ID:taxirides.realtime

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。

  7. [一時的なロケーション] に、次のように入力します。
    gs://BUCKET_NAME/temp/

    BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。temp フォルダには、ステージングされたパイプライン ジョブなどの一時ファイルが格納されます。

  8. [ジョブを実行] をクリックします。

結果を表示する

realtime テーブルに書き込まれたデータを表示する方法は次のとおりです。

  1. Cloud Console で、BigQuery ページに移動します。
    BigQuery に移動
  2. [クエリエディタ] ペインで、次のクエリを実行します。
    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。

    クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。

クリーンアップ

このクイックスタートで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順に従います。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. Cloud Console で [リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Cloud Console で、Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. ジョブリストからストリーミング ジョブを選択します。
  3. ナビゲーションで、[停止] をクリックします。
  4. [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
  5. Cloud Console で、BigQuery ページに移動します。
    BigQuery に移動
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. 削除するデータセットの横にある [ アクションを表示] をクリックし、[開く] をクリックします。
  8. 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
  9. Cloud Console の Cloud Storage ブラウザページに移動します。

    [ブラウザ] に移動

  10. 削除するバケットのチェックボックスをクリックします。
  11. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

次のステップ