Dataflow テンプレートを使用してストリーミング パイプラインを作成する

このドキュメントでは、Google 提供の Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、クイックスタートで例として Pub/Sub to BigQuery テンプレートを使用します。

Pub/Sub to BigQuery テンプレートは、Pub/Sub トピックから JSON 形式のメッセージを読み取り、BigQuery テーブルに書き込むことができるストリーミング パイプラインです。


このタスクの手順をガイドに沿って Google Cloud コンソールで直接行う場合は、「ガイドを表示」をクリックしてください。

ガイドを表示


始める前に

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Make sure that billing is enabled for your Google Cloud project.

  4. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Pub/Sub, and Resource Manager APIs.

    Enable the APIs

  8. Cloud Storage バケットを作成します。
    1. In the Google Cloud console, go to the Cloud Storage Buckets page.

      Go to Buckets page

    2. Click Create bucket.
    3. On the Create a bucket page, enter your bucket information. To go to the next step, click Continue.
      • For Name your bucket, enter a unique bucket name. Don't include sensitive information in the bucket name, because the bucket namespace is global and publicly visible.
      • For Choose where to store your data, do the following:
        • Select a Location type option.
        • Select a Location option.
      • For Choose a default storage class for your data, select the following: Standard.
      • For Choose how to control access to objects, select an Access control option.
      • For Advanced settings (optional), specify an encryption method, a retention policy, or bucket labels.
    4. Click Create.
  9. 次のものをコピーします。これらは以後のセクションで使用されます。
    • Cloud Storage バケット名。
    • Google Cloud プロジェクト ID。

      ID を調べる方法については、プロジェクトの識別をご覧ください。
  10. このクイックスタートの手順を最後まで行うには、ユーザー アカウントに Dataflow 管理者ロールサービス アカウント ユーザー ロールが必要です。Compute Engine のデフォルトのサービス アカウントには、Dataflow ワーカーロール、Storage オブジェクト管理者ロール、Pub/Sub 編集者ロール、BigQuery データ編集者ロール、閲覧者ロールが必要です。Google Cloud コンソールで必要なロールを追加する手順は次のとおりです。

    1. [IAM] ページに移動して、プロジェクトを選択します。
      [IAM] に移動
    2. ユーザー アカウントを含む行で、プリンシパルを編集します)アイコンをクリックします。[別のロールを追加] をクリックし、[Dataflow 管理者] と [サービス アカウント ユーザー] のロールを追加します。
    3. [保存] をクリックします。
    4. Compute Engine のデフォルトのサービス アカウントPROJECT_NUMBER-compute@developer.gserviceaccount.com)を含む行で、プリンシパルを編集します)アイコンをクリックします。
    5. [別のロールを追加] をクリックし、Dataflow ワーカーストレージ オブジェクト管理者Pub/Sub 編集者BigQuery データ編集者閲覧者のロールを追加します。
    6. [保存] をクリックします。

      ロール付与の詳細については、コンソールを使用して IAM ロールを付与するをご覧ください。

  11. デフォルトでは、新しいプロジェクトはデフォルト ネットワークで開始されます。プロジェクトのデフォルト ネットワークが無効または削除されている場合、Compute ネットワーク ユーザーのロールroles/compute.networkUser)を含むユーザー アカウント用のプロジェクト内にネットワークが必要です。

BigQuery データセットとテーブルを作成する

Google Cloud コンソールで Pub/Sub トピックに適したスキーマを使用して、BigQuery のデータセットとテーブルを作成します。

この例で、データセットの名前は taxirides、テーブルの名前は realtime です。このデータセットとテーブルを作成するには、以下の操作を行います。

  1. [BigQuery] ページに移動します。
    [BigQuery] に移動
  2. [エクスプローラ] パネルで、データセットを作成するプロジェクトの横にある [アクションを表示] をクリックしてから、[データセットを作成] をクリックします。
  3. [データセットを作成] パネルで、次の操作を行います。
    1. [データセット ID] に「taxirides」と入力します。データセット ID は、Google Cloud プロジェクトごとに一意です。
    2. [ロケーション タイプ] で [マルチリージョン] を選択してから、[US(米国の複数のリージョン)] を選択します。一般公開データセットは US マルチリージョン ロケーションに保存されています。わかりやすくするため、データセットを同じロケーションに配置します。
    3. その他のデフォルト設定はそのままにして、[データセットを作成] をクリックします。
  4. [エクスプローラ] パネルで、プロジェクトを開きます。
  5. taxirides データセットの隣にある アクションを表示」をクリックし、[テーブルを作成] をクリックします。
  6. [テーブルを作成] パネルで、次の操作を行います。
    1. [ソース] セクションの [テーブルの作成元] で [空のテーブル] を選択します。
    2. [送信先] セクションの [テーブル] に「realtime」と入力します。
    3. [スキーマ] セクションで [テキストとして編集] をクリックし、次のスキーマ定義をボックスに貼り付けます。
      ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
      meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
    4. [パーティションとクラスタの設定] セクションの [パーティショニング] で、[タイムスタンプ] フィールドを選択します。
  7. その他のデフォルト設定はそのままにして、[テーブルを作成] をクリックします。

