建立持續查詢

本文說明如何在 BigQuery 中執行連續查詢

BigQuery 持續查詢是會不斷執行的 SQL 陳述式,透過持續查詢,您可以在 BigQuery 中即時分析傳入資料,然後將結果匯出至 Bigtable、Pub/Sub 或 Spanner,或是將結果寫入 BigQuery 資料表。

選擇帳戶類型

您可以使用使用者帳戶建立及執行持續查詢作業,也可以使用使用者帳戶建立持續查詢作業,然後使用服務帳戶執行作業。 您必須使用服務帳戶,才能執行將結果匯出至 Pub/Sub 主題的持續查詢。

使用使用者帳戶時,連續查詢最多可執行兩天。使用服務帳戶時,持續查詢最多可執行 150 天。詳情請參閱「授權」。

所需權限

本節說明建立及執行連續查詢所需的權限。除了上述身分與存取權管理 (IAM) 角色,您也可以透過自訂角色取得必要權限。

使用使用者帳戶時的權限

本節說明使用使用者帳戶建立及執行持續查詢時,需要哪些角色和權限。

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列每個 IAM 角色都會授予 bigquery.jobs.create 權限:

如要從 BigQuery 資料表匯出資料,使用者帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,使用者帳戶必須具備 bigquery.tables.updateData IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData 權限:

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin) 角色。

使用服務帳戶時的權限

本節說明建立持續查詢的使用者帳戶,以及執行持續查詢的服務帳戶,分別需要哪些角色和權限。

使用者帳戶權限

如要在 BigQuery 中建立工作,使用者帳戶必須具備 bigquery.jobs.create IAM 權限。下列 IAM 角色都會授予 bigquery.jobs.create 權限:

如要提交使用服務帳戶執行的工作,使用者帳戶必須具備服務帳戶使用者 (roles/iam.serviceAccountUser) 角色。如果您使用同一個使用者帳戶建立服務帳戶,則該使用者帳戶必須具備服務帳戶管理員 (roles/iam.serviceAccountAdmin) 角色。如要瞭解如何限制使用者存取單一服務帳戶,而非專案中的所有服務帳戶,請參閱授予單一角色

如果使用者帳戶必須啟用持續查詢用途所需的 API,則該帳戶必須具備服務使用管理員 (roles/serviceusage.serviceUsageAdmin) 角色。

服務帳戶權限

如要從 BigQuery 資料表匯出資料,服務帳戶必須具備 bigquery.tables.export IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.export 權限:

如要更新 BigQuery 資料表中的資料,服務帳戶必須具備 bigquery.tables.updateData IAM 權限。下列每個 IAM 角色都會授予 bigquery.tables.updateData 權限:

事前準備

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Verify that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

建立保留項目

建立 Enterprise 或 Enterprise Plus 版本預訂,然後建立預訂指派,並使用 CONTINUOUS 工作類型。這個預留項目可以自動調整資源配置。連續查詢的預留項目指派作業須遵守預留項目限制

匯出至 Pub/Sub

如要將資料匯出至 Pub/Sub,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Pub/Sub」。

在 Pub/Sub 訊息中將自訂屬性嵌入為中繼資料

您可以使用 Pub/Sub 屬性提供訊息的額外資訊,例如優先順序、來源、目的地或其他中繼資料。您也可以使用屬性篩選訂閱項目中的訊息

在連續查詢結果中,如果資料欄名為 _ATTRIBUTES,系統會將其值複製到 Pub/Sub 訊息屬性。_ATTRIBUTES 內提供的欄位會做為屬性鍵。

_ATTRIBUTES 欄必須為 JSON 類型,格式為 ARRAY<STRUCT<STRING, STRING>>STRUCT<STRING>

如需範例,請參閱將資料匯出至 Pub/Sub 主題

匯出至 Bigtable

如要將資料匯出至 Bigtable,您必須具備額外的 API、IAM 權限和資源。 Google Cloud詳情請參閱匯出至 Bigtable

匯出至 Spanner

如要將資料匯出至 Spanner,您必須具備額外 API、IAM 權限和 Google Cloud 資源。詳情請參閱「匯出至 Spanner (反向 ETL)」。

