Conectar e armazenar dados no BigQuery

Quando você adiciona um conector do BigQuery ao seu app da Vertex AI Vision, todas as saídas do modelo do app conectado são processadas na tabela de destino.

Você pode criar sua própria tabela do BigQuery e especificar essa tabela ao adicionar um conector do BigQuery ao app ou permitir que a plataforma de apps da Vertex AI Vision crie a tabela automaticamente.

Criação automática de tabelas

Se você permitir que a plataforma de apps da Vertex AI Vision crie a tabela automaticamente, poderá especificar essa opção ao adicionar o nó do conector do BigQuery.

As condições de conjunto de dados e tabela a seguir se aplicam se você quiser usar a criação automática de tabelas:

  • Conjunto de dados: o nome do conjunto de dados criado automaticamente é visionai_dataset.
  • Tabela: o nome da tabela criada automaticamente é visionai_dataset.APPLICATION_ID.
  • Tratamento de erros:

    • Se a tabela com o mesmo nome no mesmo conjunto de dados existir, nenhuma criação automática vai acontecer.

Console

  1. Abra a guia Aplicativos do painel da Vertex AI Vision.

    Acesse a guia "Aplicativos"

  2. Selecione Ver app ao lado do nome do aplicativo na lista.

  3. Na página do criador de aplicativos, selecione BigQuery na seção Conectores.

  4. Deixe o campo Caminho do BigQuery em branco.

    Especificar o caminho da tabela em branco na interface

  5. Mude as outras configurações.

REST e LINHA DE CMD

Para permitir que a plataforma de app infira um esquema de tabela, use o campo createDefaultTableIfNotExists do BigQueryConfig ao criar ou atualizar um app.

Criar e especificar manualmente uma tabela

Se você quiser gerenciar a tabela de saída manualmente, ela precisa ter o esquema necessário como um subconjunto do esquema da tabela.

Se a tabela atual tiver esquemas incompatíveis, a implantação será rejeitada.

Usar o esquema padrão

Se você usar o esquema padrão para tabelas de saída de modelos, verifique se a tabela contém apenas as colunas obrigatórias abaixo. Você pode copiar diretamente o texto do esquema a seguir ao criar a tabela do BigQuery. Para mais detalhes sobre como criar uma tabela do BigQuery, consulte Criar e usar tabelas. Para mais informações sobre a especificação de esquema ao criar uma tabela, consulte Como especificar um esquema.

Use o texto a seguir para descrever o esquema ao criar uma tabela. Para informações sobre como usar o tipo de coluna JSON ("type": "JSON"), consulte Como trabalhar com dados JSON no SQL padrão. O tipo de coluna JSON é recomendado para consultas de anotação. Também é possível usar "type" : "STRING".

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
 {
   "name": "application",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "instance",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "node",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "annotation",
   "type": "JSON",
   "mode": "REQUIRED"
 }
]

Console do Google Cloud

  1. No Console do Google Cloud, acesse a página BigQuery.

    Acessar o BigQuery

  2. Selecione o projeto.

  3. Selecione Mais opções .

  4. Clique em Criar tabela.

  5. Na seção "Esquema", ative Editar como texto.

imagem do esquema padrão

gcloud

O exemplo a seguir primeiro cria o arquivo JSON da solicitação e, em seguida, usa o comando gcloud alpha bq tables create.

  1. Primeiro, crie o arquivo JSON da solicitação:

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " >> bigquery_schema.json
  2. Envie o comando gcloud. Faça as seguintes substituições:

    • TABLE_NAME: o ID da tabela ou o identificador totalmente qualificado da tabela.

    • DATASET: o ID do conjunto de dados do BigQuery.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Exemplos de linhas do BigQuery geradas por um app da Vertex AI Vision:

ingestion_time aplicativo instância anotação
2022-05-11 23:3211.911378 UTC my_application 5 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911338 UTC my_application 1 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911313 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
11/05/2022 23:32:12.235327 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}

Usar um esquema personalizado

Se o esquema padrão não funcionar para seu caso de uso, use as funções do Cloud Run para gerar linhas do BigQuery com um esquema definido pelo usuário. Se você usar um esquema personalizado, não haverá pré-requisito para o esquema de tabela do BigQuery.

Gráfico do app com o nó do BigQuery selecionado

Gráfico do app conectado ao BigQuery

