Connecter et stocker des données dans BigQuery

Lorsque vous ajoutez un connecteur BigQuery à votre application Vertex AI Vision, toutes les sorties du modèle d'application connecté sont ingérées dans la table cible.

Vous pouvez créer votre propre table BigQuery et la spécifier lorsque vous ajoutez un connecteur BigQuery à l'application, ou laisser la plate-forme d'application Vertex AI Vision créer automatiquement la table.

Création automatique de tableaux

Si vous laissez la plate-forme d'application Vertex AI Vision créer automatiquement la table, vous pouvez spécifier cette option lorsque vous ajoutez le nœud de connecteur BigQuery.

Les conditions d'ensemble de données et de table suivantes s'appliquent si vous souhaitez utiliser la création automatique de tables:

  • Ensemble de données: le nom de l'ensemble de données créé automatiquement est visionai_dataset.
  • Table: le nom de la table créée automatiquement est visionai_dataset.APPLICATION_ID.
  • Gestion des erreurs:

    • Si la table du même nom existe dans le même ensemble de données, aucune création automatique n'est effectuée.

Console

  1. Ouvrez l'onglet Applications du tableau de bord Vertex AI Vision.

    Accéder à l'onglet "Applications"

  2. Sélectionnez Afficher l'application à côté du nom de votre application dans la liste.

  3. Sur la page de l'outil de création d'applications, sélectionnez BigQuery dans la section Connecteurs.

  4. Laissez le champ Chemin d'accès BigQuery vide.

    Le chemin d'accès à la table est laissé vide dans l'UI

  5. Modifiez les autres paramètres.

API REST et ligne de commande

Pour permettre à la plate-forme d'application d'inférer un schéma de table, utilisez le champ createDefaultTableIfNotExists de BigQueryConfig lorsque vous créez ou modifiez une application.

Créer et spécifier manuellement une table

Si vous souhaitez gérer manuellement votre table de sortie, elle doit comporter le schéma requis en tant que sous-ensemble du schéma de la table.

Si la table existante comporte des schémas incompatibles, le déploiement est refusé.

Utiliser le schéma par défaut

Si vous utilisez le schéma par défaut pour les tables de sortie du modèle, assurez-vous que votre table ne contient que les colonnes requises suivantes. Vous pouvez copier directement le texte de schéma suivant lorsque vous créez la table BigQuery. Pour en savoir plus sur la création d'une table BigQuery, consultez la page Créer et utiliser des tables. Pour en savoir plus sur la spécification de schéma lorsque vous créez une table, consultez la section Spécifier un schéma.

Utilisez le texte suivant pour décrire le schéma lorsque vous créez une table. Pour en savoir plus sur l'utilisation du type de colonne JSON ("type": "JSON"), consultez Utiliser des données JSON en langage SQL standard. Le type de colonne JSON est recommandé pour les requêtes d'annotation. Vous pouvez également utiliser "type" : "STRING".

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
 {
   "name": "application",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "instance",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "node",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "annotation",
   "type": "JSON",
   "mode": "REQUIRED"
 }
]

Console Google Cloud

  1. Dans la console Google Cloud, accédez à la page BigQuery.

    Accéder à BigQuery

  2. Sélectionnez votre projet.

  3. Sélectionnez Plus d'options .

  4. Cliquez sur Créer une table.

  5. Dans la section "Schéma", activez Modifier sous forme de texte.

image du schéma par défaut

gcloud

L'exemple suivant crée d'abord le fichier JSON de la requête, puis utilise la commande gcloud alpha bq tables create.

  1. Commencez par créer le fichier JSON de la requête:

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " >> bigquery_schema.json
  2. Envoyez la commande gcloud. Effectuez les remplacements suivants :

    • TABLE_NAME: ID de la table ou identifiant complet de la table.

    • DATASET: ID de l'ensemble de données BigQuery.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Exemples de lignes BigQuery générées par une application Vertex AI Vision:

ingestion_time application instance nœud annotation
2022-05-11 23:3211.911378 UTC my_application 5 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911338 UTC my_application 1 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911313 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3212.235327 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}

Utiliser un schéma personnalisé

Si le schéma par défaut ne convient pas à votre cas d'utilisation, vous pouvez utiliser des fonctions Cloud Run pour générer des lignes BigQuery avec un schéma défini par l'utilisateur. Si vous utilisez un schéma personnalisé, aucun prérequis n'est requis pour le schéma de la table BigQuery.

Graphique de l'application avec le nœud BigQuery sélectionné

Graphique de l'application connecté à BigQuery

Le connecteur BigQuery peut être connecté à n'importe quel modèle qui produit des annotations vidéo ou basées sur des prototypes:

  • Pour l'entrée vidéo, le connecteur BigQuery n'extrait que les données de métadonnées stockées dans l'en-tête du flux et les ingère dans BigQuery comme les autres sorties d'annotation du modèle. La vidéo elle-même n'est pas stockée.
  • Si votre flux ne contient aucune métadonnées, rien ne sera stocké dans BigQuery.