パイプラインを実行する

Google が提供する Pub/Sub to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。パイプラインは入力トピックから受信データを取得します。

  1. Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Dataflow ジョブの [ジョブ名] として「taxi-data」と入力します。
  4. [Dataflow テンプレート] で、[Pub/Sub to BigQuery] テンプレートを選択します。
  5. [BigQuery output table] に、次のテキストを入力します。
    PROJECT_ID:taxirides.realtime

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。

  6. [オプションのソースパラメータ] セクションの [Pub/Sub トピックを入力] で、[トピックを手動で入力] をクリックします。
  7. ダイアログで、[トピック名] に次のように入力し、[保存] をクリックします。
    projects/pubsub-public-data/topics/taxirides-realtime

    一般公開されている Pub/Sub トピックは、NYC Taxi & Limousine Commission のオープン データセットに基づいています。このトピックの JSON 形式のサンプル メッセージを次に示します。

    {
      "ride_id": "19c41fc4-e362-4be5-9d06-435a7dc9ba8e",
      "point_idx": 217,
      "latitude": 40.75399,
      "longitude": -73.96302,
      "timestamp": "2021-03-08T02:29:09.66644-05:00",
      "meter_reading": 6.293821,
      "meter_increment": 0.029003782,
      "ride_status": "enroute",
      "passenger_count": 1
    }
  8. [一時的な保存場所] に次のように入力します。
    gs://BUCKET_NAME/temp/

    BUCKET_NAME を Cloud Storage バケットの名前に置き換えます。temp フォルダには、ステージング済みのパイプライン ジョブなどの一時ファイルが保存されます。

  9. プロジェクトにデフォルト ネットワークがない場合は、[ネットワーク] と [サブネットワーク] を入力します。詳細については、ネットワークとサブネットワークの指定をご覧ください。
  10. [ジョブを実行] をクリックします。

結果を表示する

realtime テーブルに書き込まれたデータを表示する方法は次のとおりです。

  1. [BigQuery] ページに移動します。

    [BigQuery] に移動

  2. [クエリを新規作成] をクリックします。新しいエディタタブが開きます。

    SELECT * FROM `PROJECT_ID.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

    PROJECT_ID は、BigQuery データセットを作成したプロジェクトのプロジェクト ID に置き換えます。テーブルにデータが表示されるまで、最大 5 分かかることがあります。

  3. [実行] をクリックします。

    クエリは、過去 24 時間以内にテーブルに追加された行を返します。標準 SQL を使用してクエリを実行することもできます。

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の手順を実施します。

プロジェクトの削除

課金を停止する最も簡単な方法は、クイックスタート用に作成した Google Cloud プロジェクトを削除することです。

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

個々のリソースの削除

このクイックスタートで使用した Google Cloud プロジェクトを残しておく場合は、個々のリソースを削除します。

  1. Dataflow の [ジョブ] ページに移動します。
    [ジョブ] に移動
  2. ジョブリストからストリーミング ジョブを選択します。
  3. ナビゲーションで、[停止] をクリックします。
  4. [ジョブの停止] ダイアログで、パイプラインを [キャンセル] または [ドレイン] し、[ジョブの停止] をクリックします。
  5. [BigQuery] ページに移動します。
    [BigQuery] に移動
  6. [エクスプローラ] パネルで、プロジェクトを展開します。
  7. 削除するデータセットの横にある [アクションを表示] をクリックし、[開く] をクリックします。
  8. 詳細パネルで [データセットを削除] をクリックして、指示に従って操作します。
  9. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  10. Click the checkbox for the bucket that you want to delete.
  11. To delete the bucket, click Delete, and then follow the instructions.

次のステップ