Créer des requêtes continues

Ce document explique comment exécuter une requête continue dans BigQuery.

Les requêtes continues BigQuery sont des instructions SQL qui s'exécutent en continu. Les requêtes continues vous permettent d'analyser les données entrantes dans BigQuery en temps réel, puis d'exporter les résultats vers Bigtable ou Pub/Sub, ou de les écrire dans une table BigQuery.

Sélectionner un type de compte

Vous pouvez créer et exécuter un job de requête continue à l'aide d'un compte utilisateur, ou créer un job de requête continue à l'aide d'un compte utilisateur, puis l'exécuter à l'aide d'un compte de service. Vous devez utiliser un compte de service pour exécuter une requête continue qui exporte les résultats vers un sujet Pub/Sub.

Lorsque vous utilisez un compte utilisateur, une requête continue s'exécute pendant deux jours. Lorsque vous utilisez un compte de service, une requête continue s'exécute jusqu'à son annulation explicite. Pour en savoir plus, consultez la section Autorisation.

Autorisations requises

Cette section décrit les autorisations dont vous avez besoin pour créer et exécuter une requête continue. Au lieu des rôles IAM (Identity and Access Management) mentionnés, vous pouvez obtenir les autorisations requises via des rôles personnalisés.

Autorisations avec un compte utilisateur

Cette section fournit des informations sur les rôles et les autorisations nécessaires pour créer et exécuter une requête continue à l'aide d'un compte utilisateur.

Pour créer un job dans BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.jobs.create. Chacun des rôles IAM suivants accorde l'autorisation bigquery.jobs.create :

Pour exporter des données à partir d'une table BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.tables.export. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.export :

Pour mettre à jour des données dans une table BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.tables.updateData. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.updateData :

Si le compte utilisateur doit activer les API nécessaires pour votre cas d'utilisation des requêtes continues, il doit disposer du rôle Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin).

Autorisations avec un compte de service

Cette section fournit des informations sur les rôles et les autorisations nécessaires pour le compte utilisateur qui crée la requête continue, ainsi que pour le compte de service qui exécute la requête continue.

Autorisations du compte utilisateur

Pour créer un job dans BigQuery, le compte utilisateur doit disposer de l'autorisation IAM bigquery.jobs.create. Chacun des rôles IAM suivants accorde l'autorisation bigquery.jobs.create :

Pour envoyer un job qui s'exécute à l'aide d'un compte de service, le compte utilisateur doit disposer du rôle Utilisateur du compte de service (roles/iam.serviceAccountUser). Si vous utilisez le même compte utilisateur pour créer le compte de service, celui-ci doit disposer du rôle Administrateur de compte de service (roles/iam.serviceAccountAdmin). Pour obtenir des informations sur la manière de limiter l'accès d'un utilisateur à un seul compte de service plutôt qu'à tous les comptes de service d'un projet, consultez la section Attribuer un rôle unique.

Si le compte utilisateur doit activer les API nécessaires pour votre cas d'utilisation des requêtes continues, il doit disposer du rôle Administrateur Service Usage (roles/serviceusage.serviceUsageAdmin).

Autorisations de compte de service

Pour exporter des données à partir d'une table BigQuery, le compte de service doit disposer de l'autorisation IAM bigquery.tables.export. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.export :

Pour mettre à jour des données dans une table BigQuery, le compte de service doit disposer de l'autorisation IAM bigquery.tables.updateData. Chacun des rôles IAM suivants accorde l'autorisation bigquery.tables.updateData :

Avant de commencer

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the BigQuery API.

    Enable the API

Créer une réservation

Créez une réservation pour l'édition Enterprise ou Enterprise Plus, puis créez une attribution de réservation avec un type de job CONTINUOUS.

Exporter vers Pub/Sub

Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour exporter des données vers Pub/Sub. Pour en savoir plus, consultez la page Exporter vers Pub/Sub.

Intégrer des attributs personnalisés sous forme de métadonnées dans des messages Pub/Sub

Vous pouvez utiliser des attributs Pub/Sub pour fournir des informations supplémentaires sur le message, telles que sa priorité, son origine, sa destination ou des métadonnées supplémentaires. Vous pouvez également utiliser des attributs pour filtrer les messages de l'abonnement.

