建立持續查詢
本文說明如何在 BigQuery 中執行連續查詢。
BigQuery 持續查詢是會不斷執行的 SQL 陳述式,透過持續查詢功能,您可以即時分析 BigQuery 中的傳入資料,然後將結果匯出至 Bigtable 或 Pub/Sub,或是將結果寫入 BigQuery 表格。
選擇帳戶類型
您可以使用使用者帳戶建立及執行持續查詢作業,也可以使用使用者帳戶建立持續查詢作業,然後使用服務帳戶執行作業。 您必須使用服務帳戶,才能執行將結果匯出至 Pub/Sub 主題的持續查詢。
使用使用者帳戶時,連續查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。
所需權限
本節說明建立及執行連續查詢所需的權限。除了上述身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。
使用使用者帳戶時的權限
本節說明使用使用者帳戶建立及執行持續查詢時,需要哪些角色和權限。
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。下列每個 IAM 角色都會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作業使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin
) 角色。
使用服務帳戶時的權限
本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。
使用者帳戶權限
如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create
IAM 權限。下列 IAM 角色都會授予 bigquery.jobs.create
權限:
- BigQuery 使用者 (
roles/bigquery.user
) - BigQuery 作業使用者 (
roles/bigquery.jobUser
) - BigQuery 管理員 (
roles/bigquery.admin
)
如要提交使用服務帳戶執行的工作,使用者帳戶必須具備服務帳戶使用者 (roles/iam.serviceAccountUser
) 角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin
) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱授予單一角色。
如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin
) 角色。
服務帳戶權限
如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export
權限:
- BigQuery 資料檢視器 (
roles/bigquery.dataViewer
) - BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
bigquery.tables.updateData
IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData
權限:
- BigQuery 資料編輯器 (
roles/bigquery.dataEditor
) - BigQuery 資料擁有者 (
roles/bigquery.dataOwner
) - BigQuery 管理員 (
roles/bigquery.admin
)
事前準備
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
建立保留項目
建立 Enterprise 或 Enterprise Plus 版本預訂,然後建立預訂指派,並使用 CONTINUOUS
工作類型。這個預留項目可以自動調整資源配置。連續查詢的預留項目指派作業須遵守預留項目限制。
匯出至 Pub/Sub
如要將資料匯出至 Pub/Sub,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Pub/Sub」。
在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料
您可以使用 Pub/Sub 屬性提供訊息的額外資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息。
在連續查詢結果中,如果資料欄名為 _ATTRIBUTES
,系統會將其值複製到 Pub/Sub 訊息屬性。_ATTRIBUTES
內提供的欄位會做為屬性鍵。
_ATTRIBUTES
欄必須為 JSON
類型,格式為 ARRAY<STRUCT<STRING, STRING>>
或 STRUCT<STRING>
。
如需範例,請參閱將資料匯出至 Pub/Sub 主題。
匯出至 Bigtable
如要將資料匯出至 Bigtable,您必須具備額外的 API、IAM 權限和資源。 Google Cloud詳情請參閱匯出至 Bigtable。
將資料寫入 BigQuery 資料表
您可以使用 INSERT
陳述式將資料寫入 BigQuery 資料表。
使用 AI 函式
如要在持續查詢中使用支援的 AI 函式,您需要額外的 API、IAM 權限和 Google Cloud資源。如要瞭解詳情,請根據您的用途參閱下列主題:
- 使用
ML.GENERATE_TEXT
函式生成文字 - 使用
ML.GENERATE_EMBEDDING
函式生成文字嵌入 - 使用
ML.UNDERSTAND_TEXT
函式解讀文字 - 使用
ML.TRANSLATE
函式翻譯文字
在連續查詢中使用 AI 函式時,請考慮查詢輸出內容是否會超出函式的配額。如果超出配額,您可能必須另外處理未處理的記錄。
指定起點
您必須在連續查詢的 FROM
子句中使用 APPENDS
函式,指定要處理的最早資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
會指示 BigQuery 處理在開始連續查詢前最多 10 分鐘內新增至資料表 my_table
的資料。加入 my_table
的資料會在匯入時處理。資料處理不會受到延遲影響。在持續查詢中使用 APPENDS
函式時,請勿提供 end_timestamp
引數。
以下範例說明如何使用 APPENDS
函式,在查詢接收計程車乘車資訊串流的 BigQuery 資料表時,從特定時間點開始持續查詢:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute');
指定早於目前時間的起點
如要處理目前時間點之前的資料,可以使用 APPENDS
函式,為查詢指定較早的起點。您指定的起點必須位於所選資料表的時間回溯期內。時間回溯期預設為過去七天。
如要納入時間旅行視窗外的資料,請使用標準查詢插入或匯出特定時間點的資料,然後從該時間點開始持續查詢。
範例
以下範例說明如何將 BigQuery 資料表 (接收特定時間點的計程車行程串流資訊) 中的舊資料載入資料表,然後從舊資料的截斷點開始執行連續查詢。
執行標準查詢,將資料回填至特定時間點:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` -- Include all data inserted into the table up to this point in time. -- This timestamp must be within the time travel window. FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC' WHERE ride_status = 'dropoff';
從查詢停止的時間點執行持續查詢:
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to start processing -- data right where the batch query left off. -- This timestamp must be within the time travel window. TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND) WHERE ride_status = 'dropoff';
使用使用者帳戶執行連續查詢
本節說明如何使用使用者帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用者帳戶執行的連續查詢最多可執行兩天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢。
請按照下列步驟執行連續查詢:
主控台
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下
「更多」。- 在「選擇查詢模式」部分,選擇「持續查詢」。
- 按一下「確認」。
- 選用:如要控管查詢的執行時間,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時」。
在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業。
按一下「執行」。
bq
-
In the Google Cloud console, activate Cloud Shell.
在 Cloud Shell 中,使用
bq query
指令搭配--continuous
旗標,執行連續查詢:bq query --use_legacy_sql=false --continuous=true 'QUERY'
將
QUERY
換成持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用--job_timeout_ms
標記控制查詢的執行時間。PROJECT_ID
:您的專案 ID。QUERY
:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。
API
呼叫 jobs.insert
方法,執行連續查詢。您必須在傳入的 Job
資源中,將 JobConfigurationQuery
的 continuous
欄位設為 true
。如有需要,您可以設定 jobTimeoutMs
欄位,控制查詢的執行時間長度。
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
更改下列內容:
使用服務帳戶執行連續查詢
本節說明如何使用服務帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢。
如要使用服務帳戶執行持續查詢,請按照下列步驟操作:
主控台
bq
- 建立服務帳戶。
- 授予服務帳戶必要的權限。
-
In the Google Cloud console, activate Cloud Shell.
在指令列中,使用
bq query
指令加上下列旗標,執行連續查詢:- 將
--continuous
旗標設為true
,即可持續查詢。 - 使用
--connection_property
旗標指定要使用的服務帳戶。 - 選用:設定
--job_timeout_ms
旗標,限制查詢執行時間。
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
更改下列內容:
- 將
- 建立服務帳戶。
- 授予服務帳戶必要的權限。
呼叫
jobs.insert
方法,執行連續查詢。在您傳入的Job
資源中,設定JobConfigurationQuery
資源的下列欄位:- 將
continuous
欄位設為true
,即可讓查詢持續執行。 - 使用
connectionProperties
欄位指定要使用的服務帳戶。
您可以選擇在
JobConfiguration
資源中設定jobTimeoutMs
欄位,控管查詢的執行時間。curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
更改下列內容:
- 將
API
建立自訂工作 ID
每個查詢工作都會獲派工作 ID,可用於搜尋及管理工作。根據預設,工作 ID 是隨機產生。如要使用工作記錄或工作探索工具,更輕鬆地搜尋連續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:
前往 Google Cloud 控制台的「BigQuery」頁面。
在查詢編輯器中,按一下「更多」。
在「選擇查詢模式」部分,選擇「持續查詢」。
按一下「確認」。
在查詢編輯器中,依序點選「更多」>「查詢設定」。
在「Custom job ID prefix」(自訂工作 ID 前置字串) 區段中,輸入自訂名稱前置字串。
按一下 [儲存]。
範例
下列 SQL 範例顯示持續查詢的常見用途。
將資料匯出至 Pub/Sub 主題
以下範例顯示連續查詢,可從接收串流計程車乘車資訊的 BigQuery 資料表篩選資料,並透過訊息屬性將資料即時發布至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料匯出至 Bigtable 資料表
以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' );
將資料寫入 BigQuery 資料表
以下範例顯示持續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 資料表。這樣一來,資料就能用於後續的下游分析。
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you want to -- start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'dropoff';
使用 Vertex AI 模型處理資料
以下範例顯示連續查詢,該查詢會使用 Vertex AI 模型,根據計程車乘客目前的緯度和經度產生廣告,然後即時將結果匯出至 Pub/Sub 主題:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`, -- Configure the APPENDS TVF start_timestamp to specify when you -- want to start processing data using your continuous query. -- This example starts processing at 10 minutes before the current time. CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
修改持續查詢的 SQL
持續查詢作業執行期間,您無法更新持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從停止原始持續查詢工作的位置,啟動新的持續查詢工作。
如要修改持續查詢中使用的 SQL,請按照下列步驟操作:
- 查看要更新的連續查詢工作詳細資料,並記下工作 ID。
- 盡可能暫停收集上游資料。如果無法執行這項操作,持續查詢重新啟動時,可能會出現部分重複資料。
- 取消要修改的持續查詢。
使用
INFORMATION_SCHEMA
JOBS
檢視畫面,取得原始持續查詢工作的end_time
值:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
更改下列內容:
PROJECT_ID
:您的專案 ID。REGION
:專案使用的區域。JOB_ID
:您在步驟 1 中識別的連續查詢作業 ID。
修改持續查詢 SQL 陳述式,從特定時間點開始持續查詢,並使用您在步驟 5 中擷取的
end_time
值做為起點。修改持續查詢 SQL 陳述式,以反映所需變更。
執行修改後的持續查詢。
取消持續查詢
您可以像取消其他工作一樣取消持續查詢工作。取消工作後,查詢最多可能需要一分鐘才會停止執行。
如果取消查詢後重新啟動,重新啟動的查詢會視為新的獨立查詢。重新啟動的查詢不會從上一個作業停止處理資料的位置開始,也無法參照上一個查詢的結果。請參閱「從特定時間點開始執行連續查詢」。
監控查詢及處理錯誤
持續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理部分暫時性錯誤,但改善工作復原能力的最佳做法包括: