継続的クエリを作成する
このドキュメントでは、BigQuery で継続的クエリを実行する方法について説明します。
BigQuery の継続的クエリは、継続的に実行される SQL ステートメントです。継続的クエリを使用すると、BigQuery で受信データをリアルタイムで分析し、結果を Bigtable または Pub/Sub にエクスポートしたり、BigQuery テーブルに書き込むことができます。
アカウントの種類を選択する
ユーザー アカウントを使用して継続的クエリジョブを作成して実行できます。また、ユーザー アカウントを使用して継続的クエリジョブを作成し、サービス アカウントを使用して実行することもできます。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントを使用する必要があります。
ユーザー アカウントを使用する場合、継続的クエリは 2 日間実行されます。サービス アカウントを使用する場合、継続的クエリは明示的にキャンセルされるまで実行されます。詳しくは、認可をご覧ください。
必要な権限
このセクションでは、継続的クエリの作成と実行に必要な権限について説明します。前述の Identity and Access Management(IAM)ロールの代わりに、カスタムロールを使用して必要な権限を取得することもできます。
ユーザー アカウントを使用する場合の権限
このセクションでは、ユーザー アカウントを使用して継続的クエリを作成して実行するために必要なロールと権限について説明します。
BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create
IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create
権限が付与されています。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery ジョブユーザー(
roles/bigquery.jobUser
) - BigQuery 管理者(
roles/bigquery.admin
)
BigQuery テーブルからデータをエクスポートするには、ユーザー アカウントに bigquery.tables.export
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.export
権限が付与されています。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
BigQuery テーブルのデータを更新するには、ユーザー アカウントに bigquery.tables.updateData
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData
権限が付与されています。
- BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdmin
)ロールが必要です。
サービス アカウントを使用する場合の権限
このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。
ユーザー アカウント権限
BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create
IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create
権限が付与されています。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery ジョブユーザー(
roles/bigquery.jobUser
) - BigQuery 管理者(
roles/bigquery.admin
)
サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUser
)ロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdmin
)ロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。
ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdmin
)ロールが必要です。
サービス アカウントの権限
BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export
IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export
権限が付与されています。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData
権限が付与されています。
- BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
始める前に
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
予約を作成する
Enterprise または Enterprise Plus エディションの予約を作成し、CONTINUOUS
ジョブタイプで予約割り当てを作成します。
Pub/Sub へのエクスポート
Pub/Sub にデータをエクスポートするには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、Pub/Sub にエクスポートするをご覧ください。
カスタム属性を Pub/Sub メッセージにメタデータとして埋め込む
Pub/Sub 属性を使用して、優先度、送信元、宛先、追加のメタデータなど、メッセージに関する追加情報を指定できます。属性を使用して、サブスクリプションでメッセージをフィルタすることもできます。
継続的クエリの結果で、列の名前が _ATTRIBUTES
の場合、その値は Pub/Sub メッセージ属性にコピーされます。_ATTRIBUTES
内に指定されたフィールドは、属性キーとして使用されます。
_ATTRIBUTES
列は JSON
型で、ARRAY<STRUCT<STRING, STRING>>
または STRUCT<STRING>
の形式にする必要があります。
例については、Pub/Sub トピックにデータをエクスポートするをご覧ください。
Bigtable にエクスポートする
Bigtable にデータをエクスポートするには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、Bigtable にエクスポートするをご覧ください。
BigQuery テーブルにデータを書き込む
BigQuery テーブルにデータを書き込むには、INSERT
ステートメントを使用します。
AI 関数を使用する
継続的クエリでサポートされている AI 関数を使用するには、追加の API、IAM 権限、Google Cloud リソースが必要です。詳細については、ユースケースに応じて次のいずれかをご覧ください。
ML.GENERATE_TEXT
関数を使用してテキストを生成するML.GENERATE_EMBEDDING
関数を使用してテキスト エンベディングを生成するML.UNDERSTAND_TEXT
関数を使用してテキストを理解するML.TRANSLATE
関数を使用してテキストを翻訳する
継続的クエリで AI 関数を使用する場合は、クエリ出力が関数の割り当て内かどうか検討してください。割り当てを超えた場合は、処理されないレコードを個別に処理する必要があります。
ユーザー アカウントを使用して継続的クエリを実行する
このセクションでは、ユーザー アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。
継続的クエリを実行する手順は次のとおりです。
コンソール
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで [展開] をクリックします。
[クエリモードを選択] で、[継続的クエリ] を選択します。
[確認] をクリックします。
クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
[実行] をクリックします。
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Cloud Shell で、
--continuous
フラグを指定してbq query
コマンドを使用し、継続的クエリを実行します。bq query --use_legacy_sql=false --continuous=true 'QUERY'
QUERY
は、継続的クエリの SQL ステートメントに置き換えます。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
API
jobs.insert
メソッドを呼び出して、継続的クエリを実行します。Job
リソースの JobConfigurationQuery
で continuous
フィールドを true
に設定する必要があります。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
サービス アカウントを使用して継続的クエリを実行する
このセクションでは、サービス アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。
サービス アカウントを使用して継続的クエリを実行する手順は次のとおりです。
コンソール
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
Google Cloud コンソールで [BigQuery] ページに移動します。
クエリエディタで [展開] をクリックします。
[クエリモードを選択] で、[継続的クエリ] を選択します。
[確認] をクリックします。
クエリエディタで、[展開] > [クエリ設定] の順にクリックします。
[継続的クエリ] セクションで、[サービス アカウント] ボックスを使用して、作成したサービス アカウントを選択します。
[保存] をクリックします。
クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
[実行] をクリックします。
bq
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
コマンドラインで、次のフラグを指定して
bq query
コマンドを使用し、継続的クエリを実行します。- クエリを連続的に実行するには、
--continuous
フラグをtrue
に設定します。 --connection_property
フラグを使用して、使用するサービス アカウントを指定します。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
- クエリを連続的に実行するには、
API
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
jobs.insert
メソッドを呼び出して、継続的クエリを実行します。Job
リソースのJobConfigurationQuery
リソースで、次のフィールドを設定します。- クエリを継続的に実行するには、
continuous
フィールドをtrue
に設定します。 connection_property
フィールドに、使用するサービス アカウントを指定します。
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、Google Cloud コンソールの [サービス アカウント] ページで確認できます。
- クエリを継続的に実行するには、
例
次の SQL の例は、継続的クエリの一般的なユースケースを示しています。
データを Pub/Sub トピックにエクスポートする
次の例では、タクシー乗車情報のストリーミングを受信している BigQuery テーブルのデータをフィルタし、メッセージ属性を使用してリアルタイムで Pub/Sub トピックにデータをパブリッシュする継続的クエリを示します。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
データを Bigtable テーブルにエクスポートする
次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Bigtable テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
BigQuery テーブルにデータを書き込む
次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタして変換し、データを別の BigQuery テーブルにリアルタイムで書き込む継続的クエリを示しています。これにより、ダウンストリームの詳細な分析にデータを使用できるようになります。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Vertex AI モデルを使用してデータを処理する
次の例は、Vertex AI モデルを使用して、現在の緯度と経度に基づいてタクシー乗客向けの広告を生成し、結果を Pub/Sub トピックにリアルタイムでエクスポートする継続的クエリを示しています。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
特定の時点から継続的クエリを開始する
継続的クエリを開始すると、選択したテーブル内のすべての行が処理され、新しい行が追加されると処理されます。既存データの一部またはすべてを処理しない場合、APPENDS
変更履歴関数を使用して、特定の時点から処理を開始できます。
次の例は、APPENDS
関数を使用して特定の時点から継続的クエリを開始する方法を示しています。
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
継続的クエリの SQL を変更する
継続的クエリジョブの実行中に、継続的クエリで使用される SQL を更新することはできません。継続的クエリジョブをキャンセルし、SQL を変更してから、元の継続的クエリジョブを停止したポイントから新しい継続的クエリジョブを開始する必要があります。
継続的クエリで使用される SQL を変更する手順は次のとおりです。
- 更新する継続的クエリジョブのジョブの詳細を表示し、ジョブ ID をメモします。
- 可能であれば、アップストリーム データの収集を一時停止します。これを行わないと、継続的クエリが再起動したときにデータが重複する可能性があります。
- 変更する継続的クエリをキャンセルします。
INFORMATION_SCHEMA
JOBS
ビューを使用して、元の継続的クエリジョブのend_time
値を取得します。SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。REGION
: プロジェクトで使用されるリージョン。JOB_ID
: 手順 1 で特定した継続的クエリジョブ ID。
手順 5 で取得した
end_time
値を開始点として、特定の時点から継続的クエリが開始するように、継続的クエリの SQL ステートメントを変更します。必要な変更を反映するように、継続的クエリの SQL ステートメントを変更します。
変更した継続的クエリを実行します。
継続的クエリをキャンセルする
他のジョブと同様に、継続的クエリジョブをキャンセルできます。ジョブがキャンセルされてからクエリの実行が停止するまでに、最大で 1 分かかることがあります。