Creare query continue

Questo documento descrive come eseguire una query continua in BigQuery.

Le query continue di BigQuery sono istruzioni SQL eseguite continuamente. Le query continue ti consentono di analizzare i dati in entrata in BigQuery in tempo reale, quindi di esportare i risultati in Bigtable o Pub/Sub oppure di scriverli in una tabella BigQuery.

Scegli un tipo di account

Puoi creare ed eseguire un job di query continuo utilizzando un account utente oppure puoi creare un job di query continuo utilizzando un account utente ed eseguirlo mediante un account di servizio. Devi utilizzare un account di servizio per eseguire una query continua che esporterà i risultati in un argomento Pub/Sub.

Quando utilizzi un account utente, l'esecuzione di una query continua per due giorni. Quando utilizzi un account di servizio, viene eseguita una query continua fino a quando non viene annullata esplicitamente. Per ulteriori informazioni, consulta Autorizzazione.

Autorizzazioni obbligatorie

Questa sezione descrive le autorizzazioni necessarie per creare ed eseguire una query continua. Come alternativa ai ruoli Identity and Access Management (IAM) di cui abbiamo parlato, potresti ottenere le autorizzazioni richieste ruoli personalizzati.

Autorizzazioni per l'utilizzo di un account utente

Questa sezione fornisce informazioni sui ruoli e sulle autorizzazioni richiesti per creare ed eseguire una query continua utilizzando un account utente.

Per creare un job in BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.jobs.create. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.jobs.create:

Per esportare i dati da una tabella BigQuery, l'account utente deve avere l'autorizzazione IAM bigquery.tables.export . Ciascuno dei i seguenti ruoli IAM concedeno l'errore bigquery.tables.export autorizzazione:

Per aggiornare i dati in una tabella BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.tables.updateData. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.updateData:

Se l'account utente deve abilitare le API richieste per caso d'uso di query continue, l'account utente deve avere Amministratore Service Usage (roles/serviceusage.serviceUsageAdmin) ruolo.

Autorizzazioni per l'utilizzo di un account di servizio

Questa sezione fornisce informazioni sui ruoli e sulle autorizzazioni richiesti dall'account utente che crea la query continua e dall'account di servizio che esegue la query continua.

Autorizzazioni dell'account utente

Per creare un job in BigQuery, l'account utente deve avere Autorizzazione IAM bigquery.jobs.create. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.jobs.create:

Per inviare un job che viene eseguito utilizzando un account di servizio, l'account utente deve disporre del ruolo Utente account di servizio (roles/iam.serviceAccountUser). Se utilizzi lo stesso account utente per creare l'account di servizio, allora l'account utente deve avere Amministratore account di servizio (roles/iam.serviceAccountAdmin) ruolo. Per informazioni su come limitare l'accesso di un utente a un singolo account di servizio, piuttosto che a tutti gli account di servizio all'interno di un progetto, consulta Concedere un singolo ruolo.

Se l'account utente deve attivare le API richieste per il tuo caso d'uso di query continue, deve disporre del ruolo Amministratore utilizzo servizio (roles/serviceusage.serviceUsageAdmin).

Autorizzazioni account di servizio

Per esportare i dati da una tabella BigQuery, l'account di servizio deve dispongono dell'autorizzazione IAM bigquery.tables.export. Ciascuno dei i seguenti ruoli IAM concedeno l'errore bigquery.tables.export autorizzazione:

Per aggiornare i dati in una tabella BigQuery, l'account di servizio deve disporre dell'autorizzazione IAM bigquery.tables.updateData. Ciascuno dei i seguenti ruoli IAM concedeno l'errore bigquery.tables.updateData autorizzazione:

Prima di iniziare

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Crea una prenotazione

Crea una prenotazione per la versione Enterprise o Enterprise Plus, poi crea un'assegnazione della prenotazione con un tipo di job CONTINUOUS.

Esporta in Pub/Sub

Per esportare i dati in Pub/Sub sono obbligatorie API, autorizzazioni IAM e risorse Google Cloud aggiuntive. Per ulteriori informazioni, vedi Esporta in Pub/Sub.

Incorporare attributi personalizzati come metadati nei messaggi Pub/Sub.

