建立持續查詢
本文說明如何在 BigQuery 中執行連續查詢。
BigQuery 持續查詢是會不斷執行的 SQL 陳述式,透過持續查詢,您可以在 BigQuery 中即時分析傳入資料,然後將結果匯出至 Bigtable、Pub/Sub 或 Spanner,或是將結果寫入 BigQuery 資料表。
選擇帳戶類型
您可以使用使用者帳戶建立及執行持續查詢作業,也可以使用使用者帳戶建立持續查詢作業,然後使用服務帳戶執行作業。 您必須使用服務帳戶,才能執行將結果匯出至 Pub/Sub 主題的持續查詢。
使用使用者帳戶時,連續查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。
所需權限
本節說明建立及執行連續查詢所需的權限。除了上述身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。
使用使用者帳戶時的權限
本節說明使用使用者帳戶建立及執行持續查詢時,需要哪些角色和權限。
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。下列每個 IAM 角色都會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作業使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin
) 角色。
使用服務帳戶時的權限
本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。
使用者帳戶權限
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。下列 IAM 角色都會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作業使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要提交使用服務帳戶執行的工作,使用者帳戶必須具備服務帳戶使用者 (roles/iam.serviceAccountUser
) 角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin
) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱授予單一角色。
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin
) 角色。
服務帳戶權限
如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
事前準備
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Verify that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
建立保留項目
建立 Enterprise 或 Enterprise Plus 版本預訂,然後建立預訂指派,並使用 CONTINUOUS
工作類型。這個預留項目可以自動調整資源配置。連續查詢的預留項目指派作業須遵守預留項目限制。
匯出至 Pub/Sub
如要將資料匯出至 Pub/Sub,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Pub/Sub」。
在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料
您可以使用 Pub/Sub 屬性提供訊息的額外資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息。
在連續查詢結果中,如果資料欄名為 _ATTRIBUTES
,系統會將其值複製到 Pub/Sub 訊息屬性。_ATTRIBUTES
內提供的欄位會做為屬性鍵。
_ATTRIBUTES
欄必須為 JSON
類型,格式為 ARRAY<STRUCT<STRING, STRING>>
或 STRUCT<STRING>
。
如需範例,請參閱將資料匯出至 Pub/Sub 主題。
匯出至 Bigtable
如要將資料匯出至 Bigtable,您必須具備額外的 API、IAM 權限和資源。 Google Cloud詳情請參閱匯出至 Bigtable。
匯出至 Spanner
如要將資料匯出至 Spanner,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Spanner (反向 ETL)」。
將資料寫入 BigQuery 資料表
您可以使用 INSERT
陳述式將資料寫入 BigQuery 資料表。
使用 AI 函式
如要在持續查詢中使用支援的 AI 函式,您需要額外的 API、IAM 權限和 Google Cloud資源。如要瞭解詳情,請根據您的用途參閱下列主題:
- 使用
ML.GENERATE_TEXT
函式生成文字 - 使用
ML.GENERATE_EMBEDDING
函式生成文字嵌入 - 使用
ML.UNDERSTAND_TEXT
函式解讀文字 - 使用
ML.TRANSLATE
函式翻譯文字
在連續查詢中使用 AI 函式時,請考慮查詢輸出內容是否會超出函式的配額。如果超出配額,您可能必須另外處理未處理的記錄。
指定起點
您必須在連續查詢的 FROM
子句中使用 APPENDS
函式,指定要處理的最早資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
會指示 BigQuery 處理在開始連續查詢前最多 10 分鐘內新增至資料表 my_table
的資料。加入 my_table
的資料會在匯入時處理。資料處理不會受到延遲影響。在持續查詢中使用 APPENDS
函式時,請勿提供 end_timestamp
引數。
以下範例說明如何使用 APPENDS
函式,在查詢接收計程車乘車資訊串流的 BigQuery 資料表時,從特定時間點開始持續查詢:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
指定早於目前時間的起點
如要處理目前時間點之前的資料,可以使用 APPENDS
函式,為查詢指定較早的起點。您指定的起點必須位於所選資料表的時間回溯期內。時間回溯期預設為過去七天。
如要納入時間旅行視窗外的資料,請使用標準查詢插入或匯出特定時間點的資料,然後從該時間點開始持續查詢。
範例
以下範例說明如何將 BigQuery 資料表 (接收特定時間點的計程車行程串流資訊) 中的舊資料載入資料表,然後從舊資料的截斷點開始執行連續查詢。
執行標準查詢,將資料回填至特定時間點:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
從查詢停止的時間點執行持續查詢:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
使用使用者帳戶執行連續查詢
本節說明如何使用使用者帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用者帳戶執行的連續查詢最多可執行兩天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢。
請按照下列步驟執行連續查詢:
主控台
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下
「更多」。- 在「選擇查詢模式」部分,選擇「持續查詢」。
- 按一下「確認」。
- 選用:如要控管查詢的執行時間,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時」。
在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業。
按一下「執行」。
bq
-
In the Google Cloud console, activate Cloud Shell.
在 Cloud Shell 中,使用
bq query
指令搭配--continuous
旗標,執行連續查詢:bq query --use_legacy_sql=false --continuous=true 'QUERY'
將
QUERY
換成持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用--job_timeout_ms
標記控制查詢的執行時間。PROJECT_ID
:您的專案 ID。QUERY
:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。
API
呼叫 jobs.insert
方法,執行連續查詢。您必須在傳入的 Job
資源中,將 JobConfigurationQuery
的 continuous
欄位設為 true
。如有需要,您可以設定 jobTimeoutMs
欄位,控制查詢的執行時間長度。
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
更改下列內容:
使用服務帳戶執行連續查詢
本節說明如何使用服務帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢。
如要使用服務帳戶執行持續查詢,請按照下列步驟操作:
主控台
bq
- 建立服務帳戶。
- 授予服務帳戶必要的權限。
-
In the Google Cloud console, activate Cloud Shell.
在指令列中,使用
bq query
指令加上下列旗標,執行連續查詢:- 將
--continuous
旗標設為true
,即可持續查詢。 - 使用
--connection_property
旗標指定要使用的服務帳戶。 - 選用:設定
--job_timeout_ms
旗標,限制查詢執行時間。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
更改下列內容:
- 將
- 建立服務帳戶。
- 授予服務帳戶必要的權限。
呼叫
jobs.insert
方法,執行連續查詢。在您傳入的Job
資源中,設定JobConfigurationQuery
資源的下列欄位:- 將
continuous
欄位設為true
,即可讓查詢持續執行。 - 使用
connectionProperties
欄位指定要使用的服務帳戶。
您可以選擇在
JobConfiguration
資源中設定jobTimeoutMs
欄位,控管查詢的執行時間。curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
更改下列內容:
- 將
API
建立自訂工作 ID
每個查詢工作都會獲派工作 ID,可用於搜尋及管理工作。根據預設,工作 ID 是隨機產生。如要使用工作記錄或工作探索工具,更輕鬆地搜尋連續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下「更多」。
在「選擇查詢模式」部分,選擇「持續查詢」。
按一下「確認」。
在查詢編輯器中,依序點選「更多」>「查詢設定」。
在「Custom job ID prefix」(自訂工作 ID 前置字串) 區段中,輸入自訂名稱前置字串。
按一下 [儲存]。
範例
下列 SQL 範例顯示持續查詢的常見用途。
將資料匯出至 Pub/Sub 主題
以下範例顯示連續查詢,可從接收串流計程車乘車資訊的 BigQuery 資料表篩選資料,並透過訊息屬性將資料即時發布至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料匯出至 Bigtable 資料表
以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料匯出至 Spanner 資料表
以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 表格中篩選資料,然後即時將資料匯出至 Spanner 表格:
EXPORT DATA OPTIONS ( format = 'CLOUD_SPANNER', uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides', spanner_options ="""{ "table": "rides", -- To ensure data is written to Spanner in the correct sequence -- during a continuous export, use the change_timestamp_column -- option. This should be mapped to a timestamp column from your -- BigQuery data. If your source data lacks a timestamp, the -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function -- will be automatically mapped to the "change_timestamp" column. "change_timestamp_column": "change_timestamp" }""" ) AS ( SELECT ride_id, latitude, longitude, meter_reading, ride_status, passenger_count FROM APPENDS( TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料寫入 BigQuery 資料表
以下範例顯示持續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 資料表。這樣一來,資料就能用於後續的下游分析。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
使用 Vertex AI 模型處理資料
以下範例顯示連續查詢,該查詢會使用 Vertex AI 模型,根據計程車乘客目前的緯度和經度產生廣告,然後即時將結果匯出至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
修改持續查詢的 SQL
持續查詢作業執行期間,您無法更新持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從停止原始持續查詢工作的位置,啟動新的持續查詢工作。
如要修改持續查詢中使用的 SQL,請按照下列步驟操作:
- 查看要更新的連續查詢工作詳細資料,並記下工作 ID。
- 盡可能暫停收集上游資料。如果無法執行這項操作,持續查詢重新啟動時,可能會出現部分重複資料。
- 取消要修改的持續查詢。
使用
INFORMATION_SCHEMA
JOBS
檢視畫面,取得原始持續查詢工作的end_time
值:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
更改下列內容:
PROJECT_ID
:您的專案 ID。REGION
:專案使用的區域。JOB_ID
:您在步驟 1 中識別的連續查詢作業 ID。
修改持續查詢 SQL 陳述式,從特定時間點開始持續查詢,並使用您在步驟 5 中擷取的
end_time
值做為起點。修改持續查詢 SQL 陳述式,以反映所需變更。
執行修改後的持續查詢。
取消持續查詢
您可以像取消其他工作一樣取消持續查詢工作。取消工作後,查詢最多可能需要一分鐘才會停止執行。
如果取消查詢後重新啟動,重新啟動的查詢會視為新的獨立查詢。重新啟動的查詢不會從上一個作業停止處理資料的位置開始,也無法參照上一個查詢的結果。請參閱「從特定時間點開始執行連續查詢」。
監控查詢及處理錯誤
持續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理部分暫時性錯誤,但改善工作復原能力的最佳做法包括: