テンプレートを使用したクイックスタート

このページでは、Google 提供の Cloud Dataflow テンプレートを使用してストリーミング パイプラインを作成する方法を説明します。具体的には、このページでは例として Cloud Pub/Sub Topic to BigQuery テンプレートを使用します。

始める前に

  1. Google アカウントにログインします。

    Google アカウントをまだお持ちでない場合は、新しいアカウントを登録します。

  2. GCP Console のプロジェクト セレクタのページで、GCP プロジェクトを選択または作成します。

    プロジェクト セレクタのページに移動

  3. Google Cloud Platform プロジェクトに対して課金が有効になっていることを確認します。 詳しくは、課金を有効にする方法をご覧ください。

  4. Cloud Dataflow、Compute Engine、Stackdriver Logging、Google Cloud Storage、Google Cloud Storage JSON、BigQuery、Cloud Pub/Sub、Cloud Resource Manager API を有効にします。

    APIを有効にする

  5. Cloud Storage バケットを作成します。
    1. GCP Console で、Cloud Storage ブラウザページに移動します。

      Cloud Storage ブラウザページに移動

    2. [バケットを作成] をクリックします。
    3. [バケットを作成] ダイアログ内で、以下の属性を指定します。
      • 名前: 一意のバケット名。バケットの名前空間は、全世界で、誰もが見られるようになっていますので、機密情報をバケット名に含めないようにしてください。
      • デフォルト ストレージ クラス: Standard
      • バケットデータが保存されるロケーション。
    4. [作成] をクリックします。

Cloud BigQuery データセットとテーブルの作成

自身の Cloud Pub/Sub トピックに適したスキーマを使用して、Google Cloud Shell または GCP Console で BigQuery データセットとテーブルを作成します。

この例で、データセットの名前は taxirides、テーブルの名前は realtime です。

Cloud Shell の使用

Cloud Shell を使用してデータセットとテーブルを作成します。

  1. 次のコマンドを実行して、データセットを作成します。
    bq mk taxirides
    出力は次のようになります。
    Dataset “myprojectid:taxirides” successfully created
  2. 次のコマンドを実行して、テーブルを作成します。
    bq mk \
    --time_partitioning_field timestamp \
    --schema ride_id:string,point_idx:integer,latitude:float,longitude:float,\
    timestamp:timestamp,meter_reading:float,meter_increment:float,ride_status:string,\
    passenger_count:integer -t taxirides.realtime
    出力は次のようになります。
    Table “myprojectid:taxirides.realtime” successfully created

    テーブルは、クエリ費用を削減してパフォーマンスを向上させるために分割されます。

Google Cloud Platform Console の使用

Google Cloud Platform Console を使用して、データセットとテーブルを作成します。

  1. BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
  2. ナビゲーションでプロジェクト名の横にある下矢印アイコンをクリックし、[Create new dataset] をクリックします。データセット ID として「taxirides」を入力します。

    BigQuery UI のデータセット作成ボタン。

    データセット ID は、プロジェクトごとに一意です。疑問符アイコンをクリックすると、ID の制限について確認できます。

  3. その他のデフォルト設定はすべてそのままにし、[OK] をクリックします。
  4. ナビゲーションで、作成したデータセット ID の上にカーソルを合わせます。ID の横にある下矢印アイコンをクリックし、[Create new table] をクリックします。
  5. [Source Data] の横にある [Create empty table] オプションを選択します。
  6. [Destination Table] で taxirides を選択し、「realtime」と入力します。
  7. [Schema] で [Edit as Text] を選択し、次のように入力します。
    ride_id:string,point_idx:integer,latitude:float,longitude:float,timestamp:timestamp,
    meter_reading:float,meter_increment:float,ride_status:string,passenger_count:integer
  8. [Options] の [Partitioning Type] フィールドで [Day] オプションを選択します。
  9. [Options] の [Partitioning Field] セレクタで [timestamp] 列を選択します。
  10. [テーブルを作成] ボタンをクリックします。
  11. BigQuery の設定

パイプラインを実行する

Google が提供する Cloud Pub/Sub Topic to BigQuery テンプレートを使用して、ストリーミング パイプラインを実行します。

  1. Cloud Dataflow ウェブ UI に移動します。
    Cloud Dataflow ウェブ UI に移動
  2. [テンプレートからジョブを作成] をクリックします。
  3. Cloud Dataflow ジョブの [ジョブ名] を入力します。
  4. [Cloud Dataflow テンプレート] で、[Cloud Pub/Sub Topic to BigQuery] テンプレートを選択します。
  5. [Cloud Pub/Sub 入力トピック] に、「projects/pubsub-public-data/topics/taxirides-realtime」を入力します。パイプラインは入力トピックから受信データを取得します。
  6. [BigQuery 出力テーブル] に「<myprojectid>:taxirides.realtime」を入力します。
  7. [一時的なロケーション] に「gs://<mybucket>/tmp/」を入力します。これは、ステージング済みパイプライン ジョブと同様に、一時ファイルを保存するためのサブフォルダです。
  8. [ジョブを実行] ボタンをクリックします。
  9. Cloud Dataflow でのジョブの作成
  10. BigQuery に書き込まれたデータを表示します。BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
    クエリは、標準 SQL を使用して送信できます。たとえば次のクエリでは、過去 24 時間以内に追加されたすべての行が選択されます。
    SELECT * FROM `myprojectid.taxirides.realtime`
    WHERE `timestamp` > TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
    LIMIT 1000

クリーンアップ

このチュートリアルで使用したリソースについて GCP アカウントに課金されないようにするために、以下を行ってください。

  1. Cloud Dataflow ウェブ UI に移動します。
    Cloud Dataflow ウェブ UI に移動
    1. ストリーミング ジョブは、Google Cloud Platform Console のジョブリストから選択が必要な場合があります。
    2. ナビゲーションで、[キャンセル] をクリックします。
    3. [キャンセル] ダイアログ ボックスで、パイプラインの処理として [キャンセル] または [ドレイン] のいずれかを選択します。
  2. BigQuery ウェブ UI に移動します。
    BigQuery ウェブ UI に移動
    1. ナビゲーションで、作成した taxirides データセットの上にカーソルを合わせます。
    2. ナビゲーションのデータセット名の横にある下矢印アイコンをクリックし、[データセットの削除] をクリックします。
    3. [データセットの削除] ダイアログ ボックスでデータセットの名前(taxirides)を入力し、[OK] をクリックして確定します。

次のステップ

このページは役立ちましたか?評価をお願いいたします。

フィードバックを送信...

ご不明な点がありましたら、Google のサポートページをご覧ください。