継続的クエリを作成する
このドキュメントでは、BigQuery で継続的クエリを実行する方法について説明します。
BigQuery の継続的クエリは、継続的に実行される SQL ステートメントです。継続的クエリを使用すると、BigQuery で受信データをリアルタイムで分析し、結果を Bigtable または Pub/Sub にエクスポートするか、BigQuery テーブルに書き込むことができます。
アカウントの種類を選択する
ユーザー アカウントを使用して継続的クエリジョブを作成して実行できます。また、ユーザー アカウントを使用して継続的クエリジョブを作成し、サービス アカウントを使用して実行することもできます。結果を Pub/Sub トピックにエクスポートする継続的クエリを実行するには、サービス アカウントを使用する必要があります。
ユーザー アカウントを使用する場合、継続的クエリは最大 2 日間実行されます。サービス アカウントを使用する場合、継続的クエリは最大 150 日間実行されます。詳しくは、認可をご覧ください。
必要な権限
このセクションでは、継続的クエリの作成と実行に必要な権限について説明します。前述の Identity and Access Management(IAM)ロールの代わりに、カスタムロールを使用して必要な権限を取得することもできます。
ユーザー アカウントを使用する場合の権限
このセクションでは、ユーザー アカウントを使用して継続的クエリを作成して実行するために必要なロールと権限について説明します。
BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create
IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create
権限が付与されています。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery ジョブユーザー(
roles/bigquery.jobUser
) - BigQuery 管理者(
roles/bigquery.admin
)
BigQuery テーブルからデータをエクスポートするには、ユーザー アカウントに bigquery.tables.export
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.export
権限が付与されています。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
BigQuery テーブルのデータを更新するには、ユーザー アカウントに bigquery.tables.updateData
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData
権限が付与されています。
- BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdmin
)ロールが必要です。
サービス アカウントを使用する場合の権限
このセクションでは、継続的クエリを作成するユーザー アカウントと、継続的クエリを実行するサービス アカウントに必要なロールと権限について説明します。
ユーザー アカウント権限
BigQuery でジョブを作成するには、ユーザー アカウントに bigquery.jobs.create
IAM 権限が必要です。次の各 IAM ロールには、bigquery.jobs.create
権限が付与されています。
- BigQuery ユーザー(
roles/bigquery.user
) - BigQuery ジョブユーザー(
roles/bigquery.jobUser
) - BigQuery 管理者(
roles/bigquery.admin
)
サービス アカウントを使用して実行されるジョブを送信するには、ユーザー アカウントにサービス アカウント ユーザー(roles/iam.serviceAccountUser
)ロールが必要です。同じユーザー アカウントを使用してサービス アカウントを作成する場合、ユーザー アカウントにはサービス アカウント管理者(roles/iam.serviceAccountAdmin
)ロールが必要です。プロジェクト内のすべてのサービス アカウントではなく、単一のサービス アカウントへのユーザーのアクセスを制限する方法については、単一のロールを付与するをご覧ください。
ユーザー アカウントが継続的クエリのユースケースに必要な API を有効にしなければならない場合は、ユーザー アカウントに Service Usage 管理者(roles/serviceusage.serviceUsageAdmin
)ロールが必要です。
サービス アカウントの権限
BigQuery テーブルからデータをエクスポートするには、サービス アカウントに bigquery.tables.export
IAM 権限が必要です。次の IAM ロールにはそれぞれ bigquery.tables.export
権限が付与されています。
- BigQuery データ閲覧者(
roles/bigquery.dataViewer
) - BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 権限が必要です。次の各 IAM ロールには、bigquery.tables.updateData
権限が付与されています。
- BigQuery データ編集者(
roles/bigquery.dataEditor
) - BigQuery データオーナー(
roles/bigquery.dataOwner
) - BigQuery 管理者(
roles/bigquery.admin
)
始める前に
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
予約を作成する
Enterprise または Enterprise Plus エディションの予約を作成し、CONTINUOUS
ジョブタイプで予約割り当てを作成します。この予約では、自動スケーリングを使用できます。継続的クエリの予約割り当てには、予約の制限が適用されます。
Pub/Sub へのエクスポート
Pub/Sub にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloud リソースが必要です。詳細については、Pub/Sub にエクスポートするをご覧ください。
カスタム属性を Pub/Sub メッセージにメタデータとして埋め込む
Pub/Sub 属性を使用して、優先度、送信元、宛先、追加のメタデータなど、メッセージに関する追加情報を指定できます。属性を使用して、サブスクリプションでメッセージをフィルタすることもできます。
継続的クエリの結果で、列の名前が _ATTRIBUTES
の場合、その値は Pub/Sub メッセージ属性にコピーされます。_ATTRIBUTES
内に指定されたフィールドは、属性キーとして使用されます。
_ATTRIBUTES
列は JSON
型で、ARRAY<STRUCT<STRING, STRING>>
または STRUCT<STRING>
の形式にする必要があります。
例については、Pub/Sub トピックにデータをエクスポートするをご覧ください。
Bigtable にエクスポートする
Bigtable にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、Bigtable にエクスポートするをご覧ください。
Spanner にエクスポートする
Spanner にデータをエクスポートするには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、Spanner にエクスポートする(リバース ETL)をご覧ください。
BigQuery テーブルにデータを書き込む
BigQuery テーブルにデータを書き込むには、INSERT
ステートメントを使用します。
AI 関数を使用する
継続的クエリでサポートされている AI 関数を使用するには、追加の API、IAM 権限、 Google Cloudリソースが必要です。詳細については、ユースケースに応じて次のいずれかをご覧ください。
ML.GENERATE_TEXT
関数を使用してテキストを生成するML.GENERATE_EMBEDDING
関数を使用してテキスト エンベディングを生成するML.UNDERSTAND_TEXT
関数を使用してテキストを理解するML.TRANSLATE
関数を使用してテキストを翻訳する
継続的クエリで AI 関数を使用する場合は、クエリ出力が関数の割り当て内かどうか検討してください。割り当てを超えた場合は、処理されないレコードを個別に処理する必要があります。
開始点を指定する
処理する最も古いデータを指定するには、継続的クエリの FROM
句で APPENDS
関数を使用する必要があります。たとえば、APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
は、継続的クエリの開始時刻の 10 分前までにテーブル my_table
に追加されたデータを処理するように BigQuery に指示します。my_table
に追加されたデータは、入力された時点で処理されます。データ処理に遅延は発生しません。継続的クエリで APPENDS
関数を使用する場合は、end_timestamp
引数を指定しないでください。
次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルに対してクエリを実行するときに、APPENDS
関数を使用して特定の時点から継続的クエリを開始する方法を示しています。
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
現在の時刻より前の開始時刻を指定する
現在の時点より前のデータを処理する場合は、APPENDS
関数を使用して、クエリの開始点を指定します。指定する開始点は、選択するテーブルのタイムトラベル期間内にある必要があります。デフォルトでは、タイムトラベル期間は過去 7 日間です。
タイムトラベル期間外のデータを含めるには、標準クエリを使用して特定の時点までのデータを挿入またはエクスポートし、その時点から継続的クエリを開始します。
例
次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルから特定の時点までの古いデータをテーブルに読み込み、古いデータのカットオフ ポイントから継続的クエリを開始する方法を示しています。
標準クエリを実行して、特定の時点までのデータをバックフィルします。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
クエリが停止した時点から継続的クエリを実行します。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
ユーザー アカウントを使用して継続的クエリを実行する
このセクションでは、ユーザー アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、 Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。ユーザー アカウントで実行される継続的クエリは、最大 2 日間実行され、その後自動的に停止します。新しい受信データの処理を続行するには、新しい継続的クエリを開始し、開始点を指定します。このプロセスを自動化するには、失敗したクエリを再試行するをご覧ください。
継続的クエリを実行する手順は次のとおりです。
コンソール
Google Cloud コンソールで、[BigQuery] ページに移動します。
クエリエディタで [
その他] をクリックします。- [クエリモードを選択] で、[継続的クエリ] を選択します。
- [確認] をクリックします。
- 省略可: クエリの実行時間を制御するには、[クエリの設定] をクリックして、[ジョブのタイムアウト] をミリ秒単位で設定します。
クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
[実行] をクリックします。
bq
-
In the Google Cloud console, activate Cloud Shell.
Cloud Shell で、
--continuous
フラグを指定してbq query
コマンドを使用し、継続的クエリを実行します。bq query --use_legacy_sql=false --continuous=true 'QUERY'
QUERY
は、継続的クエリの SQL ステートメントに置き換えます。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。クエリの実行時間を制御するには、--job_timeout_ms
フラグを使用します。PROJECT_ID
: プロジェクト ID。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
API
jobs.insert
メソッドを呼び出して、継続的クエリを実行します。Job
リソースの JobConfigurationQuery
で continuous
フィールドを true
に設定する必要があります。必要に応じて、jobTimeoutMs
フィールドを設定して、クエリの実行時間を制御できます。
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
次のように置き換えます。
サービス アカウントを使用して継続的クエリを実行する
このセクションでは、サービス アカウントを使用して継続的クエリを実行する方法について説明します。継続的クエリの実行後、クエリの実行を中断することなく、 Google Cloud コンソール、ターミナル ウィンドウ、またはアプリケーションを閉じることができます。サービス アカウントを使用して実行される継続的クエリは、最大 150 日間実行され、その後自動的に停止します。新しい受信データの処理を続行するには、新しい継続的クエリを開始し、開始点を指定します。このプロセスを自動化するには、失敗したクエリを再試行するをご覧ください。
サービス アカウントを使用して継続的クエリを実行する手順は次のとおりです。
コンソール
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
Google Cloud コンソールで、[BigQuery] ページに移動します。
クエリエディタで [展開] をクリックします。
[クエリモードを選択] で、[継続的クエリ] を選択します。
[確認] をクリックします。
クエリエディタで、[その他 > クエリ設定] の順にクリックします。
[継続的クエリ] セクションで、[サービス アカウント] ボックスを使用して、作成したサービス アカウントを選択します。
省略可: クエリの実行時間を制御するには、[ジョブのタイムアウト] をミリ秒単位で設定します。
[保存] をクリックします。
クエリエディタで、継続的クエリの SQL ステートメントを入力します。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
[実行] をクリックします。
bq
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
-
In the Google Cloud console, activate Cloud Shell.
コマンドラインで、次のフラグを指定して
bq query
コマンドを使用し、継続的クエリを実行します。- クエリを連続的に実行するには、
--continuous
フラグをtrue
に設定します。 --connection_property
フラグを使用して、使用するサービス アカウントを指定します。- 省略可:
--job_timeout_ms
フラグを設定して、クエリの実行時間を制限します。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、 Google Cloud コンソールの [サービス アカウント] ページで確認できます。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。
- クエリを連続的に実行するには、
- サービス アカウントを作成します。
- サービス アカウントに必要な権限を付与します。
jobs.insert
メソッドを呼び出して、継続的クエリを実行します。Job
リソースのJobConfigurationQuery
リソースで、次のフィールドを設定します。- クエリを継続的に実行するには、
continuous
フィールドをtrue
に設定します。 connectionProperties
フィールドに、使用するサービス アカウントを指定します。
必要に応じて、
JobConfiguration
リソースのjobTimeoutMs
フィールドを設定して、クエリの実行時間を制御できます。curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。QUERY
: 継続的クエリの SQL ステートメント。SQL ステートメントには、サポートされているオペレーションのみを含める必要があります。SERVICE_ACCOUNT_EMAIL
: サービス アカウントのメールアドレス。サービス アカウントのメールアドレスは、 Google Cloud コンソールの [サービス アカウント] ページで確認できます。
- クエリを継続的に実行するには、
API
カスタムジョブ ID を作成する
すべてのクエリジョブには、ジョブの検索と管理に使用できるジョブ ID が割り当てられます。デフォルトでは、ジョブ ID はランダムに生成されます。ジョブ履歴またはジョブ エクスプローラを使用して継続的クエリのジョブ ID を検索しやすくするには、カスタムジョブ ID 接頭辞を割り当てます。
Google Cloud コンソールで、[BigQuery] ページに移動します。
クエリエディタで [展開] をクリックします。
[クエリモードを選択] で、[継続的クエリ] を選択します。
[確認] をクリックします。
クエリエディタで、[その他 > クエリ設定] の順にクリックします。
[カスタムジョブ ID 接頭辞] セクションに、カスタム名の接頭辞を入力します。
[保存] をクリックします。
例
次の SQL の例は、継続的クエリの一般的なユースケースを示しています。
データを Pub/Sub トピックにエクスポートする
次の例では、タクシー乗車情報のストリーミングを受信している BigQuery テーブルのデータをフィルタし、メッセージ属性を使用してリアルタイムで Pub/Sub トピックにデータをパブリッシュする継続的クエリを示します。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
データを Bigtable テーブルにエクスポートする
次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Bigtable テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
データを Spanner テーブルにエクスポートする
次の例は、タクシーの乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタし、Spanner テーブルにデータをリアルタイムでエクスポートする継続的クエリを示しています。
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
BigQuery テーブルにデータを書き込む
次の例は、タクシー乗車情報のストリーミングを受信している BigQuery テーブルからデータをフィルタして変換し、データを別の BigQuery テーブルにリアルタイムで書き込む継続的クエリを示しています。これにより、ダウンストリームの詳細な分析にデータを使用できるようになります。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
Vertex AI モデルを使用してデータを処理する
次の例は、Vertex AI モデルを使用して、現在の緯度と経度に基づいてタクシー乗客向けの広告を生成し、結果を Pub/Sub トピックにリアルタイムでエクスポートする継続的クエリを示しています。
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
継続的クエリの SQL を変更する
継続的クエリジョブの実行中に、継続的クエリで使用される SQL を更新することはできません。継続的クエリジョブをキャンセルし、SQL を変更してから、元の継続的クエリジョブを停止したポイントから新しい継続的クエリジョブを開始する必要があります。
継続的クエリで使用される SQL を変更する手順は次のとおりです。
- 更新する継続的クエリジョブのジョブの詳細を表示し、ジョブ ID をメモします。
- 可能であれば、アップストリーム データの収集を一時停止します。これを行わないと、継続的クエリが再起動したときにデータが重複する可能性があります。
- 変更する継続的クエリをキャンセルします。
INFORMATION_SCHEMA
JOBS
ビューを使用して、元の継続的クエリジョブのend_time
値を取得します。SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
次のように置き換えます。
PROJECT_ID
: プロジェクト ID。REGION
: プロジェクトで使用されるリージョン。JOB_ID
: 手順 1 で特定した継続的クエリジョブ ID。
手順 5 で取得した
end_time
値を開始点として、特定の時点から継続的クエリが開始するように、継続的クエリの SQL ステートメントを変更します。必要な変更を反映するように、継続的クエリの SQL ステートメントを変更します。
変更した継続的クエリを実行します。
継続的クエリをキャンセルする
他のジョブと同様に、継続的クエリジョブをキャンセルできます。ジョブがキャンセルされてからクエリの実行が停止するまでに、最大で 1 分かかることがあります。
クエリをキャンセルしてから再起動すると、再起動されたクエリは新しい独立したクエリのように動作します。再起動されたクエリは、前のジョブが停止した場所からデータ処理を開始するわけではないため、前のクエリの結果を参照できません。特定の時点から継続的クエリを開始するをご覧ください。
クエリをモニタリングしてエラーを処理する
継続的クエリは、データ不整合、スキーマ変更、一時的なサービス停止、メンテナンスなどの要因によって中断されることがあります。BigQuery は一部の一時的なエラーを処理しますが、ジョブの復元力を高めるためのベスト プラクティスは次のとおりです。