將資料寫入 BigQuery 資料表

您可以使用 INSERT 陳述式將資料寫入 BigQuery 資料表。

使用 AI 函式

如要在持續查詢中使用支援的 AI 函式,您需要額外的 API、IAM 權限和 Google Cloud資源。如要瞭解詳情,請根據您的用途參閱下列主題:

在連續查詢中使用 AI 函式時,請考慮查詢輸出內容是否會超出函式的配額。如果超出配額,您可能必須另外處理未處理的記錄。

指定起點

您必須在連續查詢的 FROM 子句中使用 APPENDS 函式,指定要處理的最早資料。舉例來說,APPENDS(TABLE my_table, CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE) 會指示 BigQuery 處理在開始連續查詢前最多 10 分鐘內新增至資料表 my_table 的資料。加入 my_table 的資料會在匯入時處理。資料處理不會受到延遲影響。在持續查詢中使用 APPENDS 函式時,請勿提供 end_timestamp 引數。

以下範例說明如何使用 APPENDS 函式,在查詢接收計程車乘車資訊串流的 BigQuery 資料表時,從特定時間點開始持續查詢:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE
    ride_status = 'enroute');

指定早於目前時間的起點

如要處理目前時間點之前的資料,可以使用 APPENDS 函式,為查詢指定較早的起點。您指定的起點必須位於所選資料表的時間回溯期內。時間回溯期預設為過去七天。

如要納入時間旅行視窗外的資料,請使用標準查詢插入或匯出特定時間點的資料,然後從該時間點開始持續查詢。

範例

以下範例說明如何將 BigQuery 資料表 (接收特定時間點的計程車行程串流資訊) 中的舊資料載入資料表,然後從舊資料的截斷點開始執行連續查詢。

  1. 執行標準查詢,將資料回填至特定時間點:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM `myproject.real_time_taxi_streaming.taxirides`
      -- Include all data inserted into the table up to this point in time.
      -- This timestamp must be within the time travel window.
      FOR SYSTEM_TIME AS OF '2025-01-01 00:00:00 UTC'
    WHERE
      ride_status = 'dropoff';
  2. 從查詢停止的時間點執行持續查詢:

    INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
    SELECT
      timestamp,
      meter_reading,
      ride_status,
      passenger_count,
      ST_Distance(
        ST_GeogPoint(pickup_longitude, pickup_latitude),
        ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
        SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
    FROM
      APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to start processing
        -- data right where the batch query left off.
        -- This timestamp must be within the time travel window.
        TIMESTAMP '2025-01-01 00:00:00 UTC' + INTERVAL 1 MICROSECOND)
    WHERE
      ride_status = 'dropoff';

使用使用者帳戶執行連續查詢

本節說明如何使用使用者帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用者帳戶執行的連續查詢最多可執行兩天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢

請按照下列步驟執行連續查詢:

主控台

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中,按一下 「更多」

    1. 在「選擇查詢模式」部分,選擇「持續查詢」
    2. 按一下「確認」。
    3. 選用:如要控管查詢的執行時間,請按一下「查詢設定」,然後以毫秒為單位設定「工作逾時」
  3. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業

  4. 按一下「執行」

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. 在 Cloud Shell 中,使用 bq query 指令搭配 --continuous 旗標,執行連續查詢:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    QUERY 換成持續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業。您可以使用 --job_timeout_ms 標記控制查詢的執行時間。

  3. API

    呼叫 jobs.insert 方法,執行連續查詢。您必須在傳入的 Job 資源中,將 JobConfigurationQuerycontinuous 欄位設為 true。如有需要,您可以設定 jobTimeoutMs 欄位,控制查詢的執行時間長度。

    curl --request POST \
      "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
      --header "Authorization: Bearer $(gcloud auth print-access-token)" \
      --header "Content-Type: application/json; charset=utf-8" \
      --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \
      --compressed

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業

使用服務帳戶執行連續查詢

本節說明如何使用服務帳戶執行持續查詢。持續查詢執行後,您可以關閉 Google Cloud 控制台、終端機視窗或應用程式,不會中斷查詢執行作業。使用服務帳戶執行的持續查詢最多可執行 150 天,之後會自動停止。如要繼續處理新的傳入資料,請啟動新的連續查詢,並指定起點。如要自動執行這項程序,請參閱重新嘗試失敗的查詢

