Créer des requêtes continues
Ce document explique comment exécuter une requête continue dans BigQuery.
Les requêtes continues BigQuery sont des instructions SQL qui s'exécutent de façon continue. Les requêtes continues vous permettent d'analyser les données entrantes dans BigQuery en temps réel, puis d'exporter les résultats vers Bigtable ou Pub/Sub, ou d'écrire les résultats dans une table BigQuery.
Sélectionner un type de compte
Vous pouvez créer et exécuter un job de requête continue à l'aide d'un compte utilisateur, ou créer un job de requête continue à l'aide d'un compte utilisateur, puis l'exécuter à l'aide d'un compte de service. Vous devez utiliser un compte de service pour exécuter une requête continue qui exporte les résultats vers un sujet Pub/Sub.
Lorsque vous utilisez un compte utilisateur, une requête continue s'exécute pendant deux jours. Lorsque vous utilisez un compte de service, une requête continue s'exécute jusqu'à son annulation explicite. Pour en savoir plus, consultez la section Autorisation.
Autorisations requises
Cette section décrit les autorisations dont vous avez besoin pour créer et exécuter une requête continue. Au lieu des rôles IAM (Identity and Access Management) mentionnés, vous pouvez obtenir les autorisations requises via des rôles personnalisés.
Autorisations avec un compte utilisateur
Cette section fournit des informations sur les rôles et les autorisations nécessaires pour créer et exécuter une requête continue à l'aide d'un compte utilisateur.
Pour créer un job dans BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.jobs.create
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.jobs.create
:
- Utilisateur BigQuery (
roles/bigquery.user
) - Utilisateur de job BigQuery (
roles/bigquery.jobUser
) - Administrateur BigQuery (
roles/bigquery.admin
)
Pour exporter des données à partir d'une table BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.tables.export
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.export
:
- Lecteur de données BigQuery (
roles/bigquery.dataViewer
) - Éditeur de données BigQuery (
roles/bigquery.dataEditor
) - Propriétaire de données BigQuery (
roles/bigquery.dataOwner
) - Administrateur BigQuery (
roles/bigquery.admin
)
Pour mettre à jour les données d'une table BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.tables.updateData
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.updateData
:
- Éditeur de données BigQuery (
roles/bigquery.dataEditor
) - Propriétaire de données BigQuery (
roles/bigquery.dataOwner
) - Administrateur BigQuery (
roles/bigquery.admin
)
Si le compte utilisateur doit activer les API nécessaires pour votre cas d'utilisation des requêtes continues, il doit disposer du rôle Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin
).
Autorisations avec un compte de service
Cette section fournit des informations sur les rôles et les autorisations nécessaires pour le compte utilisateur qui crée la requête continue, ainsi que pour le compte de service qui exécute la requête continue.
Autorisations du compte utilisateur
Pour créer un job dans BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.jobs.create
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.jobs.create
:
- Utilisateur BigQuery (
roles/bigquery.user
) - Utilisateur de job BigQuery (
roles/bigquery.jobUser
) - Administrateur BigQuery (
roles/bigquery.admin
)
Pour envoyer un job qui s'exécute à l'aide d'un compte de service, le compte utilisateur doit disposer du rôle Utilisateur du compte de service (roles/iam.serviceAccountUser
). Si vous utilisez le même compte utilisateur pour créer le compte de service, celui-ci doit disposer du rôle Administrateur de compte de service (roles/iam.serviceAccountAdmin
). Pour obtenir des informations sur la manière de limiter l'accès d'un utilisateur à un seul compte de service plutôt qu'à tous les comptes de service d'un projet, consultez la section Attribuer un rôle unique.
Si le compte utilisateur doit activer les API nécessaires pour votre cas d'utilisation des requêtes continues, il doit disposer du rôle Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin
).
Autorisations de compte de service
Pour exporter des données à partir d'une table BigQuery, le compte de service doit disposer de l'autorisation IAM bigquery.tables.export
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.export
:
- Lecteur de données BigQuery (
roles/bigquery.dataViewer
) - Éditeur de données BigQuery (
roles/bigquery.dataEditor
) - Propriétaire de données BigQuery (
roles/bigquery.dataOwner
) - Administrateur BigQuery (
roles/bigquery.admin
)
bigquery.tables.updateData
. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.updateData
:
- Éditeur de données BigQuery (
roles/bigquery.dataEditor
) - Propriétaire de données BigQuery (
roles/bigquery.dataOwner
) - Administrateur BigQuery (
roles/bigquery.admin
)
Avant de commencer
-
In the Google Cloud console, on the project selector page, select or create a Google Cloud project.
-
Make sure that billing is enabled for your Google Cloud project.
-
Enable the BigQuery API.
Créer une réservation
Créez une réservation pour l'édition Enterprise ou Enterprise Plus, puis créez une attribution de réservation avec un type de tâche CONTINUOUS
.
Lorsque vous créez une attribution de réservation pour une requête continue, la réservation associée est limitée à 500 emplacements ou moins, et ne peut pas être configurée pour utiliser l'autoscaling.
Exporter vers Pub/Sub
Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour exporter des données vers Pub/Sub. Pour en savoir plus, consultez la section Exporter vers Pub/Sub.
Intégrer des attributs personnalisés sous forme de métadonnées dans des messages Pub/Sub
Vous pouvez utiliser des attributs Pub/Sub pour fournir des informations supplémentaires sur le message, telles que sa priorité, son origine, sa destination ou des métadonnées supplémentaires. Vous pouvez également utiliser des attributs pour filtrer les messages de l'abonnement.
Dans un résultat de requête continue, si une colonne est nommée _ATTRIBUTES
, ses valeurs sont copiées dans les attributs de message Pub/Sub.
Les champs fournis dans _ATTRIBUTES
sont utilisés comme clés d'attribut.
La colonne _ATTRIBUTES
doit être de type JSON
, au format ARRAY<STRUCT<STRING, STRING>>
ou STRUCT<STRING>
.
Pour obtenir un exemple, consultez la section Exporter des données vers un sujet Pub/Sub.
Exporter vers Bigtable
Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour exporter des données vers Bigtable. Pour en savoir plus, consultez la section Exporter vers Bigtable.
Écrire des données dans une table BigQuery
Vous pouvez écrire des données dans une table BigQuery à l'aide d'une instruction INSERT
.
Utiliser les fonctions d'IA
Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour utiliser une fonction d'IA compatible dans une requête continue. Pour en savoir plus, consultez l'un des sujets suivants, en fonction de votre cas d'utilisation :
- Générer du texte à l'aide de la fonction
ML.GENERATE_TEXT
- Générer des embeddings textuels à l'aide de la fonction
ML.GENERATE_EMBEDDING
- Comprenez du texte avec la fonction
ML.UNDERSTAND_TEXT
- Traduire du texte avec la fonction
ML.TRANSLATE
Lorsque vous utilisez une fonction d'IA dans une requête continue, vérifiez si la sortie de la requête reste dans le quota de la fonction. Si vous dépassez le quota, vous devrez peut-être gérer séparément les enregistrements qui ne sont pas traités.
Exécuter une requête continue à l'aide d'un compte utilisateur
Cette section explique comment exécuter une requête continue à l'aide d'un compte utilisateur. Une fois la requête continue exécutée, vous pouvez fermer la console Google Cloud, la fenêtre de terminal ou l'application sans interrompre l'exécution de la requête.
Pour exécuter une requête continue, procédez comme suit :
Console
Dans la console Google Cloud, accédez à la page BigQuery.
Dans l'éditeur de requête, cliquez sur Plus.
Dans la section Choisir le mode de requête, sélectionnez Requête continue.
Cliquez sur Confirmer.
Dans l'éditeur de requête, saisissez l'instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
Cliquez sur Exécuter.
bq
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Dans Cloud Shell, exécutez la requête continue à l'aide de la commande
bq query
avec l'option--continuous
:bq query --use_legacy_sql=false --continuous=true 'QUERY'
Remplacez
QUERY
par l'instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
API
Exécutez la requête continue en appelant la méthode jobs.insert
.
Vous devez définir le champ continuous
sur true
dans le champ JobConfigurationQuery
de la ressource Job
que vous transmettez.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))' --compressed
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.QUERY
: instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
Exécuter une requête continue à l'aide d'un compte de service
Cette section explique comment exécuter une requête continue à l'aide d'un compte de service. Une fois la requête continue exécutée, vous pouvez fermer la console Google Cloud, la fenêtre de terminal ou l'application sans interrompre l'exécution de la requête.
Pour utiliser un compte de service afin d'exécuter une requête continue, procédez comme suit :
Console
- Créez un compte de service.
- Accordez les autorisations requises au compte de service :
Dans la console Google Cloud, accédez à la page BigQuery.
Dans l'éditeur de requête, cliquez sur Plus.
Dans la section Choisir le mode de requête, sélectionnez Requête continue.
Cliquez sur Confirmer.
Dans l'éditeur de requête, cliquez sur Plus > Paramètres de requête.
Dans la section Requête continue, utilisez la zone de texte Compte de service pour sélectionner le compte de service que vous avez créé.
Cliquez sur Enregistrer.
Dans l'éditeur de requête, saisissez l'instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
Cliquez sur Exécuter.
bq
- Créez un compte de service.
- Accordez les autorisations requises au compte de service :
-
In the Google Cloud console, activate Cloud Shell.
At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.
Sur la ligne de commande, exécutez la requête continue à l'aide de la commande
bq query
avec les options suivantes :- Définissez l'indicateur
--continuous
surtrue
pour rendre la requête continue. - Utilisez l'option
--connection_property
pour spécifier un compte de service à utiliser.
bq query --project_id=PROJECT_ID --use_legacy_sql=false \ --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \ 'QUERY'
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.SERVICE_ACCOUNT_EMAIL
: adresse e-mail du compte de service. Vous pouvez obtenir l'adresse e-mail du compte de service sur la page Comptes de service de la console Google Cloud.QUERY
: instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
- Définissez l'indicateur
API
- Créez un compte de service.
- Accordez les autorisations requises au compte de service :
Exécutez la requête continue en appelant la méthode
jobs.insert
. Définissez les champs suivants dans la ressourceJobConfigurationQuery
de la ressourceJob
que vous transmettez :- Définissez le champ
continuous
surtrue
pour rendre la requête continue. - Utilisez le champ
connection_property
pour spécifier un compte de service à utiliser.
curl --request POST \ 'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs --header 'Authorization: Bearer $(gcloud auth print-access-token) \ --header 'Accept: application/json' \ --header 'Content-Type: application/json' \ --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \ --compressed
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.QUERY
: instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.SERVICE_ACCOUNT_EMAIL
: adresse e-mail du compte de service. Vous pouvez obtenir l'adresse e-mail du compte de service sur la page Comptes de service de la console Google Cloud.
- Définissez le champ
Exemples
Les exemples SQL suivants illustrent des cas d'utilisation courants des requêtes continues.
Exporter des données vers un sujet Pub/Sub
L'exemple suivant montre une requête continue qui filtre les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, et publie les données sur un sujet Pub/Sub en temps réel avec des attributs de messages :
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude)) AS message, TO_JSON( STRUCT( CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES FROM `myproject.real_time_taxi_streaming.taxi_rides` WHERE ride_status = 'enroute' );
Exporter des données vers une table Bigtable
L'exemple suivant montre une requête continue qui filtre les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, et les exporte dans une table Bigtable en temps réel :
EXPORT DATA OPTIONS ( format = 'CLOUD_BIGTABLE', truncate = TRUE, overwrite = TRUE, uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides') AS ( SELECT CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey, STRUCT( timestamp, latitude, longitude, meter_reading, ride_status, passenger_count) AS features FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' );
Écrire des données dans une table BigQuery
L'exemple suivant montre une requête continue qui filtre et transforme les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, puis écrit les données dans une autre table BigQuery en temps réel. Les données sont ainsi disponibles pour une analyse ultérieure en aval.
INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides` SELECT timestamp, meter_reading, ride_status, passenger_count, ST_Distance( ST_GeogPoint(pickup_longitude, pickup_latitude), ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance, SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'dropoff';
Traiter des données à l'aide d'un modèle Vertex AI
L'exemple suivant montre une requête continue qui utilise un modèle Vertex AI pour générer une annonce pour les passagers de taxi en fonction de leur latitude et de leur longitude actuelles, puis qui exporte les résultats dans un sujet Pub/Sub en temps réel :
EXPORT DATA OPTIONS ( format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING( STRUCT( ride_id, timestamp, latitude, longitude, prompt, ml_generate_text_llm_result)) AS message FROM ML.GENERATE_TEXT( MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`, ( SELECT timestamp, ride_id, latitude, longitude, CONCAT( 'Generate an ad based on the current latitude of ', latitude, ' and longitude of ', longitude) AS prompt FROM `myproject.real_time_taxi_streaming.taxirides` WHERE ride_status = 'enroute' ), STRUCT( 50 AS max_output_tokens, 1.0 AS temperature, 40 AS top_k, 1.0 AS top_p, TRUE AS flatten_json_output)) AS ml_output );
Démarrer une requête continue à partir d'un moment précis
Lorsque vous lancez une requête continue, elle traite toutes les lignes de la table à partir de laquelle vous effectuez la sélection, puis traite les nouvelles lignes à mesure qu'elles arrivent. Si vous souhaitez ignorer le traitement d'une partie ou de l'ensemble des données existantes, vous pouvez utiliser la fonction d'historique des modifications APPENDS
pour commencer le traitement à partir d'un moment précis.
L'exemple suivant montre comment lancer une requête continue à partir d'un moment précis à l'aide de la fonction APPENDS
:
EXPORT DATA OPTIONS (format = 'CLOUD_PUBSUB', uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS ( SELECT TO_JSON_STRING(STRUCT(ride_id, timestamp, latitude, longitude)) AS message FROM APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL) WHERE ride_status = 'enroute');
Modifier le code SQL d'une requête continue
Vous ne pouvez pas mettre à jour le code SQL utilisé dans une requête continue pendant l'exécution du job de requête continue. Vous devez annuler le job de requête continue, modifier le code SQL, puis démarrer un nouveau job de requête continue à partir du moment où vous avez arrêté le job de requête continue d'origine.
Pour modifier le code SQL utilisé dans une requête continue, procédez comme suit :
- Affichez les détails du job pour le job de requête continue que vous souhaitez mettre à jour et notez l'ID du job.
- Si possible, suspendez la collecte des données en amont. Si vous ne pouvez pas le faire, vous risquez de dupliquer des données lorsque la requête continue est redémarrée.
- Annulez la requête continue que vous souhaitez modifier.
Récupérez la valeur
end_time
du job de requête continue d'origine à l'aide de la vueJOBS
INFORMATION_SCHEMA
:SELECT end_time FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT WHERE EXTRACT(DATE FROM creation_time) = current_date() AND error_result.reason = 'stopped' AND job_id = 'JOB_ID';
Remplacez les éléments suivants :
PROJECT_ID
: ID de votre projet.REGION
: région utilisée par votre projet.JOB_ID
: ID du job de requête continue que vous avez identifié à l'étape 1.
Modifiez l'instruction SQL de requête continue pour démarrer la requête continue à partir d'un moment précis, en utilisant la valeur
end_time
récupérée à l'étape 5 comme point de départ.Modifiez l'instruction SQL de la requête continue pour refléter les modifications nécessaires.
Exécutez la requête continue modifiée.
Annuler une requête continue
Vous pouvez annuler un job de requête continue comme n'importe quel autre job. L'arrêt de l'exécution du job peut prendre jusqu'à une minute après l'annulation du job.