Puoi utilizzare gli attributi Pub/Sub per fornire ulteriori informazioni sul messaggio, come la sua priorità, origine, destinazione o metadati aggiuntivi. Puoi anche utilizzare gli attributi per filtrare i messaggi in base alla sottoscrizione.

All'interno del risultato di una query continua, se una colonna viene denominata _ATTRIBUTES, i relativi valori vengono copiati negli attributi dei messaggi Pub/Sub. I campi forniti all'interno di _ATTRIBUTES vengono utilizzati come chiavi degli attributi.

La colonna _ATTRIBUTES deve essere di tipo JSON, nel formato ARRAY<STRUCT<STRING, STRING>> o STRUCT<STRING>.

Per un esempio, vedi esportare i dati in un argomento Pub/Sub.

Esporta in Bigtable

API aggiuntive, autorizzazioni IAM e Google Cloud sono necessarie risorse per esportare i dati in Bigtable. Per ulteriori informazioni, consulta Esportare in Bigtable.

Scrivi dati in una tabella BigQuery

Puoi scrivere dati in una tabella BigQuery utilizzando un INSERT.

Utilizzare le funzioni di IA

API aggiuntive, autorizzazioni IAM e Google Cloud e risorse sono necessarie per utilizzare supportata di AI generativa in una query continua. Per ulteriori informazioni, consulta uno dei seguenti argomenti in base al tuo caso d'uso:

Quando utilizzi una funzione di IA in una query continua, valuta se l'output della query rimarrà entro la quota per la funzione. Se superi la quota, potresti dover gestire separatamente i record che non vengono elaborati.

Eseguire una query continua utilizzando un account utente

Questa sezione descrive come eseguire una query continua utilizzando un account utente. Quando è in esecuzione la query continua, puoi chiudere la console Google Cloud o un'applicazione senza interrompere l'esecuzione della query.

Per eseguire una query continua:

Console

  1. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  2. Nell'editor delle query, fai clic su Altro.

  3. Nella sezione Scegli la modalità di query, scegli Query continua.

  4. Fai clic su Conferma.

  5. Nell'editor query, digita l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operative supportate.

  6. Fai clic su Esegui.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. In Cloud Shell, esegui la query continua utilizzando il comando bq query con il flag --continuous:

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Sostituisci QUERY con l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.

API

Esegui la query continua chiamando il metodo Metodo jobs.insert. Devi impostare il campo continuous su true nella JobConfigurationQuery della Job risorsa che trasmetti.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Sostituisci quanto segue:

  • PROJECT_ID: il tuo ID progetto.
  • QUERY: l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.

Esegui una query continua utilizzando un account di servizio

Questa sezione descrive come eseguire una query continua utilizzando un account di servizio. Quando è in esecuzione la query continua, puoi chiudere la console Google Cloud o un'applicazione senza interrompere l'esecuzione della query.

Per utilizzare un account di servizio per eseguire una query continua:

Console

  1. Crea un account di servizio.
  2. Concedi la licenza richiesta autorizzazioni all'account di servizio.
  3. Nella console Google Cloud, vai alla pagina BigQuery.

    Vai a BigQuery

  4. Nell'editor delle query, fai clic su Altro.

  5. Nella sezione Scegli la modalità di query, scegli Query continua.

  6. Fai clic su Conferma.

  7. Nell'editor delle query, fai clic su Altro > Impostazioni query.

  8. Nella sezione Query continua, utilizza la casella Account di servizio. per selezionare l'account di servizio che hai creato.

  9. Fai clic su Salva.

  10. Nell'editor query, digita l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.

  11. Fai clic su Esegui.

bq

  1. Crea un account di servizio.
  2. Concedi la licenza richiesta autorizzazioni all'account di servizio.
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Nella riga di comando, esegui la query continua utilizzando Comando bq query con i seguenti flag:

    • Imposta il flag --continuous su true per rendere la query continua.
    • Utilizza il flag --connection_property per specificare un account di servizio da utilizzare.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • SERVICE_ACCOUNT_EMAIL: il servizio l'indirizzo email dell'account. Puoi recuperare l'indirizzo email dell'account di servizio dalla pagina Account di servizio della console Google Cloud.
    • QUERY: l'istruzione SQL per una query continua. L'istruzione SQL deve contenere solo operazioni supportate.