Interroger les données de la table

Avec le schéma de table BigQuery par défaut, vous pouvez effectuer des analyses puissantes une fois la table renseignée avec des données.

Exemples de requêtes

Vous pouvez utiliser les exemples de requêtes suivants dans BigQuery pour obtenir des insights à partir des modèles Vertex AI Vision.

Par exemple, vous pouvez utiliser BigQuery pour tracer une courbe basée sur le temps pour le nombre maximal de personnes détectées par minute à l'aide des données du modèle de détecteur de personnes / véhicules avec la requête suivante:

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time

De même, vous pouvez utiliser BigQuery et la fonctionnalité de comptage de la ligne de croisement du modèle d'analyse de l'occupation pour créer une requête qui compte le nombre total de véhicules qui passent la ligne de croisement par minute:

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time

Exécuter votre requête

Après avoir mis en forme votre requête SQL standard Google, vous pouvez utiliser la console pour l'exécuter:

Console

  1. Dans la console Google Cloud, ouvrez la page "BigQuery".

    Accéder à BigQuery

  2. Sélectionnez Développer à côté du nom de votre ensemble de données, puis sélectionnez le nom de votre table.

  3. Dans la vue d'informations de la table, cliquez sur Saisir une nouvelle requête.

    Saisir une nouvelle requête

  4. Saisissez une requête SQL standard de Google dans la zone de texte de l'éditeur de requête. Pour obtenir des exemples de requêtes, consultez la section Exemples de requêtes.

  5. Facultatif: Pour modifier l'emplacement de traitement des données, cliquez sur Plus, puis sur Paramètres de requête. Dans le champ Zone de traitement, cliquez sur Sélection automatique et choisissez l'emplacement de vos données. Cliquez ensuite sur Enregistrer pour mettre à jour les paramètres de la requête.

  6. Cliquez sur Run (Exécuter).

Cette action crée une tâche de requête qui écrit les résultats dans une table temporaire.

Intégration des fonctions Cloud Run

Vous pouvez utiliser des fonctions Cloud Run déclenchant un traitement de données supplémentaire avec votre ingestion BigQuery personnalisée. Pour utiliser des fonctions Cloud Run pour votre ingestion BigQuery personnalisée, procédez comme suit:

  • Lorsque vous utilisez la console Google Cloud, sélectionnez la fonction cloud correspondante dans le menu déroulant de chaque modèle connecté.

    image de sélection de la fonction cloud

  • Lorsque vous utilisez l'API Vertex AI Vision, ajoutez une paire clé-valeur au champ cloud_function_mapping de BigQueryConfig dans le nœud BigQuery. La clé correspond au nom du nœud BigQuery et la valeur au déclencheur HTTP de la fonction cible.

Pour utiliser des fonctions Cloud Run avec votre ingestion BigQuery personnalisée, la fonction doit remplir les conditions suivantes:

  • L'instance de fonctions Cloud Run doit être créée avant de créer le nœud BigQuery.
  • L'API Vertex AI Vision s'attend à recevoir une annotation AppendRowsRequest renvoyée par les fonctions Cloud Run.
  • Vous devez définir le champ proto_rows.writer_schema pour toutes les réponses CloudFunction. write_stream peut être ignoré.

Exemple d'intégration de fonctions Cloud Run

L'exemple suivant montre comment analyser la sortie du nœud de comptage de l'occupation (OccupancyCountPredictionResult) et en extraire un schéma de table ingestion_time, person_count et vehicle_count.

Le résultat de l'exemple suivant est une table BigQuery avec le schéma suivant:

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
      {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
]

Utilisez le code suivant pour créer cette table:

  1. Définissez un proto (par exemple, test_table_schema.proto) pour les champs de table que vous souhaitez écrire:

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. Compilez le fichier proto pour générer le fichier Python du tampon de protocole:

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. Importez le fichier Python généré et écrivez la fonction cloud.

    Python

    import base64
    import sys
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      append_row_request = {}
      append_row_request['protoRows'] = {
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              for count in stats['fullFrameCount']:
                if count['entity']['labelString'] == 'Person':
                  if 'count' in count:
                    row.person_count = count['count']
                elif count['entity']['labelString'] == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = count['count']
          payloads.append(bigquery_append_row_request(row))
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
  4. Pour inclure vos dépendances dans les fonctions Cloud Run, vous devez également importer le fichier test_table_schema_pb2.py généré et spécifier un requirements.txt semblable à celui-ci:

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Déployez la fonction Cloud et définissez le déclencheur HTTP correspondant dans BigQueryConfig.