O conector do BigQuery pode ser conectado a qualquer modelo que gere anotações em vídeo ou baseadas em proto:

  • Para a entrada de vídeo, o conector do BigQuery somente extrai os dados de metadados armazenados no cabeçalho do fluxo e os ingere no BigQuery como outras saídas de anotação de modelo. O vídeo em si não é armazenado.
  • Se o stream não tiver nenhum metadado, nada será armazenado no BigQuery.

Consultar os dados da tabela

Com o esquema de tabela padrão do BigQuery, é possível realizar análises poderosas depois que a tabela é preenchida com dados.

Amostras de consultas

Use as consultas de exemplo a seguir no BigQuery para receber insights dos modelos de visão da Vertex AI.

Por exemplo, é possível usar o BigQuery para desenhar uma curva baseada no tempo para o número máximo de pessoas detectadas por minuto usando dados do modelo de detecção de pessoas / veículos com a seguinte consulta:

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time

Da mesma forma, é possível usar o BigQuery e o recurso de contagem de linha de travessia do modelo de análise de ocupação para criar uma consulta que conte o número total de veículos que passam pela linha de travessia por minuto:

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time

Execute a consulta

Depois de formatar a consulta do SQL padrão do Google, use o console para executá-la:

Console

  1. No Console do Google Cloud, abra a página do BigQuery.

    Ir para o BigQuery

  2. Selecione Expandir ao lado do nome do conjunto de dados e selecione o nome da tabela.

  3. Na visualização de detalhes da tabela, clique em Escrever nova consulta.

    Escrever nova consulta

  4. Insira uma consulta SQL padrão do Google na área de texto do Editor de consultas. Para conferir exemplos de consultas, consulte exemplos de consultas.

  5. Opcional: para mudar o local de processamento de dados, clique em Mais e, em seguida, em Configurações de consulta. Em Local de processamento, clique em Seleção automática e escolha o local dos dados. Por fim, clique em Salvar para atualizar as configurações da consulta.

  6. Clique em Executar.

Isso cria um job de consulta que grava a saída em uma tabela temporária.

Integração do Cloud Run functions

É possível usar funções do Cloud Run para acionar o processamento de dados com a ingestão personalizada do BigQuery. Para usar as funções do Cloud Run na ingestão personalizada do BigQuery, faça o seguinte:

  • Ao usar o console do Google Cloud, selecione a função do Cloud correspondente no menu suspenso de cada modelo conectado.

    selecionar a imagem da função do Cloud

  • Ao usar a API Vertex AI Vision, adicione um par de chave-valor ao campo cloud_function_mapping de BigQueryConfig no nó do BigQuery. A chave é o nome do nó do BigQuery, e o valor é o acionador HTTP da função de destino.

Para usar as funções do Cloud Run com a transferência personalizada do BigQuery, a função precisa atender aos seguintes requisitos:

  • A instância do Cloud Run functions precisa ser criada antes de você criar o nó do BigQuery.
  • A API Vertex AI Vision espera receber uma anotação AppendRowsRequest retornada pelas funções do Cloud Run.
  • Defina o campo proto_rows.writer_schema para todas as respostas CloudFunction. O write_stream pode ser ignorado.

Exemplo de integração do Cloud Run functions

O exemplo a seguir mostra como analisar a saída do nó de contagem de ocupação (OccupancyCountPredictionResult) e extrair um esquema de tabela ingestion_time, person_count e vehicle_count.

O resultado do exemplo a seguir é uma tabela do BigQuery com o esquema:

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
      {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
]

Use o seguinte código para criar esta tabela:

  1. Defina um proto (por exemplo, test_table_schema.proto) para os campos de tabela que você quer gravar:

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. Compilar o arquivo proto para gerar o arquivo Python do buffer de protocolo:

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. Importe o arquivo Python gerado e escreva a função do Cloud.

    Python

    import base64
    import sys
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      append_row_request = {}
      append_row_request['protoRows'] = {
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              for count in stats['fullFrameCount']:
                if count['entity']['labelString'] == 'Person':
                  if 'count' in count:
                    row.person_count = count['count']
                elif count['entity']['labelString'] == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = count['count']
          payloads.append(bigquery_append_row_request(row))
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
  4. Para incluir suas dependências nas funções do Cloud Run, também é necessário fazer upload do arquivo test_table_schema_pb2.py gerado e especificar requirements.txt de forma semelhante a esta:

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Implante a função do Cloud e defina o acionador HTTP correspondente no BigQueryConfig.