データソースと宛先の使用

このページでは、Dataflow SQL を使用してデータを取得し、クエリの結果を書き込む方法について説明します。

Dataflow SQL は次のソースに対してクエリを実行できます。

Dataflow SQL は、クエリの結果を次の宛先に書き込むことができます。

Pub/Sub

Pub/Sub トピックのクエリ

Dataflow SQL でクエリを実行して Pub/Sub トピックを取得するには、次の操作を行います。

  1. Dataflow ソースとして Pub/Sub トピックを追加します

  2. Pub/Sub トピックにスキーマを割り当てます

  3. Dataflow SQL クエリで Pub/Sub トピックを使用します

Pub/Sub トピックの追加

BigQuery ウェブ UI を使用して、Pub/Sub トピックを Dataflow ソースとして追加できます。

  1. Google Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。

    BigQuery に移動

  2. ナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。

    Cloud Dataflow ソースが選択された [データの追加] プルダウン リスト

  3. [Cloud Dataflow ソースを追加] パネルで、[Cloud Pub/Sub トピック] を選択してトピックを検索します。

    次のスクリーンショットでは、transactions Pub/Sub トピックを検索しています。

    [Cloud Dataflow ソースを追加] パネルで Pub/Sub トピック オプションが選択され、トランザクション検索クエリが完了し、トランザクション トピックが選択されています。

  4. [追加] をクリックします。

Pub/Sub トピックを Dataflow ソースとして追加すると、ナビゲーション メニューの [リソース] セクションに Pub/Sub トピックが表示されます。

トピックを検索するには、[Cloud Dataflow ソース]、[Cloud Pub/Sub トピック] の順に展開します。

Pub/Sub トピック スキーマの割り当て

Pub/Sub トピック スキーマは、次のフィールドから構成されています。

  • event_timestamp フィールド。

    Pub/Sub イベントのタイムスタンプは、メッセージの公開時間を表します。タイムスタンプは Pub/Sub メッセージに自動的に追加されます。

  • Pub/Sub メッセージの Key-Value ペアのフィールド。

    たとえば、メッセージ {"k1":"v1", "k2":"v2"} のスキーマには k1k2 という 2 つの STRING フィールドがあります。

Pub/Sub トピックにスキーマを割り当てるには、Cloud Console または Google Cloud CLI を使用します。

Console

Pub/Sub トピックにスキーマを割り当てるには、次の操作を行います。

  1. [リソース] パネルでトピックを選択します。

  2. [スキーマ] タブで [スキーマを編集] をクリックします。[スキーマ] サイドパネルが開き、スキーマ フィールドが表示されます。

    スキーマを追加または編集できるサイドパネル

  3. [フィールドを追加] をクリックして、スキーマにフィールドを追加します。または、[テキストとして編集] ボタンを切り替えて、スキーマ テキスト全体をコピーして貼り付けます。

    たとえば、販売トランザクションを含む Pub/Sub トピックのスキーマ テキストは次のようになります。

    [
      {
          "description": "Pub/Sub event timestamp",
          "name": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "name": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "name": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "name": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "name": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "name": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "name": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "name": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  4. [送信] をクリックします。

  5. (省略可)[トピックをプレビュー] をクリックしてメッセージの内容を確認し、定義したスキーマと一致していることを確認します。

    Dataflow SQL ページの詳細パネル。Pub/Sub トピックが選択され、[トピックをプレビュー] ボタンがハイライト表示されています。

gcloud

Pub/Sub トピックにスキーマを割り当てるには、次の操作を行います。

  1. スキーマ テキストを含む JSON ファイルを作成します。

    たとえば、販売トランザクションを含む Pub/Sub トピックのスキーマ テキストは次のようになります。

    [
      {
          "description": "Pub/Sub event timestamp",
          "column": "event_timestamp",
          "mode": "REQUIRED",
          "type": "TIMESTAMP"
      },
      {
          "description": "Transaction time string",
          "column": "tr_time_str",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "First name",
          "column": "first_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Last name",
          "column": "last_name",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "City",
          "column": "city",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "State",
          "column": "state",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Product",
          "column": "product",
          "mode": "NULLABLE",
          "type": "STRING"
      },
      {
          "description": "Amount of transaction",
          "column": "amount",
          "mode": "NULLABLE",
          "type": "FLOAT64"
      }
    ]
    
  2. gcloud data-catalog entries コマンドを使用して、Pub/Sub トピックにスキーマを割り当てます。

    gcloud data-catalog entries update \
     --lookup-entry='pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`' \
     --schema-from-file=FILE_PATH
    

    次のように置き換えます。

    • PROJECT_ID: プロジェクト ID
    • TOPIC_NAME: Pub/Sub トピック名
    • FILE_PATH: スキーマ テキストを含む JSON ファイルのパス
  3. (省略可)次のコマンドを実行して、スキーマが Pub/Sub トピックに正常に割り当てられていることを確認します。

    gcloud data-catalog entries lookup \
     'pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`'
    

Pub/Sub トピックの使用

Dataflow SQL クエリで Pub/Sub を参照するには、次の識別子を使用します。

pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • TOPIC_NAME: Pub/Sub トピック名
識別子は、Dataflow SQL の語彙構造に従う必要があります。識別子に文字、数字、アンダースコア以外の文字が含まれている場合は、識別子をバッククォートで囲みます。

たとえば、次のクエリではプロジェクト dataflow-sql の Dataflow トピック daily.transactions から選択しています。

SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`

Pub/Sub トピックへの書き込み

クエリの結果を Pub/Sub トピックに書き込むには、Cloud Console または Google Cloud CLI を使用します。

Console

Pub/Sub トピックにクエリ結果を書き込むには、Dataflow SQL を使用してクエリを実行します。

  1. Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。

    BigQuery に移動

  2. クエリエディタに Dataflow SQL クエリを入力します。

  3. [Cloud Dataflow ジョブを作成] をクリックして、ジョブ オプションのパネルを開きます。

  4. パネルの [送信先] セクションで、[出力タイプ]、[Cloud Pub/Sub トピック] の順に選択します。

  5. [Cloud Pub/Sub トピック] をクリックして、トピックを選択します。

  6. [作成] をクリックします。

gcloud

Pub/Sub トピックにクエリの結果を書き込むには、gcloud dataflow sql queryコマンドで --pubsub-topic フラグを使用します。

gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --pubsub-project=PROJECT_ID \
  --pubsub-topic=TOPIC_NAME \
  'QUERY'

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • PROJECT_ID: プロジェクト ID
  • TOPIC_NAME: Pub/Sub トピック名
  • QUERY: Dataflow SQL クエリ

宛先の Pub/Sub トピックのスキーマは、クエリの結果のスキーマと一致している必要があります。宛先の Pub/Sub トピックにスキーマがない場合、クエリの結果に一致するスキーマが自動的に割り当てられます。

Cloud Storage

Cloud Storage ファイルセットのクエリ

Dataflow SQL でクエリを実行して Cloud Storage ファイルセットを取得するには、次の操作を行います。

  1. Dataflow SQL 用に Data Catalog ファイルセットを作成します。

  2. Dataflow ソースとして Cloud Storage ファイルセットを追加します。

  3. Dataflow SQL クエリで Cloud Storage ファイルセットを使用します。

Cloud Storage ファイルセットの作成

Cloud Storage ファイルセットを作成する方法については、エントリ グループとファイルセットの作成をご覧ください。

Cloud Storage ファイルセットにはスキーマを設定し、ヘッダー行を含まない CSV ファイルのみを含める必要があります。

Cloud Storage ファイルセットの追加

Dataflow SQL で、Cloud Storage ファイルセットを Dataflow ソースとして追加します。

  1. Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。

    BigQuery に移動

  2. ナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。

    Cloud Dataflow ソースが選択された [データの追加] プルダウン リスト

  3. [Cloud Dataflow ソースを追加] パネルで Cloud Storage ファイルセットを選択して、トピックを検索します。

  4. [追加] をクリックします。

Cloud Storage ファイルセットを Dataflow ソースとして追加すると、ナビゲーション メニューの [リソース] セクションに Cloud Storage ファイルセットが表示されます。

ファイルセットを検索するには、[Cloud Dataflow のソース]、[Cloud Storage トピック] の順に展開します。

Cloud Storage ファイルセットの使用

Dataflow SQL クエリで Cloud Storage テーブルを参照するには、次の識別子を使用します。

datacatalog.entry.`PROJECT_ID`.REGION.`ENTRY_GROUP`.`FILESET_NAME`

次のように置き換えます。

  • PROJECT_ID: プロジェクト ID
  • REGION: リージョン エンドポイント(例: us-west1
  • ENTRY_GROUP: Cloud Storage ファイルセットのエントリ グループ
  • FILESET_NAME: Cloud Storage ファイルセットの名前
識別子は、Dataflow SQL の語彙構造に従う必要があります。識別子に文字、数字、アンダースコア以外の文字が含まれている場合は、識別子をバッククォートで囲みます。

たとえば、次のクエリでは、プロジェクト dataflow-sql とエントリ グループ my-fileset-group の Cloud Storage ファイルセット daily.registrations から選択しています。

SELECT *
FROM datacatalog.entry.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`

BigQuery

BigQuery テーブルのクエリ

Dataflow SQL でクエリを実行して BigQuery テーブルを取得するには、次の操作を行います。

  1. Dataflow SQL 用に BigQuery テーブルを作成します。

  2. Dataflow SQL クエリで BigQuery テーブルを使用します。

BigQuery テーブルを Dataflow ソースとして追加する必要はありません。

BigQuery テーブルの作成

Dataflow SQL 用の BigQuery テーブルを作成する方法については、スキーマ定義を持つ空のテーブルを作成をご覧ください。

クエリでの BigQuery テーブルの使用

Dataflow SQL クエリで BigQuery テーブルを参照するには、次の識別子を使用します。

bigquery.table.`PROJECT_ID`.`DATASET_NAME`.`TABLE_NAME`

識別子は、Dataflow SQL の語彙構造に従う必要があります。識別子に文字、数字、アンダースコア以外の文字が含まれている場合は、識別子をバッククォートで囲みます。

たとえば、次のクエリでは、データセット dataflow_sql_dataset とプロジェクト dataflow-sql の BigQuery テーブル us_state_salesregions を使用しています。

SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions

BigQuery テーブルへの書き込み

クエリ結果を Dataflow SQL クエリに書き込むには、Cloud Console または Google Cloud CLI を使用します。

Console

クエリの結果を Dataflow SQL クエリに書き込むには、Dataflow SQL でクエリを実行します。

  1. Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。

    BigQuery に移動

  2. クエリエディタに Dataflow SQL クエリを入力します。

  3. [Cloud Dataflow ジョブを作成] をクリックして、ジョブ オプションのパネルを開きます。

  4. パネルの [送信先] セクションで、[出力タイプ] > [BigQuery] の順に選択します。

  5. [データセット ID] をクリックし、[読み込まれたデータセット] または [新しいデータセットの作成] を選択します。

  6. [テーブル名] フィールドに宛先テーブルを入力します。

  7. (省略可)BigQuery テーブルへのデータの読み込み方法を選択します。

    • 空の場合に書き込む: (デフォルト)テーブルが空の場合にのみデータを書き込みます。
    • テーブルに追加する: テーブルの末尾にデータを追加します。
    • テーブルの上書き: 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。
  8. [作成] をクリックします。

gcloud

BigQuery テーブルにクエリの結果を書き込むには、gcloud dataflow sql query コマンドで --bigquery-table フラグを使用します。

gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --bigquery-dataset=DATASET_NAME \
  --bigquery-table=TABLE_NAME \
  'QUERY'

次のように置き換えます。

  • JOB_NAME: 任意のジョブ名
  • REGION: リージョン エンドポイント(例: us-west1
  • DATASET_NAME: BigQuery データセット名
  • TABLE_NAME: BigQuery テーブル名
  • QUERY: Dataflow SQL クエリ

BigQuery テーブルにデータを書き込む方法を選択するには、--bigquery-write-disposition フラグと次の値を使用します。

  • write-empty: (デフォルト)テーブルが空の場合にのみデータを書き込みます。
  • write-append: テーブルの末尾にデータを追加します。
  • write-truncate: 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。
gcloud dataflow sql query \
  --job-name=JOB_NAME \
  --region=REGION \
  --bigquery-dataset=DATASET_NAME \
  --bigquery-table=TABLE_NAME \
  --bigquery-write-disposition=WRITE_MODE
  'QUERY'

WRITE_MODE は、BigQuery の書き込み処理値に置き換えます。

宛先の BigQuery テーブルのスキーマは、クエリの結果のスキーマと一致している必要があります。宛先の BigQuery テーブルにスキーマがない場合、クエリの結果に一致するスキーマが自動的に割り当てられます。