Dataflow テンプレートを使用してストリーミング パイプラインを作成する

このドキュメントでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、クイックスタートで例として Pub/Sub to BigQuery テンプレートを使用します。

Pub/Sub to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、BigQuery テーブルに書き込むことができるストリーミング パイプラインです。


このタスクの手順をガイドに沿って Google Cloud コンソールで直接行う場合は、「ガイドを表示」をクリックしてください。

ガイドを表示


始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Google Cloud プロジェクトで課金が有効になっていることを確認します

  7. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager API を有効にします。

    API を有効にする

  8. Cloud Storage バケットを作成します。
    1. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

      [バケット] ページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットの作成] ページでユーザーのバケット情報を入力します。次のステップに進むには、[続行] をクリックします。
      • [バケット名] に、一意のバケット名を入力します。バケットの名前空間は世界中の誰でも閲覧可能なため、機密情報をバケット名に含めないようにしてください。
      • [データの保存場所の選択] で、次の操作を行います。
        • [ロケーション タイプ] オプションを選択します。
        • [ロケーション] オプションを選択します。
      • [データのデフォルトのストレージ クラスを選択する] で、次を選択します。 Standard.
      • [オブジェクトへのアクセスを制御する方法を選択する] で [アクセス制御] オプションを選択します。
      • [詳細設定(省略可)] には、暗号化メソッド保持ポリシー、またはバケットラベルを指定します。
    4. [作成] をクリックします。
  9. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。

      ID を調べる方法については、プロジェクトの識別をご覧ください。
  10. このクイックスタートの手順を最後まで行うには、ユーザー アカウントに Dataflow 管理者ロールサービス アカウント ユーザー ロールが必要です。Compute Engine のデフォルトのサービス アカウントには、Dataflow ワーカーロールが必要です。Google Cloud コンソールで必要なロールを追加する手順は次のとおりです。

    1. [IAM] ページに移動します。
      [IAM] に移動
    2. プロジェクトを選択します。
    3. ユーザー アカウントを含む行で、プリンシパルを編集します」をクリックし、[ 別のロールを追加] をクリックします。
    4. プルダウン リストで、[Dataflow 管理者] ロールを選択します。
    5. サービス アカウント ユーザー ロールに同様の操作を行った後、[保存] をクリックします。
    6. Compute Engine のデフォルトのサービス アカウントを含む行で、プリンシパルを編集します」をクリックし、[ 別のロールを追加] をクリックします。
    7. プルダウン リストで、[Dataflow ワーカー] ロールを選択します。
    8. Pub/Sub 編集者ロールと BigQuery データ編集者ロールに同様の操作を行った後、[保存] をクリックします。

      ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。

  11. デフォルトでは、新しいプロジェクトはデフォルト ネットワークで開始されます。プロジェクトのデフォルト ネットワークが無効または削除されている場合、Compute ネットワーク ユーザーのロールroles/compute.networkUser)を含むユーザー アカウント用のプロジェクト内にネットワークが必要です。

BigQuery データセットとテーブルを作成する

Google Cloud コンソールで Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。

この例で、データセットの名前は taxirides、テーブルの名前は realtime です。このデータセットとテーブルを作成するには、以下の操作を行います。

  1. [BigQuery] ページに移動します。
    [BigQuery] に移動
  2. [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [アクションを表示] をクリックしてから、[データセットを作成] をクリックします。
  3. [データセットを作成] パネルで、次の操作を行います。
    1. [データセット ID] に「taxirides」と入力します。データセット ID は、Google Cloud プロジェクトごとに一意です。
    2. [ロケーション タイプ] で [マルチリージョン] を選択してから、[US(米国の複数のリージョン)] を選択します。一般公開データセットは US マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。
    3. その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
  4. [エクスプローラ] パネルで、プロジェクトを開きます。
  5. taxirides データセットの隣にある アクションを表示」をクリックし、[テーブルを作成] をクリックします。
  6. [テーブルを作成] パネルで、次の操作を行います。
    1. [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
    2. [送信先] セクションの [テーブル] に「realtime」と入力します。
    3. [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. [パーティションとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
  7. その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。

パイプラインを実行する

Google が提供する Pub/Sub to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。

  1. Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] として「taxi-data」と入力します。
  4. [Dataflow テンプレート] で、[Pub/Sub to BigQuery] テンプレートを選択します。
  5. [BigQuery output table] に、次のテキストを入力します。
    PROJECT_ID:taxirides.realtime

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。

  6. [オプション パラメータ] を開きます。
  7. [Input Pub/Sub topic] で、[トピックを手動で入力] をクリックします。
  8. ダイアログで、[トピック名] に次のように入力し、[保存] をクリックします。
    projects/pubsub-public-data/topics/taxirides-realtime

    一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  9. [一時的な保存場所] に次のように入力します。
    gs://BUCKET_NAME/temp/

    BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。temp フォルダには、ステージング済みのパイプライン ジョブなどの一時ファイルが保存されます。

  10. プロジェクトにデフォルト ネットワークがない場合は、[ネットワーク] と [サブネットワーク] を入力します。詳細については、ネットワークとサブネットワークの指定をご覧ください。
  11. [ジョブを実行] をクリックします。

結果を表示する

realtime テーブルに書き込まれたデータを表示する方法は次のとおりです。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. [クエリを新規作成] をクリックします。新しいエディタタブが開きます。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大で 1 分かかることがあります。

  3. [実行] をクリックします。

    クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. Google Cloud コンソールで、[リソースの管理] ページに移動します。

    [リソースの管理] に移動

  2. プロジェクト リストで、削除するプロジェクトを選択し、[削除] をクリックします。
  3. ダイアログでプロジェクト ID を入力し、[シャットダウン] をクリックしてプロジェクトを削除します。

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. ジョブリストからストリーミング ジョブを選択します。
  3. ナビゲーションで、[停止] をクリックします。
  4. [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
  5. [BigQuery] ページに移動します。
    [BigQuery] に移動
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. 削除するデータセットの横にある [アクションを表示] をクリックし、[開く] をクリックします。
  8. 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
  9. Google Cloud コンソールで、Cloud Storage の [バケット] ページに移動します。

    [バケット] に移動

  10. 削除するバケットのチェックボックスをクリックします。
  11. バケットを削除するには、 [削除] をクリックして、指示に沿って操作します。

次のステップ