Dans un résultat de requête continue, si une colonne est nommée _ATTRIBUTES, ses valeurs sont copiées dans les attributs de message Pub/Sub. Les champs fournis dans _ATTRIBUTES sont utilisés comme clés d'attribut.

La colonne _ATTRIBUTES doit être de type JSON et au format ARRAY<STRUCT<STRING, STRING>> ou STRUCT<STRING>.

Pour obtenir un exemple, consultez la section Exporter des données vers un sujet Pub/Sub.

Exporter vers Bigtable

Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour exporter des données vers Bigtable. Pour en savoir plus, consultez la page Exporter vers Bigtable.

Écrire des données dans une table BigQuery

Vous pouvez écrire des données dans une table BigQuery à l'aide d'une instruction INSERT.

Utiliser les fonctions d'IA

Des API, des autorisations IAM et des ressources Google Cloud supplémentaires sont nécessaires pour utiliser une fonction d'IA compatible dans une requête continue. Pour en savoir plus, consultez l'un des sujets suivants, en fonction de votre cas d'utilisation :

Lorsque vous utilisez une fonction d'IA dans une requête continue, déterminez si le résultat de la requête restera dans le quota de la fonction. Si vous dépassez le quota, vous devrez peut-être gérer séparément les enregistrements qui ne sont pas traités.

Exécuter une requête continue à l'aide d'un compte utilisateur

Cette section explique comment exécuter une requête continue à l'aide d'un compte utilisateur. Une fois la requête continue exécutée, vous pouvez fermer la console Google Cloud, la fenêtre de terminal ou l'application sans interrompre l'exécution de la requête.

Pour exécuter une requête continue, procédez comme suit :

Console

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Dans l'éditeur de requête, cliquez sur Plus.

  3. Dans la section Choisir le mode de requête, sélectionnez Requête continue.

  4. Cliquez sur Confirmer.

  5. Dans l'éditeur de requête, saisissez l'instruction SQL correspondant à la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.

  6. Cliquez sur Exécuter.

bq

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  2. Dans Cloud Shell, exécutez la requête continue à l'aide de la commande bq query avec l'option --continuous :

    bq query --use_legacy_sql=false --continuous=true
    'QUERY'

    Remplacez QUERY par l'instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.

API

Exécutez la requête continue en appelant la méthode jobs.insert. Vous devez définir le champ continuous sur true dans le champ JobConfigurationQuery de la ressource Job que vous transmettez.

curl --request POST \
  'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
  --header 'Authorization: Bearer $(gcloud auth application-default print-access-token)' \
  --header 'Accept: application/json' \
  --header 'Content-Type: application/json' \
  --data '("configuration":("continuous":true,"useLegacySql":false,"query":"QUERY"))'
  --compressed

Remplacez les éléments suivants :

  • PROJECT_ID : ID de votre projet.
  • QUERY : instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.

Exécuter une requête continue à l'aide d'un compte de service

Cette section explique comment exécuter une requête continue à l'aide d'un compte de service. Une fois la requête continue exécutée, vous pouvez fermer la console Google Cloud, la fenêtre de terminal ou l'application sans interrompre l'exécution de la requête.

Pour utiliser un compte de service afin d'exécuter une requête continue, procédez comme suit :

Console

  1. Créez un compte de service.
  2. Accordez les autorisations requises au compte de service :
  3. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  4. Dans l'éditeur de requête, cliquez sur Plus.

  5. Dans la section Choisir le mode de requête, sélectionnez Requête continue.

  6. Cliquez sur Confirmer.

  7. Dans l'éditeur de requête, cliquez sur Plus > Paramètres de requête.

  8. Dans la section Requête continue, utilisez la zone Compte de service pour sélectionner le compte de service que vous avez créé.

  9. Cliquez sur Enregistrer.

  10. Dans l'éditeur de requête, saisissez l'instruction SQL correspondant à la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.

  11. Cliquez sur Exécuter.

