建立持續查詢

本文說明如何在 BigQuery 中執行連續查詢

BigQuery 持續查詢是會不斷執行的 SQL 陳述式,透過持續查詢功能,您可以即時分析 BigQuery 中的傳入資料,然後將結果匯出至 Bigtable 或 Pub/Sub,或是將結果寫入 BigQuery 表格。

選擇帳戶類型

您可以使用使用者帳戶建立及執行持續查詢作業,也可以使用使用者帳戶建立持續查詢作業,然後使用服務帳戶執行作業。 您必須使用服務帳戶,才能執行將結果匯出至 Pub/Sub 主題的持續查詢。

使用使用者帳戶時,連續查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。

所需權限

本節說明建立及執行連續查詢所需的權限。除了上述身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。

使用使用者帳戶時的權限

本節說明使用使用者帳戶建立及執行持續查詢時,需要哪些角色和權限。

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列每個 IAM 角色都會授予 bigquery.jobs.create 權限:

如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData 權限:

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin) 角色。

使用服務帳戶時的權限

本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。

使用者帳戶權限

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列 IAM 角色都會授予 bigquery.jobs.create 權限:

如要提交使用服務帳戶執行的工作,使用者帳戶必須具備服務帳戶使用者 (roles/iam.serviceAccountUser) 角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱授予單一角色

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin) 角色。

服務帳戶權限

如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,服務帳戶必須具備 bigquery.tables.updateData IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData 權限:

事前準備

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

建立保留項目

建立 Enterprise 或 Enterprise Plus 版本預訂,然後建立預訂指派,並使用 CONTINUOUS 工作類型。這個預留項目可以自動調整資源配置。連續查詢的預留項目指派作業須遵守預留項目限制

匯出至 Pub/Sub

如要將資料匯出至 Pub/Sub,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Pub/Sub」。

在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料

您可以使用 Pub/Sub 屬性提供訊息的額外資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息

在連續查詢結果中,如果資料欄名為 _ATTRIBUTES,系統會將其值複製到 Pub/Sub 訊息屬性。_ATTRIBUTES 內提供的欄位會做為屬性鍵。

_ATTRIBUTES 欄必須為 JSON 類型,格式為 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需範例,請參閱將資料匯出至 Pub/Sub 主題

匯出至 Bigtable

如要將資料匯出至 Bigtable,您必須具備額外的 API、IAM 權限和資源。 Google Cloud詳情請參閱匯出至 Bigtable

將資料寫入 BigQuery 資料表

您可以使用 INSERT 陳述式將資料寫入 BigQuery 資料表。

使用 AI 函式

如要在持續查詢中使用支援的 AI 函式,您需要額外的 API、IAM 權限和 Google Cloud資源。如要瞭解詳情,請根據您的用途參閱下列主題:

在連續查詢中使用 AI 函式時,請考慮查詢輸出內容是否會超出函式的配額。如果超出配額,您可能必須另外處理未處理的記錄。

指定起點

您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定要處理的最早資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) 會指示 BigQuery 處理在開始連續查詢前最多 10 分鐘內新增至資料表 my_table 的資料。加入 my_table 的資料會在匯入時處理。資料處理不會受到延遲影響。在持續查詢中使用 APPENDS 函式時,請勿提供 end_timestamp 引數。

以下範例說明如何使用 APPENDS 函式,在查詢接收計程車乘車資訊串流的 BigQuery 資料表時,從特定時間點開始持續查詢:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

指定早於目前時間的起點

如要處理目前時間點之前的資料,可以使用 APPENDS 函式,為查詢指定較早的起點。您指定的起點必須位於所選資料表的時間回溯期內。時間回溯期預設為過去七天。

如要納入時間旅行視窗外的資料,請使用標準查詢插入或匯出特定時間點的資料,然後從該時間點開始持續查詢。

範例

以下範例說明如何將 BigQuery 資料表 (接收特定時間點的計程車行程串流資訊) 中的舊資料載入資料表,然後從舊資料的截斷點開始執行連續查詢。

  1. 執行標準查詢,將資料回填至特定時間點:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. 從查詢停止的時間點執行持續查詢:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

使用使用者帳戶執行連續查詢

本節說明如何使用使用者帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用者帳戶執行的連續查詢最多可執行兩天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢

請按照下列步驟執行連續查詢:

主控台

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中,按一下 「更多」

    1. 在「選擇查詢模式」部分,選擇「持續查詢」
    2. 按一下「確認」。
    3. 選用:如要控管查詢的執行時間,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時」
  3. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業

  4. 按一下「執行」

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. 在 Cloud Shell 中,使用 bq query 指令搭配 --continuous 旗標,執行連續查詢:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 換成持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用 --job_timeout_ms 標記控制查詢的執行時間。

  3. API

    呼叫 jobs.insert 方法,執行連續查詢。您必須在傳入的 Job 資源中,將 JobConfigurationQuerycontinuous 欄位設為 true。如有需要,您可以設定 jobTimeoutMs 欄位,控制查詢的執行時間長度。

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業

