このページでは、Dataflow SQL を使用してデータを取得し、クエリの結果を書き込む方法について説明します。
Dataflow SQL は次のソースに対してクエリを実行できます。
- Pub/Sub トピックからのデータのストリーミング
- Cloud Storage ファイルセットからのデータのストリーミングとバッチ
- BigQuery テーブルからのデータのバッチ
Dataflow SQL は、クエリの結果を次の宛先に書き込むことができます。
Pub/Sub
Pub/Sub トピックのクエリ
Dataflow SQL でクエリを実行して Pub/Sub トピックを取得するには、次の操作を行います。
Dataflow ソースとして Pub/Sub トピックを追加します。
Pub/Sub トピックにスキーマを割り当てます。
Dataflow SQL クエリで Pub/Sub トピックを使用します。
Pub/Sub トピックの追加
BigQuery ウェブ UI を使用して、Pub/Sub トピックを Dataflow ソースとして追加できます。
Google Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。
ナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。
[Cloud Dataflow ソースを追加] パネルで、[Cloud Pub/Sub トピック] を選択してトピックを検索します。
次のスクリーンショットでは、
transactions
Pub/Sub トピックを検索しています。[追加] をクリックします。
Pub/Sub トピックを Dataflow ソースとして追加すると、ナビゲーション メニューの [リソース] セクションに Pub/Sub トピックが表示されます。
トピックを検索するには、[Cloud Dataflow ソース]、[Cloud Pub/Sub トピック] の順に展開します。
Pub/Sub トピック スキーマの割り当て
Pub/Sub トピック スキーマは、次のフィールドから構成されています。
event_timestamp
フィールド。Pub/Sub イベントのタイムスタンプは、メッセージの公開時間を表します。タイムスタンプは Pub/Sub メッセージに自動的に追加されます。
Pub/Sub メッセージの Key-Value ペアのフィールド。
たとえば、メッセージ
{"k1":"v1", "k2":"v2"}
のスキーマにはk1
とk2
という 2 つのSTRING
フィールドがあります。
Pub/Sub トピックにスキーマを割り当てるには、Cloud Console または Google Cloud CLI を使用します。
Console
Pub/Sub トピックにスキーマを割り当てるには、次の操作を行います。
[リソース] パネルでトピックを選択します。
[スキーマ] タブで [スキーマを編集] をクリックします。[スキーマ] サイドパネルが開き、スキーマ フィールドが表示されます。
[フィールドを追加] をクリックして、スキーマにフィールドを追加します。または、[テキストとして編集] ボタンを切り替えて、スキーマ テキスト全体をコピーして貼り付けます。
たとえば、販売トランザクションを含む Pub/Sub トピックのスキーマ テキストは次のようになります。
[ { "description": "Pub/Sub event timestamp", "name": "event_timestamp", "mode": "REQUIRED", "type": "TIMESTAMP" }, { "description": "Transaction time string", "name": "tr_time_str", "mode": "NULLABLE", "type": "STRING" }, { "description": "First name", "name": "first_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "Last name", "name": "last_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "City", "name": "city", "mode": "NULLABLE", "type": "STRING" }, { "description": "State", "name": "state", "mode": "NULLABLE", "type": "STRING" }, { "description": "Product", "name": "product", "mode": "NULLABLE", "type": "STRING" }, { "description": "Amount of transaction", "name": "amount", "mode": "NULLABLE", "type": "FLOAT64" } ]
[送信] をクリックします。
(省略可)[トピックをプレビュー] をクリックしてメッセージの内容を確認し、定義したスキーマと一致していることを確認します。
gcloud
Pub/Sub トピックにスキーマを割り当てるには、次の操作を行います。
スキーマ テキストを含む JSON ファイルを作成します。
たとえば、販売トランザクションを含む Pub/Sub トピックのスキーマ テキストは次のようになります。
[ { "description": "Pub/Sub event timestamp", "column": "event_timestamp", "mode": "REQUIRED", "type": "TIMESTAMP" }, { "description": "Transaction time string", "column": "tr_time_str", "mode": "NULLABLE", "type": "STRING" }, { "description": "First name", "column": "first_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "Last name", "column": "last_name", "mode": "NULLABLE", "type": "STRING" }, { "description": "City", "column": "city", "mode": "NULLABLE", "type": "STRING" }, { "description": "State", "column": "state", "mode": "NULLABLE", "type": "STRING" }, { "description": "Product", "column": "product", "mode": "NULLABLE", "type": "STRING" }, { "description": "Amount of transaction", "column": "amount", "mode": "NULLABLE", "type": "FLOAT64" } ]
gcloud data-catalog entries
コマンドを使用して、Pub/Sub トピックにスキーマを割り当てます。gcloud data-catalog entries update \ --lookup-entry='pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`' \ --schema-from-file=FILE_PATH
次のように置き換えます。
PROJECT_ID
: プロジェクト IDTOPIC_NAME
: Pub/Sub トピック名FILE_PATH
: スキーマ テキストを含む JSON ファイルのパス
(省略可)次のコマンドを実行して、スキーマが Pub/Sub トピックに正常に割り当てられていることを確認します。
gcloud data-catalog entries lookup \ 'pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`'
Pub/Sub トピックの使用
Dataflow SQL クエリで Pub/Sub を参照するには、次の識別子を使用します。
pubsub.topic.`PROJECT_ID`.`TOPIC_NAME`
次のように置き換えます。
PROJECT_ID
: プロジェクト IDTOPIC_NAME
: Pub/Sub トピック名
たとえば、次のクエリではプロジェクト dataflow-sql
の Dataflow トピック daily.transactions
から選択しています。
SELECT *
FROM pubsub.topic.`dataflow-sql`.`daily.transactions`
Pub/Sub トピックへの書き込み
クエリの結果を Pub/Sub トピックに書き込むには、Cloud Console または Google Cloud CLI を使用します。
Console
Pub/Sub トピックにクエリ結果を書き込むには、Dataflow SQL を使用してクエリを実行します。
Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。
クエリエディタに Dataflow SQL クエリを入力します。
[Cloud Dataflow ジョブを作成] をクリックして、ジョブ オプションのパネルを開きます。
パネルの [送信先] セクションで、[出力タイプ]、[Cloud Pub/Sub トピック] の順に選択します。
[Cloud Pub/Sub トピック] をクリックして、トピックを選択します。
[作成] をクリックします。
gcloud
Pub/Sub トピックにクエリの結果を書き込むには、gcloud dataflow sql query
コマンドで --pubsub-topic
フラグを使用します。
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --pubsub-project=PROJECT_ID \ --pubsub-topic=TOPIC_NAME \ 'QUERY'
次のように置き換えます。
JOB_NAME
: 任意のジョブ名REGION
: リージョン エンドポイント(例:us-west1
)PROJECT_ID
: プロジェクト IDTOPIC_NAME
: Pub/Sub トピック名QUERY
: Dataflow SQL クエリ
宛先の Pub/Sub トピックのスキーマは、クエリの結果のスキーマと一致している必要があります。宛先の Pub/Sub トピックにスキーマがない場合、クエリの結果に一致するスキーマが自動的に割り当てられます。
Cloud Storage
Cloud Storage ファイルセットのクエリ
Dataflow SQL でクエリを実行して Cloud Storage ファイルセットを取得するには、次の操作を行います。
Dataflow SQL 用に Data Catalog ファイルセットを作成します。
Dataflow ソースとして Cloud Storage ファイルセットを追加します。
Dataflow SQL クエリで Cloud Storage ファイルセットを使用します。
Cloud Storage ファイルセットの作成
Cloud Storage ファイルセットを作成する方法については、エントリ グループとファイルセットの作成をご覧ください。
Cloud Storage ファイルセットにはスキーマを設定し、ヘッダー行を含まない CSV ファイルのみを含める必要があります。
Cloud Storage ファイルセットの追加
Dataflow SQL で、Cloud Storage ファイルセットを Dataflow ソースとして追加します。
Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。
ナビゲーション パネルで、[データを追加] プルダウン リストをクリックして [Cloud Dataflow のソース] を選択します。
[Cloud Dataflow ソースを追加] パネルで Cloud Storage ファイルセットを選択して、トピックを検索します。
[追加] をクリックします。
Cloud Storage ファイルセットを Dataflow ソースとして追加すると、ナビゲーション メニューの [リソース] セクションに Cloud Storage ファイルセットが表示されます。
ファイルセットを検索するには、[Cloud Dataflow のソース]、[Cloud Storage トピック] の順に展開します。
Cloud Storage ファイルセットの使用
Dataflow SQL クエリで Cloud Storage テーブルを参照するには、次の識別子を使用します。
datacatalog.entry.`PROJECT_ID`.REGION.`ENTRY_GROUP`.`FILESET_NAME`
次のように置き換えます。
PROJECT_ID
: プロジェクト IDREGION
: リージョン エンドポイント(例:us-west1
)ENTRY_GROUP
: Cloud Storage ファイルセットのエントリ グループFILESET_NAME
: Cloud Storage ファイルセットの名前
たとえば、次のクエリでは、プロジェクト dataflow-sql
とエントリ グループ my-fileset-group
の Cloud Storage ファイルセット daily.registrations
から選択しています。
SELECT *
FROM datacatalog.entry.`dataflow-sql`.`us-central1`.`my-fileset-group`.`daily.registrations`
BigQuery
BigQuery テーブルのクエリ
Dataflow SQL でクエリを実行して BigQuery テーブルを取得するには、次の操作を行います。
Dataflow SQL 用に BigQuery テーブルを作成します。
Dataflow SQL クエリで BigQuery テーブルを使用します。
BigQuery テーブルを Dataflow ソースとして追加する必要はありません。
BigQuery テーブルの作成
Dataflow SQL 用の BigQuery テーブルを作成する方法については、スキーマ定義を持つ空のテーブルを作成をご覧ください。
クエリでの BigQuery テーブルの使用
Dataflow SQL クエリで BigQuery テーブルを参照するには、次の識別子を使用します。
bigquery.table.`PROJECT_ID`.`DATASET_NAME`.`TABLE_NAME`
識別子は、Dataflow SQL の語彙構造に従う必要があります。識別子に文字、数字、アンダースコア以外の文字が含まれている場合は、識別子をバッククォートで囲みます。
たとえば、次のクエリでは、データセット dataflow_sql_dataset
とプロジェクト dataflow-sql
の BigQuery テーブル us_state_salesregions
を使用しています。
SELECT *
FROM bigquery.table.`dataflow-sql`.dataflow_sql_dataset.us_state_salesregions
BigQuery テーブルへの書き込み
クエリ結果を Dataflow SQL クエリに書き込むには、Cloud Console または Google Cloud CLI を使用します。
Console
クエリの結果を Dataflow SQL クエリに書き込むには、Dataflow SQL でクエリを実行します。
Cloud Console で、Dataflow SQL を使用できる BigQuery ページに移動します。
クエリエディタに Dataflow SQL クエリを入力します。
[Cloud Dataflow ジョブを作成] をクリックして、ジョブ オプションのパネルを開きます。
パネルの [送信先] セクションで、[出力タイプ] > [BigQuery] の順に選択します。
[データセット ID] をクリックし、[読み込まれたデータセット] または [新しいデータセットの作成] を選択します。
[テーブル名] フィールドに宛先テーブルを入力します。
(省略可)BigQuery テーブルへのデータの読み込み方法を選択します。
- 空の場合に書き込む: (デフォルト)テーブルが空の場合にのみデータを書き込みます。
- テーブルに追加する: テーブルの末尾にデータを追加します。
- テーブルの上書き: 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。
[作成] をクリックします。
gcloud
BigQuery テーブルにクエリの結果を書き込むには、gcloud dataflow sql query
コマンドで --bigquery-table
フラグを使用します。
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --bigquery-dataset=DATASET_NAME \ --bigquery-table=TABLE_NAME \ 'QUERY'
次のように置き換えます。
JOB_NAME
: 任意のジョブ名REGION
: リージョン エンドポイント(例:us-west1
)DATASET_NAME
: BigQuery データセット名TABLE_NAME
: BigQuery テーブル名QUERY
: Dataflow SQL クエリ
BigQuery テーブルにデータを書き込む方法を選択するには、--bigquery-write-disposition
フラグと次の値を使用します。
write-empty
: (デフォルト)テーブルが空の場合にのみデータを書き込みます。write-append
: テーブルの末尾にデータを追加します。write-truncate
: 新しいデータを書き込む前に、テーブル内の既存のデータをすべて消去します。
gcloud dataflow sql query \ --job-name=JOB_NAME \ --region=REGION \ --bigquery-dataset=DATASET_NAME \ --bigquery-table=TABLE_NAME \ --bigquery-write-disposition=WRITE_MODE 'QUERY'
WRITE_MODE
は、BigQuery の書き込み処理値に置き換えます。
宛先の BigQuery テーブルのスキーマは、クエリの結果のスキーマと一致している必要があります。宛先の BigQuery テーブルにスキーマがない場合、クエリの結果に一致するスキーマが自動的に割り当てられます。