如要使用服務帳戶執行持續查詢,請按照下列步驟操作:

主控台

  1. 建立服務帳戶
  2. 授予服務帳戶必要的權限
  3. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  4. 在查詢編輯器中,按一下「更多」

  5. 在「選擇查詢模式」部分,選擇「持續查詢」

  6. 按一下「確認」。

  7. 在查詢編輯器中,依序點選「更多」「查詢設定」

  8. 在「Continuous query」(持續查詢) 區段中,使用「Service account」(服務帳戶) 方塊選取您建立的服務帳戶。

  9. 選用:如要控管查詢執行時間,請以毫秒為單位設定「工作逾時」

  10. 按一下 [儲存]

  11. 在查詢編輯器中,輸入持續查詢的 SQL 陳述式。 SQL 陳述式只能包含支援的作業

  12. 按一下「執行」

bq

  1. 建立服務帳戶
  2. 授予服務帳戶必要的權限
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  4. 在指令列中,使用 bq query 指令加上下列旗標,執行連續查詢:

    • --continuous 旗標設為 true,即可持續查詢。
    • 使用 --connection_property 旗標指定要使用的服務帳戶。
    • 選用:設定 --job_timeout_ms 旗標,限制查詢執行時間。
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。
    • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業
  5. API

    1. 建立服務帳戶
    2. 授予服務帳戶必要的權限
    3. 呼叫 jobs.insert 方法,執行連續查詢。在您傳入的 Job 資源中,設定 JobConfigurationQuery 資源的下列欄位:

      • continuous 欄位設為 true,即可讓查詢持續執行。
      • 使用 connectionProperties 欄位指定要使用的服務帳戶。

      您可以選擇在 JobConfiguration 資源中設定 jobTimeoutMs 欄位,控管查詢的執行時間。

      curl --request POST \
        "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \
        --header "Authorization: Bearer $(gcloud auth print-access-token)" \
        --header "Content-Type: application/json; charset=utf-8" \
        --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \
        --compressed

      更改下列內容:

      • PROJECT_ID:您的專案 ID。
      • QUERY:連續查詢的 SQL 陳述式。SQL 陳述式只能包含支援的作業
      • SERVICE_ACCOUNT_EMAIL:服務帳戶電子郵件地址。您可以在 Google Cloud 控制台的「服務帳戶」頁面取得服務帳戶電子郵件地址。

建立自訂工作 ID

每個查詢工作都會獲派工作 ID,可用於搜尋及管理工作。根據預設,工作 ID 是隨機產生。如要使用工作記錄工作探索工具,更輕鬆地搜尋連續查詢的工作 ID,您可以指派自訂工作 ID 前置字串:

  1. 前往 Google Cloud 控制台的「BigQuery」頁面。

    前往「BigQuery」

  2. 在查詢編輯器中,按一下「更多」

  3. 在「選擇查詢模式」部分,選擇「持續查詢」

  4. 按一下「確認」。

  5. 在查詢編輯器中,依序點選「更多」>「查詢設定」

  6. 在「Custom job ID prefix」(自訂工作 ID 前置字串) 區段中,輸入自訂名稱前置字串。

  7. 按一下 [儲存]

範例

下列 SQL 範例顯示持續查詢的常見用途。

將資料匯出至 Pub/Sub 主題

以下範例顯示連續查詢,可從接收串流計程車乘車資訊的 BigQuery 資料表篩選資料,並透過訊息屬性將資料即時發布至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料匯出至 Bigtable 資料表

以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選資料,並即時將資料匯出至 Bigtable 資料表:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
      -- Configure the APPENDS TVF start_timestamp to specify when you want to
      -- start processing data using your continuous query.
      -- This example starts processing at 10 minutes before the current time.
      CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
);

將資料匯出至 Spanner 資料表

以下範例顯示連續查詢,可從接收計程車乘車資訊串流的 BigQuery 表格中篩選資料,然後即時將資料匯出至 Spanner 表格:

