クイックスタート: SQL を使用して Dataflow パイプラインを作成する

SQL を使用して Dataflow パイプラインを作成する

このクイックスタートでは、一般公開されている Pub/Sub トピックに対してクエリを実行する SQL 構文を記述する方法を学びます。SQL クエリが Dataflow パイプラインを実行し、そのパイプラインの結果が BigQuery テーブルに書き込まれます。

Dataflow SQL ジョブを実行するには、Google Cloud コンソール、ローカルマシンにインストールされた Google Cloud CLI、または Cloud Shell を使用します。この例では、Google Cloud コンソールに加えて、ローカルマシンまたは Cloud Shell のいずれかを使用する必要があります。

始める前に

  1. Google Cloud アカウントにログインします。Google Cloud を初めて使用する場合は、アカウントを作成して、実際のシナリオでの Google プロダクトのパフォーマンスを評価してください。新規のお客様には、ワークロードの実行、テスト、デプロイができる無料クレジット $300 分を差し上げます。
  2. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  3. Google Cloud プロジェクトで課金が有効になっていることを確認します

  4. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, and Google Cloud Data Catalog API を有効にします。

    API を有効にする

  5. Google Cloud Console の [プロジェクト セレクタ] ページで、Google Cloud プロジェクトを選択または作成します。

    プロジェクト セレクタに移動

  6. Google Cloud プロジェクトで課金が有効になっていることを確認します

  7. Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Resource Manager, and Google Cloud Data Catalog API を有効にします。

    API を有効にする

gcloud CLI をインストールして初期化する

  • ご使用のオペレーティング システム用の gcloud CLI パッケージをダウンロードし、gcloud CLI をインストールして構成します。

    ご使用のインターネット接続によっては、ダウンロードに時間がかかることがあります。

BigQuery データセットを作成する

このクイックスタートでは、Dataflow SQL パイプラインが BigQuery データセットを次のセクションで作成する BigQuery テーブルに公開します。

  • taxirides という名前の BigQuery データセットを作成します。

    bq mk taxirides
    

パイプラインの実行

  • タクシー乗車に関する一般公開された Pub/Sub トピックのデータを使用して、1 分ごとの乗客数を計算する Dataflow SQL パイプラインを実行します。このコマンドは、データ出力を保存する passengers_per_minute という名前の BigQuery テーブルも作成します。

    gcloud dataflow sql query \
        --job-name=dataflow-sql-quickstart \
        --region=us-central1 \
        --bigquery-dataset=taxirides \
        --bigquery-table=passengers_per_minute \
    'SELECT
         TUMBLE_START("INTERVAL 60 SECOND") as period_start,
         SUM(passenger_count) AS pickup_count,
    FROM pubsub.topic.`pubsub-public-data`.`taxirides-realtime`
    WHERE
        ride_status = "pickup"
    GROUP BY
        TUMBLE(event_timestamp, "INTERVAL 60 SECOND")'
    

    Dataflow SQL ジョブの実行が開始されるまでに少し時間がかかることがあります。

以下は、Dataflow SQL パイプラインで使用される値に関する説明です。

  • dataflow-sql-quickstart: Dataflow ジョブの名前
  • us-central1: ジョブを実行するリージョン
  • taxirides: シンクとして使用される BigQuery データセットの名前
  • passengers_per_minute: BigQuery テーブルの名前
  • taxirides-realtime: ソースとして使用される Pub/Sub トピックの名前

この SQL コマンドは、Pub/Sub トピック taxirides-realtime に対してクエリを実行し、60 秒ごとの乗客数を取得します。この一般公開トピックは、NYC Taxi & Limousine Commission のオープン データセットをベースとしています。

結果を見る

  • パイプラインが実行中であることを確認します。

    コンソール

    1. Google Cloud コンソールで、Dataflow の [ジョブ] ページに移動します。

      [ジョブ] に移動

    2. ジョブのリストで、[dataflow-sql-quickstart] をクリックします。

    3. [ジョブ情報] パネルで、[ジョブのステータス] 項目が [実行中] に設定されていることを確認します。

      ジョブが開始するまでに数分かかることがあります。ジョブが開始されるまで、[ジョブ ステータス] は [キューに格納済み] に設定されています。

    4. [ジョブグラフ] タブで、すべてのステップが実行されていることを確認します。

      ジョブの開始後、ステップの実行が開始されるまでに数分かかる場合があります。

      2 つの複合ステップを含む Dataflow ジョブグラフ。最初のステップは 6 分 45 秒、2 番目のステップは 1 秒間実行されます。

    5. Google Cloud コンソールで [BigQuery] ページに移動します。

      BigQuery に移動

    6. エディタで、次の SQL クエリを貼り付けて、[実行] をクリックします。

      'SELECT *
      FROM taxirides.passengers_per_minute
      ORDER BY pickup_count DESC
      LIMIT 5'
      

      このクエリは、passengers_per_minute テーブルから最も混雑した時間帯を返します。

    gcloud

    1. プロジェクトで実行されている Dataflow ジョブのリストを取得します。

      gcloud dataflow jobs list
      
    2. dataflow-sql-quickstart ジョブの詳細を取得します。

      gcloud dataflow jobs describe JOB_ID
      

      JOB_ID を、プロジェクトの dataflow-sql-quickstart ジョブのジョブ ID に置き換えます。

    3. passengers_per_minute テーブルから最も混雑した時間帯を返します。

      bq query \
      'SELECT *
      FROM taxirides.passengers_per_minute
      ORDER BY pickup_count DESC
      LIMIT 5'
      

クリーンアップ

このページで使用したリソースについて、Google Cloud アカウントに課金されないようにするには、次の操作を行います。

  1. Dataflow ジョブをキャンセルするには、[ジョブ] ページに移動します。

    [ジョブ] に移動

  2. ジョブのリストで、[dataflow-sql-quickstart] をクリックします。

  3. [停止] > [キャンセル] > [ジョブの停止] をクリックします。

  4. taxirides データセットを削除します。

    bq rm taxirides
    
  5. 削除を確認するには、「y」と入力します。

次のステップ