使用服務帳戶執行連續查詢

本節說明如何使用服務帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢

如要使用服務帳戶執行持續查詢,請按照下列步驟操作:

主控台

  1. 建立服務帳戶
  2. 授予服務帳戶必要的權限
  3. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  4. 在查詢編輯器中,按一下「更多」

  5. 在「選擇查詢模式」部分,選擇「持續查詢」

  6. 按一下「確認」。

  7. 在查詢編輯器中,依序點選「更多」「查詢設定」

  8. 在「Continuous query」(持續查詢) 區段中,使用「Service account」(服務帳戶) 方塊選取您建立的服務帳戶。

  9. 選用:如要控管查詢執行時間,請以毫秒為單位設定「工作逾時」

  10. 按一下 [儲存]

  11. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業

  12. 按一下「執行」

bq

  1. 建立服務帳戶
  2. 授予服務帳戶必要的權限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. 在指令列中,使用 bq query 指令加上下列旗標,執行連續查詢:

    • --continuous 旗標設為 true,即可持續查詢。
    • 使用 --connection_property 旗標指定要使用的服務帳戶。
    • 選用:設定 --job_timeout_ms 旗標,限制查詢執行時間。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。
    • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業
  5. API

    1. 建立服務帳戶
    2. 授予服務帳戶必要的權限
    3. 呼叫 jobs.insert 方法,執行連續查詢。在您傳入的 Job 資源中,設定 JobConfigurationQuery 資源的下列欄位:

      • continuous 欄位設為 true,即可讓查詢持續執行。
      • 使用 connectionProperties 欄位指定要使用的服務帳戶。

      您可以選擇在 JobConfiguration 資源中設定 jobTimeoutMs 欄位,控管查詢的執行時間。

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      更改下列內容:

      • PROJECT_ID:您的專案 ID。
      • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業
      • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。

建立自訂工作 ID

每個查詢工作都會獲派工作 ID,可用於搜尋及管理工作。根據預設,工作 ID 是隨機產生。如要使用工作記錄工作探索工具,更輕鬆地搜尋連續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往 BigQuery

  2. 在查詢編輯器中,按一下「更多」

  3. 在「選擇查詢模式」部分,選擇「持續查詢」

  4. 按一下「確認」。

  5. 在查詢編輯器中,依序點選「更多」>「查詢設定」

  6. 在「Custom job ID prefix」(自訂工作 ID 前置字串) 區段中,輸入自訂名稱前置字串。

  7. 按一下 [儲存]

範例

下列 SQL 範例顯示持續查詢的常見用途。

將資料匯出至 Pub/Sub 主題

以下範例顯示連續查詢,可從接收串流計程車乘車資訊的 BigQuery 資料表篩選資料,並透過訊息屬性將資料即時發布至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料匯出至 Bigtable 資料表

以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料寫入 BigQuery 資料表

以下範例顯示持續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 資料表。這樣一來,資料就能用於後續的下游分析。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型處理資料

以下範例顯示連續查詢,該查詢會使用 Vertex AI 模型,根據計程車乘客目前的緯度和經度產生廣告,然後即時將結果匯出至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

修改持續查詢的 SQL

持續查詢作業執行期間,您無法更新持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從停止原始持續查詢工作的位置,啟動新的持續查詢工作。

如要修改持續查詢中使用的 SQL,請按照下列步驟操作:

  1. 查看要更新的連續查詢工作詳細資料,並記下工作 ID。
  2. 盡可能暫停收集上游資料。如果無法執行這項操作,持續查詢重新啟動時,可能會出現部分重複資料。
  3. 取消要修改的持續查詢
  4. 使用 INFORMATION_SCHEMA JOBS 檢視畫面,取得原始持續查詢工作的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • REGION:專案使用的區域。
    • JOB_ID:您在步驟 1 中識別的連續查詢作業 ID。
  5. 修改持續查詢 SQL 陳述式,從特定時間點開始持續查詢,並使用您在步驟 5 中擷取的 end_time 值做為起點。

  6. 修改持續查詢 SQL 陳述式,以反映所需變更。

  7. 執行修改後的持續查詢。

取消持續查詢

您可以像取消其他工作一樣取消持續查詢工作。取消工作後,查詢最多可能需要一分鐘才會停止執行。

如果取消查詢後重新啟動,重新啟動的查詢會視為新的獨立查詢。重新啟動的查詢不會從上一個作業停止處理資料的位置開始,也無法參照上一個查詢的結果。請參閱「從特定時間點開始執行連續查詢」。

監控查詢及處理錯誤

持續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理部分暫時性錯誤,但改善工作復原能力的最佳做法包括:

後續步驟