EXPORT DATA
 OPTIONS (
   format = 'CLOUD_SPANNER',
   uri = 'https://spanner.googleapis.com/projects/myproject/instances/myspannerinstance/databases/taxi-real-time-rides',
   spanner_options ="""{
      "table": "rides",
      -- To ensure data is written to Spanner in the correct sequence
      -- during a continuous export, use the change_timestamp_column
      -- option. This should be mapped to a timestamp column from your
      -- BigQuery data. If your source data lacks a timestamp, the 
      -- _CHANGE_TIMESTAMP pseudocolumn provided by the APPENDS function 
      -- will be automatically mapped to the "change_timestamp" column.
      "change_timestamp_column": "change_timestamp"
   }"""
  )
  AS (
  SELECT
    ride_id,
    latitude,
    longitude,
    meter_reading,
    ride_status,
    passenger_count
  FROM APPENDS(
        TABLE `myproject.real_time_taxi_streaming.taxirides`,
        -- Configure the APPENDS TVF start_timestamp to specify when you want to
        -- start processing data using your continuous query.
        -- This example starts processing at 10 minutes before the current time.
        CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
  WHERE ride_status = 'enroute'
  );

將資料寫入 BigQuery 資料表

以下範例顯示持續查詢,可從接收計程車乘車資訊串流的 BigQuery 資料表篩選及轉換資料,然後即時將資料寫入另一個 BigQuery 資料表。這樣一來,資料就能用於後續的下游分析。

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM
  APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
    -- Configure the APPENDS TVF start_timestamp to specify when you want to
    -- start processing data using your continuous query.
    -- This example starts processing at 10 minutes before the current time.
    CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
WHERE
  ride_status = 'dropoff';

使用 Vertex AI 模型處理資料

以下範例顯示連續查詢,該查詢會使用 Vertex AI 模型,根據計程車乘客目前的緯度和經度產生廣告,然後即時將結果匯出至 Pub/Sub 主題:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM
          APPENDS(TABLE `myproject.real_time_taxi_streaming.taxirides`,
            -- Configure the APPENDS TVF start_timestamp to specify when you
            -- want to start processing data using your continuous query.
            -- This example starts processing at 10 minutes before the current time.
            CURRENT_TIMESTAMP() - INTERVAL 10 MINUTE)
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

修改持續查詢的 SQL

持續查詢作業執行期間,您無法更新持續查詢中使用的 SQL。您必須取消持續查詢工作、修改 SQL,然後從停止原始持續查詢工作的位置,啟動新的持續查詢工作。

如要修改持續查詢中使用的 SQL,請按照下列步驟操作:

  1. 查看要更新的連續查詢工作詳細資料,並記下工作 ID。
  2. 盡可能暫停收集上游資料。如果無法執行這項操作,持續查詢重新啟動時,可能會出現部分重複資料。
  3. 取消要修改的持續查詢
  4. 使用 INFORMATION_SCHEMA JOBS 檢視畫面,取得原始持續查詢工作的 end_time 值:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    更改下列內容:

    • PROJECT_ID:您的專案 ID。
    • REGION:專案使用的區域。
    • JOB_ID:您在步驟 1 中識別的連續查詢作業 ID。
  5. 修改持續查詢 SQL 陳述式,從特定時間點開始持續查詢,並使用您在步驟 5 中擷取的 end_time 值做為起點。

  6. 修改持續查詢 SQL 陳述式,以反映所需變更。

  7. 執行修改後的持續查詢。

取消持續查詢

您可以像取消其他工作一樣取消持續查詢工作。取消工作後,查詢最多可能需要一分鐘才會停止執行。

如果取消查詢後重新啟動,重新啟動的查詢會視為新的獨立查詢。重新啟動的查詢不會從上一個作業停止處理資料的位置開始,也無法參照上一個查詢的結果。請參閱「從特定時間點開始執行連續查詢」。

監控查詢及處理錯誤

持續查詢可能會因資料不一致、結構定義變更、服務暫時中斷或維護等因素而中斷。雖然 BigQuery 會處理部分暫時性錯誤,但改善工作復原能力的最佳做法包括:

後續步驟