bq

  1. Créez un compte de service.
  2. Accordez les autorisations requises au compte de service :
  3. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

    At the bottom of the Google Cloud console, a Cloud Shell session starts and displays a command-line prompt. Cloud Shell is a shell environment with the Google Cloud CLI already installed and with values already set for your current project. It can take a few seconds for the session to initialize.

  4. Sur la ligne de commande, exécutez la requête continue à l'aide de la commande bq query avec les options suivantes :

    • Définissez l'option --continuous sur true pour que la requête soit continue.
    • Spécifiez l'option --connection_property pour spécifier un compte de service à utiliser.
    bq query --project_id=PROJECT_ID --use_legacy_sql=false \
    --continuous=true --connection_property=service_account=SERVICE_ACCOUNT_EMAIL \
    'QUERY'

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet.
    • SERVICE_ACCOUNT_EMAIL : adresse e-mail du compte de service. Vous pouvez obtenir l'adresse e-mail du compte de service sur la page Comptes de service de la console Google Cloud.
    • QUERY : instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.

API

  1. Créez un compte de service.
  2. Accordez les autorisations requises au compte de service :
  3. Exécutez la requête continue en appelant la méthode jobs.insert. Définissez les champs suivants dans la ressource JobConfigurationQuery de la ressource Job que vous transmettez :

    • Définissez le champ continuous sur true pour rendre la requête continue.
    • Spécifiez un compte de service à utiliser à l'aide du champ connection_property.
    curl --request POST \
      'https://bigquery.googleapis.com/bigquery/v2/projects/PROJECT_ID/jobs
      --header 'Authorization: Bearer $(gcloud auth print-access-token) \
      --header 'Accept: application/json' \
      --header 'Content-Type: application/json' \
      --data '("configuration":("query":"QUERY","useLegacySql":false,"continuous":true,"connectionProperties":["key": "service_account","value":"SERVICE_ACCOUNT_EMAIL"]))' \
      --compressed

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet.
    • QUERY : instruction SQL de la requête continue. L'instruction SQL ne doit contenir que des opérations compatibles.
    • SERVICE_ACCOUNT_EMAIL : adresse e-mail du compte de service. Vous pouvez obtenir l'adresse e-mail du compte de service sur la page Comptes de service de la console Google Cloud.

Exemples

Les exemples SQL suivants illustrent des cas d'utilisation courants des requêtes continues.

Exporter des données vers un sujet Pub/Sub

L'exemple suivant montre une requête continue qui filtre les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, et publie les données sur un sujet Pub/Sub en temps réel avec des attributs de messages :

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude)) AS message,
    TO_JSON(
      STRUCT(
        CAST(passenger_count AS STRING) AS passenger_count)) AS _ATTRIBUTES
  FROM `myproject.real_time_taxi_streaming.taxi_rides`
  WHERE ride_status = 'enroute'
);

Exporter des données vers une table Bigtable

L'exemple suivant montre une requête continue qui filtre les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, et les exporte dans une table Bigtable en temps réel :

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_BIGTABLE',
    truncate = TRUE,
    overwrite = TRUE,
    uri = 'https://bigtable.googleapis.com/projects/myproject/instances/mybigtableinstance/tables/taxi-real-time-rides')
AS (
  SELECT
    CAST(CONCAT(ride_id, timestamp, latitude, longitude) AS STRING) AS rowkey,
    STRUCT(
      timestamp,
      latitude,
      longitude,
      meter_reading,
      ride_status,
      passenger_count) AS features
  FROM `myproject.real_time_taxi_streaming.taxirides`
  WHERE ride_status = 'enroute'
);

Écrire des données dans une table BigQuery

L'exemple suivant montre une requête continue qui filtre et transforme les données d'une table BigQuery qui reçoit des flux d'informations sur les trajets en taxi, puis écrit les données dans une autre table BigQuery en temps réel. Les données sont ainsi disponibles pour une analyse plus approfondie en aval.

INSERT INTO `myproject.real_time_taxi_streaming.transformed_taxirides`
SELECT
  timestamp,
  meter_reading,
  ride_status,
  passenger_count,
  ST_Distance(
    ST_GeogPoint(pickup_longitude, pickup_latitude),
    ST_GeogPoint(dropoff_longitude, dropoff_latitude)) AS euclidean_trip_distance,
    SAFE_DIVIDE(meter_reading, passenger_count) AS cost_per_passenger
FROM `myproject.real_time_taxi_streaming.taxirides`
WHERE
  ride_status = 'dropoff';

Traiter des données à l'aide d'un modèle Vertex AI

