Pub/Sub にデータをエクスポートする(リバース ETL)
Pub/Sub にデータをエクスポートするには、BigQuery の継続的クエリを使用する必要があります。継続的クエリのプレビューに登録するには、リクエスト フォームにご記入ください。この機能に関するフィードバックやサポートのリクエストを行う場合は、bq-continuous-queries-feedback@google.com 宛てにメールをお送りください。
このドキュメントでは、BigQuery から Pub/Sub へのリバース ETL(RETL)を設定する方法について説明します。これを行うには、継続的クエリで EXPORT DATA
ステートメントを使用して、BigQuery から Pub/Sub トピックにデータをエクスポートします。
Pub/Sub への RETL ワークフローを使用すると、BigQuery の分析機能と Pub/Sub の非同期でスケーラブルなグローバル メッセージング サービスを組み合わせることができます。このワークフローにより、ダウンストリームのアプリケーションとサービスにイベント ドリブンでデータを提供できます。
前提条件
サービス アカウントを作成する必要があります。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントが必要です。
継続的クエリの結果をメッセージとして受信するには、Pub/Sub トピックと、ターゲット アプリケーションがそれらのメッセージを受信するための Pub/Sub サブスクリプションを作成する必要があります。
必要なロール
このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。
ユーザー アカウント権限
BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create
IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create
権限が付与されています。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery ジョブユーザー(
roles/bigquery.jobUser
) - BigQuery 管理者(
roles/bigquery.admin
)
サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUser
)ロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdmin
)ロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。
ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdmin
)ロールが必要です。
サービス アカウントの権限
BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export
IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export
権限が付与されています。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
サービス アカウントが Pub/Sub にアクセスできるようにするには、サービス アカウントに次の IAM ロールの両方を付与する必要があります。
必要な権限は、カスタムロールを使用して取得することもできます。
始める前に
Enable the BigQuery and Pub/Sub APIs.
Pub/Sub へのエクスポート
EXPORT DATA
ステートメントを使用して、データを Pub/Sub トピックにエクスポートします。
コンソール
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで、[展開] > [クエリ設定] の順にクリックします。
[継続的クエリ] セクションで、[継続的クエリモードを使用する] チェックボックスをオンにします。
[サービス アカウント] ボックスで、作成したサービス アカウントを選択します。
[保存] をクリックします。
クエリエディタで次のステートメントを入力します。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID' ) AS ( QUERY );
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。TOPIC_ID
: Pub/Sub トピック ID。トピック ID は、Google Cloud コンソールの [トピック] ページで取得できます。QUERY
: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
[実行] をクリックします。
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
コマンドラインで、次のフラグを指定して
bq query
コマンドを使用し、継続的クエリを実行します。- クエリを連続的に実行するには、
--continuous
フラグをtrue
に設定します。 --connection_property
フラグを使用して、使用するサービス アカウントを指定します。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'EXPORT DATA OPTIONS (format = "CLOUD_PUBSUB", uri = "https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID") AS (QUERY);'
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。QUERY
: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
- クエリを連続的に実行するには、
API
jobs.insert
メソッドを呼び出して、継続的クエリを実行します。Job
リソースのJobConfigurationQuery
リソースで、次のフィールドを設定します。- クエリを継続的に実行するには、
continuous
フィールドをtrue
に設定します。 connection_property
フィールドに、使用するサービス アカウントを指定します。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs' --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID') AS (QUERY);","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。QUERY
: エクスポートするデータを選択する SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。
- クエリを継続的に実行するには、
複数の列を Pub/Sub にエクスポートする
出力に複数の列を含める場合は、列値を含む構造体列を作成し、TO_JSON_STRING
関数を使用して構造体値を JSON 文字列に変換します。次の例では、4 つの列からデータをエクスポートし、JSON 文字列としてフォーマットします。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
エクスポートの最適化
継続的クエリジョブのパフォーマンスが使用可能なコンピューティング リソースによって制限されていると思われる場合は、BigQuery CONTINUOUS
スロット予約割り当てのサイズを増やしてみてください。
制限事項
- エクスポートされたデータは、単一の
STRING
列またはBYTES
列で構成する必要があります。列名は任意の名前にできます。 - Pub/Sub にエクスポートするには、継続的クエリを使用する必要があります。
- 継続的クエリで Pub/Sub トピックにスキーマを渡すことはできません。
- スキーマを使用する Pub/Sub トピックにデータをエクスポートすることはできません。
NULL
値を含むデータはエクスポートできません。継続的クエリにWHERE message IS NOT NULL
フィルタを追加すると、クエリ結果からNULL
値を除外できます。- エクスポートするデータは Pub/Sub の割り当てを超えないようにする必要があります。
料金
継続的クエリでデータをエクスポートする場合は、BigQuery 容量のコンピューティング料金に基づいて課金されます。継続的クエリを実行するには、Enterprise エディションまたは Enterprise Plus エディションを使用する予約と、CONTINUOUS
ジョブタイプを使用する予約割り当てが必要です。
データのエクスポート後、Pub/Sub の使用料金が発生します。詳細については、Pub/Sub の料金をご覧ください。