Creare query continue
Questo documento descrive come eseguire una query continua in BigQuery.
Le query continue di BigQuery sono istruzioni SQL eseguite continuamente. Le query continue ti consentono di analizzare i dati in entrata in BigQuery in tempo reale, quindi di esportare i risultati in Bigtable o Pub/Sub oppure di scrivere i risultati in una tabella BigQuery.
Scegli un tipo di account
Puoi creare ed eseguire un job di query continua utilizzando un account utente oppure puoi creare un job di query continua utilizzando un account utente e poi eseguirlo utilizzando un account di servizio. Devi utilizzare un account di servizio per eseguire una query continua che esporterà i risultati in un argomento Pub/Sub.
Quando utilizzi un account utente, viene eseguita una query continua per due giorni. Quando utilizzi un account di servizio, viene eseguita una query continua fino a quando non viene annullata esplicitamente. Per ulteriori informazioni, consulta Autorizzazione.
Autorizzazioni obbligatorie
Questa sezione descrive le autorizzazioni necessarie per creare ed eseguire una query continua. In alternativa ai ruoli IAM (Identity and Access Management) elencati, puoi ottenere le autorizzazioni richieste tramite ruoli personalizzati.
Autorizzazioni per l'utilizzo di un account utente
Questa sezione fornisce informazioni sui ruoli e sulle autorizzazioni necessarie per creare ed eseguire una query continua utilizzando un account utente.
Per creare un job in BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.jobs.create
. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.jobs.create
:
- Utente BigQuery (
roles/bigquery.user
) - Utente job BigQuery (
roles/bigquery.jobUser
) - Amministratore BigQuery (
roles/bigquery.admin
)
Per esportare i dati da una tabella BigQuery, l'account utente deve disporre dell'bigquery.tables.export
autorizzazione IAM . Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.export
:
- Visualizzatore dei dati BigQuery (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - Proprietario dei dati BigQuery (
roles/bigquery.dataOwner
) - Amministratore BigQuery (
roles/bigquery.admin
)
Per aggiornare i dati in una tabella BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.tables.updateData
. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - Proprietario dei dati BigQuery (
roles/bigquery.dataOwner
) - Amministratore BigQuery (
roles/bigquery.admin
)
Se l'account utente deve attivare le API richieste per il tuo caso d'uso di query continue, deve disporre del ruolo Amministratore utilizzo servizio (roles/serviceusage.serviceUsageAdmin
).
Autorizzazioni per l'utilizzo di un account di servizio
Questa sezione fornisce informazioni sui ruoli e sulle autorizzazioni richiesti dall'account utente che crea la query continua e dall'account di servizio che esegue la query continua.
Autorizzazioni dell'account utente
Per creare un job in BigQuery, l'account utente deve disporre dell'autorizzazione IAM bigquery.jobs.create
. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.jobs.create
:
- Utente BigQuery (
roles/bigquery.user
) - Utente job BigQuery (
roles/bigquery.jobUser
) - Amministratore BigQuery (
roles/bigquery.admin
)
Per inviare un job che viene eseguito utilizzando un account di servizio, l'account utente deve disporre del ruolo Utente account di servizio (roles/iam.serviceAccountUser
). Se utilizzi lo stesso account utente per creare l'account di servizio, questo deve disporre del ruolo Amministratore account di servizio (roles/iam.serviceAccountAdmin
). Per informazioni su come limitare l'accesso di un utente a un singolo account di servizio,
piuttosto che a tutti gli account di servizio all'interno di un progetto, consulta
Concedere un singolo ruolo.
Se l'account utente deve attivare le API richieste per il tuo caso d'uso di query continue, deve disporre del ruolo Amministratore utilizzo servizio (roles/serviceusage.serviceUsageAdmin
).
Autorizzazioni service account
Per esportare i dati da una tabella BigQuery, l'account di servizio deve avere l'autorizzazione IAM bigquery.tables.export
. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.export
:
- Visualizzatore dei dati BigQuery (
roles/bigquery.dataViewer
) - BigQuery Data Editor (
roles/bigquery.dataEditor
) - Proprietario dei dati BigQuery (
roles/bigquery.dataOwner
) - Amministratore BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
. Ciascuno dei seguenti ruoli IAM concede l'autorizzazione bigquery.tables.updateData
:
- BigQuery Data Editor (
roles/bigquery.dataEditor
) - Proprietario dei dati BigQuery (
roles/bigquery.dataOwner
) - Amministratore BigQuery (
roles/bigquery.admin
)
Prima di iniziare
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Crea una prenotazione
Crea una prenotazione della versione Enterprise o Enterprise Plus,
poi
crea un'assegnazione della prenotazione
con un tipo di job CONTINUOUS
.
Quando crei un'assegnazione di prenotazione per una query continua, la prenotazione associata è limitata a un massimo di 500 slot e non può essere configurata per l'utilizzo della scalabilità automatica.
Esporta in Pub/Sub
Per esportare i dati in Pub/Sub sono obbligatorie API, autorizzazioni IAM e risorse Google Cloud . Per saperne di più, consulta Esportare in Pub/Sub.
Incorporare attributi personalizzati come metadati nei messaggi Pub/Sub
Puoi utilizzare gli attributi Pub/Sub per fornire informazioni aggiuntive sul messaggio, come priorità, origine, destinazione o metadati aggiuntivi. Puoi anche utilizzare gli attributi per filtrare i messaggi nell'abbonamento.
All'interno di un risultato di query continua, se una colonna è denominata _ATTRIBUTES
,
i relativi valori vengono copiati negli attributi del messaggio Pub/Sub.
I campi forniti in _ATTRIBUTES
vengono utilizzati come chiavi degli attributi.
La colonna _ATTRIBUTES
deve essere di tipo JSON
, nel formato
ARRAY<STRUCT<STRING, STRING>>
o STRUCT<STRING>
.
Per un esempio, consulta esportare i dati in un argomento Pub/Sub.
Esporta in Bigtable
Per esportare i dati in Bigtable sono necessarie API, autorizzazioni IAM e risorse Google Cloud. Per ulteriori informazioni, consulta Esportare in Bigtable.
Scrivere dati in una tabella BigQuery
Puoi scrivere dati in una tabella BigQuery utilizzando un
istruzione INSERT
.
Utilizzare le funzioni di IA
Sono necessarie API, autorizzazioni IAM e risorse aggiuntive di Google Cloudper utilizzare una funzione di AI supportata in una query continua. Per ulteriori informazioni, consulta uno dei seguenti argomenti in base al tuo caso d'uso:
- Genera testo utilizzando la funzione
ML.GENERATE_TEXT
- Genera rappresentazioni distribuite di testo utilizzando la funzione
ML.GENERATE_EMBEDDING
- Comprendere il testo con la funzione
ML.UNDERSTAND_TEXT
- Tradurre il testo con la funzione
ML.TRANSLATE
Quando utilizzi una funzione di AI in una query continua, valuta se l'output della query rimarrà entro la quota per la funzione. Se superi la quota, potresti dover gestire separatamente i record che non vengono elaborati.
Eseguire una query continua utilizzando un account utente
Questa sezione descrive come eseguire una query continua utilizzando un account utente. Una volta avviata la query continua, puoi chiudere la console Google Cloud , la finestra del terminale o l'applicazione senza interrompere l'esecuzione della query.
Per eseguire una query continua:
Console
Nella console Google Cloud , vai alla pagina BigQuery.
Nell'editor di query, fai clic su Altro.
Nella sezione Scegli la modalità di query, scegli Query continua.
Fai clic su Conferma.
Nell'editor di query, digita l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.
Fai clic su Esegui.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
In Cloud Shell, esegui la query continua utilizzando il comando
bq query
con il flag--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Sostituisci
QUERY
con l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.
API
Esegui la query continua chiamando il
metodo jobs.insert
.
Devi impostare il campo continuous
su true
in JobConfigurationQuery
della risorsa Job
che passi.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true}}}' \ --compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.QUERY
: l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.
Esegui una query continua utilizzando un account di servizio
Questa sezione descrive come eseguire una query continua utilizzando un account di servizio. Una volta avviata la query continua, puoi chiudere la console Google Cloud , la finestra del terminale o l'applicazione senza interrompere l'esecuzione della query.
Per utilizzare un account di servizio per eseguire una query continua:
Console
- Crea un account di servizio.
- Concedi le autorizzazioni richieste all'account di servizio.
Nella console Google Cloud , vai alla pagina BigQuery.
Nell'editor di query, fai clic su Altro.
Nella sezione Scegli la modalità di query, scegli Query continua.
Fai clic su Conferma.
Nell'editor delle query, fai clic su Altro > Impostazioni query.
Nella sezione Query continua, utilizza la casella Account di servizio per selezionare l'account di servizio che hai creato.
Fai clic su Salva.
Nell'editor di query, digita l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.
Fai clic su Esegui.
bq
- Crea un account di servizio.
- Concedi le autorizzazioni richieste all'account di servizio.
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Nella riga di comando, esegui la query continua utilizzando il comando
bq query
con i seguenti flag:- Imposta il flag
--continuous
sutrue
per rendere la query continua. - Utilizza il flag
--connection_property
per specificare un account di servizio da utilizzare.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.SERVICE_ACCOUNT_EMAIL
: l'indirizzo email dell'account di servizio. Puoi recuperare l'indirizzo email dell'account di servizio dalla pagina Account di servizio della console Google Cloud .QUERY
: l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.
- Imposta il flag
API
- Crea un account di servizio.
- Concedi le autorizzazioni richieste all'account di servizio.
Esegui la query continua chiamando il metodo
jobs.insert
. Imposta i seguenti campi nella risorsaJobConfigurationQuery
della risorsaJob
che passi:- Imposta il campo
continuous
sutrue
per rendere la query continua. - Utilizza il campo
connectionProperties
per specificare un account di servizio da utilizzare.
curl --request POST \ "https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs" \ --header "Authorization: Bearer $(gcloud auth print-access-token)" \ --header "Content-Type: application/json; charset=utf-8" \ --data '{"configuration":{"query":{"query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":[{"key":"service_account","value":"SERVICE_ACCOUNT_EMAIL"}]}}}' \ --compressed
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.QUERY
: l'istruzione SQL per la query continua. L'istruzione SQL deve contenere solo operazioni supportate.SERVICE_ACCOUNT_EMAIL
: l'indirizzo email dell'account di servizio. Puoi recuperare l'indirizzo email dell'account di servizio nella pagina Account di servizio della console Google Cloud .
- Imposta il campo
Esempi
I seguenti esempi SQL mostrano casi d'uso comuni per le query continue.
Esportare i dati in un argomento Pub/Sub
L'esempio seguente mostra una query continua che filtra i dati di una tabella BigQuery che riceve informazioni in streaming sulle corse in taxi e li pubblica in tempo reale in un argomento Pub/Sub con attributi dei messaggi:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Esportare i dati in una tabella Bigtable
L'esempio seguente mostra una query continua che filtra i dati di una tabella BigQuery che riceve informazioni in streaming sulle corse in taxi e li esporta in tempo reale nella tabella Bigtable:
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Scrivere dati in una tabella BigQuery
L'esempio seguente mostra una query continua che filtra e trasforma i dati da una tabella BigQuery che riceve informazioni in streaming sulle corse in taxi, quindi scrive i dati in un'altra tabella BigQuery in tempo reale. In questo modo, i dati sono disponibili per ulteriori analisi a valle.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Elaborare i dati utilizzando un modello Vertex AI
L'esempio seguente mostra una query continua che utilizza un modello Vertex AI per generare un annuncio per i passeggeri di taxi in base alla loro latitudine e longitudine attuali, quindi esporta i risultati in un argomento Pub/Sub in tempo reale:
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Avviare una query continua da un determinato punto nel tempo
Quando avvii una query continua, vengono elaborate tutte le righe della tabella da cui stai selezionando, quindi le nuove righe man mano che arrivano. Se
vuoi saltare l'elaborazione di alcuni o di tutti i dati esistenti, puoi utilizzare la
funzione di cronologia delle modifiche APPENDS
per avviare l'elaborazione da un determinato punto in tempo.
L'esempio seguente mostra come avviare una query continua da un determinato
momento nel tempo utilizzando la funzione APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Modificare il codice SQL di una query continua
Non puoi aggiornare il codice SQL utilizzato in una query continua mentre il job di query continua è in esecuzione. Devi annullare il job di query continua, modificare il codice SQL e avviare un nuovo job di query continua dal punto in cui hai interrotto il job di query continua originale.
Per modificare il codice SQL utilizzato in una query continua:
- Visualizza i dettagli del job per il job di query continua che vuoi aggiornare e prendi nota dell'ID job.
- Se possibile, metti in pausa la raccolta dei dati a monte. Se non riesci a farlo, potresti riscontrare una duplicazione dei dati al riavvio della query continua.
- Annulla la query continua che vuoi modificare.
Ottieni il valore
end_time
per il job di query continua originale utilizzando laINFORMATION_SCHEMA
JOBS
visualizzazione:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Sostituisci quanto segue:
PROJECT_ID
: il tuo ID progetto.REGION
: la regione utilizzata dal progetto.JOB_ID
: l'ID job di query continua che hai identificato nel passaggio 1.
Modifica l'istruzione SQL della query continua per avviarla da un determinato punto nel tempo, utilizzando il valore
end_time
recuperato nel passaggio 5 come punto di partenza.Modifica l'istruzione SQL della query continua in base alle modifiche necessarie.
Esegui la query continua modificata.
Annullare una query continua
Puoi annullare un job di query continua come qualsiasi altro job. Potrebbero essere necessari fino a un minuto prima che la query smetta di essere eseguita dopo l'annullamento del job.