L'exemple suivant montre une requête continue qui utilise un modèle Vertex AI pour générer une annonce pour les passagers de taxi en fonction de leur latitude et de leur longitude actuelles, puis qui exporte les résultats dans un sujet Pub/Sub en temps réel :

EXPORT DATA
  OPTIONS (
    format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides')
AS (
  SELECT
    TO_JSON_STRING(
      STRUCT(
        ride_id,
        timestamp,
        latitude,
        longitude,
        prompt,
        ml_generate_text_llm_result)) AS message
  FROM
    ML.GENERATE_TEXT(
      MODEL `myproject.real_time_taxi_streaming.taxi_ml_generate_model`,
      (
        SELECT
          timestamp,
          ride_id,
          latitude,
          longitude,
          CONCAT(
            'Generate an ad based on the current latitude of ',
            latitude,
            ' and longitude of ',
            longitude) AS prompt
        FROM `myproject.real_time_taxi_streaming.taxirides`
        WHERE ride_status = 'enroute'
      ),
      STRUCT(
        50 AS max_output_tokens,
        1.0 AS temperature,
        40 AS top_k,
        1.0 AS top_p,
        TRUE AS flatten_json_output))
      AS ml_output
);

Lancer une requête continue à partir d'un moment précis

Lorsque vous démarrez une requête continue, celle-ci traite toutes les lignes de la table dans laquelle vous effectuez votre sélection, puis traite les nouvelles lignes à mesure qu'elles arrivent. Si vous souhaitez ignorer le traitement d'une partie ou de l'ensemble des données existantes, vous pouvez utiliser la fonction d'historique des modifications APPENDS pour commencer le traitement à partir d'un moment précis.

L'exemple suivant montre comment lancer une requête continue à partir d'un moment précis à l'aide de la fonction APPENDS :

EXPORT DATA
  OPTIONS (format = 'CLOUD_PUBSUB',
    uri = 'https://pubsub.googleapis.com/projects/myproject/topics/taxi-real-time-rides') AS (
  SELECT
    TO_JSON_STRING(STRUCT(ride_id,
        timestamp,
        latitude,
        longitude)) AS message
  FROM
    APPENDS(TABLE `myproject.real_time_taxi_streaming.taxi_rides`, '2024-06-12 01:23:03.652423 UTC', NULL)
  WHERE
    ride_status = 'enroute');

Modifier le code SQL d'une requête continue

Vous ne pouvez pas mettre à jour le code SQL utilisé dans une requête continue pendant l'exécution du job de requête continue. Vous devez annuler le job de requête continue, modifier le code SQL, puis démarrer un nouveau job de requête continue à partir du moment où vous avez arrêté le job de requête continue d'origine.

Pour modifier le code SQL utilisé dans une requête continue, procédez comme suit :

  1. Affichez les détails du job pour le job de requête continue que vous souhaitez mettre à jour et notez l'ID du job.
  2. Si possible, mettez en veille la collecte des données en amont. Si vous ne pouvez pas effectuer cette opération, vous risquez d'obtenir une duplication des données lors du redémarrage de la requête continue.
  3. Annulez la requête continue que vous souhaitez modifier.
  4. Récupérez la valeur end_time du job de requête continue d'origine à l'aide de la vue JOBS INFORMATION_SCHEMA :

    SELECT end_time
    FROM `PROJECT_ID.region-REGION`.INFORMATION_SCHEMA.JOBS_BY_PROJECT
    WHERE
      EXTRACT(DATE FROM creation_time) = current_date()
    AND error_result.reason = 'stopped'
    AND job_id = 'JOB_ID';

    Remplacez les éléments suivants :

    • PROJECT_ID : ID de votre projet.
    • REGION : région utilisée par votre projet.
    • JOB_ID : ID du job de requête continue que vous avez identifié à l'étape 1.
  5. Modifiez l'instruction SQL de requête continue pour démarrer la requête continue à partir d'un moment précis, en utilisant la valeur end_time récupérée à l'étape 5 comme point de départ.

  6. Modifiez l'instruction SQL de requête continue pour refléter les modifications nécessaires.

  7. Exécutez la requête continue modifiée.

Annuler une requête continue

Vous pouvez annuler un job de requête continue comme n'importe quel autre job. L'arrêt de l'exécution du job peut prendre jusqu'à une minute après l'annulation du job.

Étape suivante