API

  1. Crea un account di servizio.
  2. Concedi le autorizzazioni richieste all'account di servizio.
  3. Esegui la query continua chiamando il metodo Metodo jobs.insert. Imposta i seguenti campi nella risorsa JobConfigurationQuery della risorsa Job che passi:

    • Imposta il campo continuous su true per rendere la query continua.
    • Utilizza il campo connection_property per specificare un account di servizio da utilizzare.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • QUERY: l'istruzione SQL per una query continua. L'istruzione SQL deve contenere solo operative supportate.
    • SERVICE_ACCOUNT_EMAIL: l'indirizzo email dell'account di servizio. Puoi ricevere l'email dell'account di servizio sul Pagina Account di servizio della console Google Cloud.

Esempi

I seguenti esempi di SQL mostrano casi d'uso comuni per le query continue.

Esporta i dati in un argomento Pub/Sub

L'esempio seguente mostra una query continua che filtra i dati da una la tabella BigQuery che riceve informazioni sulle corse dei taxi in streaming, e pubblica i dati in un argomento Pub/Sub in tempo reale attributi dei messaggi:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Esportare i dati in una tabella Bigtable

L'esempio seguente mostra una query continua che filtra i dati da una la tabella BigQuery che riceve informazioni sulle corse dei taxi in streaming, ed esporta i dati in una tabella Bigtable in tempo reale:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Scrivere dati in una tabella BigQuery

L'esempio seguente mostra una query continua che filtra e trasforma i dati da una tabella BigQuery che riceve informazioni su corse in taxi in streaming, quindi scrive i dati in un'altra tabella BigQuery in tempo reale. Ciò rende i dati disponibili per ulteriori analisi a valle.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Elaborare i dati utilizzando un modello Vertex AI

L'esempio seguente mostra una query continua che utilizza un Modello Vertex AI per generare una pubblicità per i passeggeri di taxi in base alla latitudine e longitudine correnti, quindi esporta i risultati in un argomento Pub/Sub in tempo reale:

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Avviare una query continua da un determinato punto in tempo

Quando avvii una query continua, vengono elaborate tutte le righe della tabella da cui stai selezionando, quindi le nuove righe man mano che arrivano. Se vuoi saltare l'elaborazione di alcuni o di tutti i dati esistenti, puoi utilizzare la funzione APPENDS della cronologia delle modifiche per avviare l'elaborazione da un determinato punto in tempo.

L'esempio seguente mostra come avviare una query continua da una particolare un momento specifico utilizzando la funzione APPENDS:

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Modifica l'SQL di una query continua

Non puoi aggiornare il codice SQL utilizzato in una query continua è in esecuzione. Devi annullare il job di query continuo, modificare l'SQL, e poi avviare un nuovo job di query continuo dal punto in cui l'hai arrestato. del job di query continua originale.

Per modificare il codice SQL utilizzato in una query continua:

  1. Visualizza i dettagli del job per il job di query continua che vuoi aggiornare e prendi nota dell'ID job.
  2. Se possibile, metti in pausa la raccolta dei dati upstream. Se non riesci a farlo, potresti riscontrare una duplicazione dei dati al riavvio della query continua.
  3. Annulla la query continua che desideri modificare.
  4. Ottieni il valore end_time per il job di query continua originale utilizzando la INFORMATION_SCHEMA JOBS visualizzazione:

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Sostituisci quanto segue:

    • PROJECT_ID: il tuo ID progetto.
    • REGION: la regione utilizzata dal progetto.
    • JOB_ID: l'ID del job di query continua che identificati nel Passaggio 1.
  5. Modifica l'istruzione SQL di query continua in avviare la query continua da un determinato momento, utilizzando il valore end_time recuperato al passaggio 5 come punto di partenza punto di accesso.

  6. Modifica l'istruzione SQL della query continua in base alle modifiche necessarie.

  7. Esegui la query continua modificata.

Annullare una query continua

Puoi annullare una query continua come qualsiasi altro lavoro. Potrebbe volerci fino a un minuto prima che la query verrà interrotto dopo l'annullamento del job.

Passaggi successivi