Connect and store data to BigQuery

When you add a BigQuery connector to your Vertex AI Vision app, all connected app model outputs are ingested into the target table.

You can either create your own BigQuery table and specify that table when you add a BigQuery connector to the app, or let the Vertex AI Vision app platform automatically create the table.

Automatic table creation

To let the Vertex AI Vision app platform create the table automatically, specify this option when you add the BigQuery connector node.

The following dataset and table conditions apply if you want to use automatic table creation:

  • Dataset: The automatically created dataset name is visionai_dataset.
  • Table: The automatically created table name is visionai_dataset.APPLICATION_ID.
  • Error handling:

    • If a table with the same name already exists in the same dataset, no automatic creation happens.

Console

  1. Open the Applications tab of the Vertex AI Vision dashboard.

    Go to the Applications tab

  2. Select View app next to the name of your application from the list.

  3. On the application builder page, select BigQuery from the Connectors section.

  4. Leave the BigQuery path field empty.


  5. Change any other settings.

REST & CMD LINE

To let the app platform infer a table schema, use the createDefaultTableIfNotExists field of the BigQueryConfig when you create or update an app.
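As a rough illustration, the configuration fragment for automatic table creation could be assembled as shown below. This is a minimal sketch, not a complete request: only the createDefaultTableIfNotExists field name comes from this page, the camelCase JSON form is an assumption, and the way the fragment is embedded in the BigQuery node of an app is not shown.

```python
import json

# Assumption: JSON (camelCase) form of the BigQueryConfig message.
# Only the createDefaultTableIfNotExists field name is taken from this page.
# No table path is set, mirroring the console step that leaves the
# BigQuery path field empty.
big_query_config = {
    "createDefaultTableIfNotExists": True,
}

# Serialize the fragment so it can be pasted into a larger request body.
print(json.dumps(big_query_config, indent=2))
```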

Manually create and specify a table

If you want to manage your output table manually, the table schema must include the required columns as a subset.

If the existing table has an incompatible schema, the deployment is rejected.

Use the default schema

If you use the default schema for model output tables, make sure your table contains only the following required columns. You can copy the following schema text directly when you create the BigQuery table. For more detailed information about creating a BigQuery table, see Create and use tables. For more information about schema specification when you create a table, see Specifying a schema.

Use the following text to describe the schema when you create a table. For information on using the JSON column type ("type": "JSON"), see Working with JSON data in Standard SQL. The JSON column type is recommended for querying annotations. You can also use "type": "STRING".

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "application",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "instance",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "node",
    "type": "STRING",
    "mode": "REQUIRED"
  },
  {
    "name": "annotation",
    "type": "JSON",
    "mode": "REQUIRED"
  }
]
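Because a manually managed table must include the required columns, a quick local check before deployment can catch an incompatible schema early. The sketch below is only an illustration: the function name is hypothetical, and the rule that extra columns are tolerated is an assumption drawn from the "subset" wording on this page. The validation that the service performs at deployment may differ.

```python
# Required columns of the default schema, with the column types this page
# allows (the annotation column may be JSON or STRING).
REQUIRED_COLUMNS = {
    "ingestion_time": {"TIMESTAMP"},
    "application": {"STRING"},
    "instance": {"STRING"},
    "node": {"STRING"},
    "annotation": {"JSON", "STRING"},
}


def find_schema_problems(table_schema):
    """Return names of required columns that are missing or have another type.

    table_schema is a list of {"name": ..., "type": ...} dicts, in the same
    shape as the schema text on this page. Hypothetical helper for a local
    pre-check only.
    """
    actual = {field["name"]: field["type"].upper() for field in table_schema}
    problems = []
    for name, allowed_types in REQUIRED_COLUMNS.items():
        if actual.get(name) not in allowed_types:
            problems.append(name)
    return problems
```

An empty result means every required column is present with an accepted type; any returned name points at a column to fix before deploying.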

Google Cloud console

  1. In the Google Cloud console, go to the BigQuery page.

    Go to BigQuery

  2. Select your project.

  3. Select More options.

  4. Click Create table.

  5. In the "Schema" section, enable Edit as text.


gcloud

The following example first creates the request JSON file, then uses the gcloud alpha bq tables create command.

  1. First create the request JSON file:

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " > bigquery_schema.json

  2. Run the gcloud command. Make the following replacements:

    • TABLE_NAME: The ID of the table or fully qualified identifier for the table.

    • DATASET: The ID of the BigQuery dataset.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Sample BigQuery rows generated by a Vertex AI Vision app:

ingestion_time | application | instance | node | annotation
2022-05-11 23:32:11.911378 UTC | my_application | 5 | just-one-node | {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:32:11.911338 UTC | my_application | 1 | just-one-node | {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:32:11.911313 UTC | my_application | 4 | just-one-node | {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:32:12.235327 UTC | my_application | 4 | just-one-node | {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
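To show how the annotation column can be consumed once a row is read back, here is a minimal sketch that parses the annotation value of the first sample row. The helper name is hypothetical, and pairing each entry of ids with the entry at the same position in displayNames is an assumption about how the two lists relate, not something this page states.

```python
import json

# Annotation value copied from the first sample row on this page.
SAMPLE_ANNOTATION = (
    '{"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],'
    '"displayNames":["hello","world"],"ids":["12345","34567"]}'
)


def names_by_id(annotation_text: str) -> dict:
    """Map each value in 'ids' to the value at the same index in 'displayNames'."""
    annotation = json.loads(annotation_text)
    # Assumption: the two lists are parallel (same order and length).
    return dict(zip(annotation.get("ids", []), annotation.get("displayNames", [])))


print(names_by_id(SAMPLE_ANNOTATION))
```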

Use a customized schema

If the default schema doesn't work for your use case, you can use Cloud Functions to generate BigQuery rows with a user-defined schema. If you use a custom schema, there is no prerequisite for the BigQuery table schema.

App graph with BigQuery node selected


The BigQuery connector can be connected to any model that outputs video or proto-based annotation:

  • For video input, the BigQuery connector extracts only the metadata stored in the stream header and ingests it into BigQuery in the same way as other model annotation outputs. The video itself is not stored.
  • If your stream contains no metadata, nothing is stored in BigQuery.

Query table data

With the default BigQuery table schema, you can perform powerful analysis after the table is populated with data.

Sample queries

You can use the following sample queries in BigQuery to gain insight from Vertex AI Vision models.

For example, you can use BigQuery to draw a time-based curve for the maximum number of detected people per minute, using data from the Person / vehicle detector model, with the following query:

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time
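To make the logic of the nested query easier to follow, the same aggregation can be sketched in plain Python over rows that have already been fetched. This is an illustration only: the function name and the sample rows are made up, the annotation layout (stats, fullFrameCount, entity.labelString, count) is taken from the query, and the sketch keeps timestamps in UTC instead of converting them to America/Los_Angeles.

```python
from collections import defaultdict
from datetime import datetime, timezone


def max_people_per_minute(rows):
    """Return {minute: max person count} for (ingestion_time, annotation) rows."""
    result = defaultdict(int)
    for ingestion_time, annotation in rows:
        # Equivalent of TIMESTAMP_TRUNC(..., MINUTE).
        minute = ingestion_time.replace(second=0, microsecond=0)
        # Equivalent of UNNEST over stats["fullFrameCount"].
        for item in annotation.get("stats", {}).get("fullFrameCount", []):
            if item.get("entity", {}).get("labelString") != "Person":
                continue
            # A missing count is treated as 0, like IFNULL(..., 0).
            count = int(item.get("count", 0))
            result[minute] = max(result[minute], count)
    return dict(result)


# Made-up rows for illustration; real rows come from the BigQuery table.
SAMPLE_ROWS = [
    (datetime(2022, 5, 11, 23, 32, 11, tzinfo=timezone.utc),
     {"stats": {"fullFrameCount": [
         {"entity": {"labelString": "Person"}, "count": "3"}]}}),
    (datetime(2022, 5, 11, 23, 32, 40, tzinfo=timezone.utc),
     {"stats": {"fullFrameCount": [
         {"entity": {"labelString": "Person"}, "count": 5},
         {"entity": {"labelString": "Vehicle"}, "count": 9}]}}),
    (datetime(2022, 5, 11, 23, 33, 1, tzinfo=timezone.utc),
     {"stats": {"fullFrameCount": [
         {"entity": {"labelString": "Person"}}]}}),
]
```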

Similarly, you can use BigQuery and the crossing line count feature of the Occupancy analytics model to create a query that counts the total number of vehicles that pass the crossing line per minute:

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time
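The two levels of UNNEST in this query (first over crossing lines, then over the counts of one line) can be sketched in plain Python as nested loops. As before, this is an illustration under assumptions: the function name is hypothetical, the annotation layout is inferred from the query, and timestamps stay in UTC.

```python
from collections import defaultdict
from datetime import datetime, timezone


def vehicles_crossing_per_minute(rows, line_id):
    """Return {minute: total vehicle count} for one crossing line."""
    totals = defaultdict(int)
    for ingestion_time, annotation in rows:
        minute = ingestion_time.replace(second=0, microsecond=0)
        # First UNNEST: one entry per crossing line.
        for line in annotation.get("stats", {}).get("crossingLineCounts", []):
            if line.get("annotation", {}).get("id") != line_id:
                continue
            # Second UNNEST: one entry per counted entity type on that line.
            for item in line.get("positiveDirectionCounts", []):
                if item.get("entity", {}).get("labelString") == "Vehicle":
                    # A missing count contributes 0, like IFNULL(..., 0).
                    totals[minute] += int(item.get("count", 0))
    return dict(totals)
```

Here line_id plays the role of LINE_ANNOTATION_ID in the query.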

Run your query

After you format your Google Standard SQL query, you can use the console to run your query:

Console

  1. In the Google Cloud console, open the BigQuery page.

    Go to BigQuery

  2. Select Expand next to your dataset name, and select your table name.

  3. In the table detail view, click Compose new query.


  4. Enter a Google Standard SQL query in the Query editor text area. For example queries, see sample queries.

  5. Optional: To change the data processing location, click More, then Query settings. Under Processing location, click Auto-select and choose your data's location. Finally, click Save to update the query settings.

  6. Click Run.

This creates a query job that writes the output to a temporary table.

Cloud Functions integration

You can use Cloud Functions to trigger additional data processing with your customized BigQuery ingestion. To use Cloud Functions for your customized BigQuery ingestion, do the following:

  • When using the Google Cloud console, select the corresponding cloud function from the dropdown menu of each connected model.


  • When using the Vertex AI Vision API, add one key-value pair to the cloud_function_mapping field of BigQueryConfig in the BigQuery node. The key is the BigQuery node name and the value is the HTTP trigger of the target function.

To use Cloud Functions with your customized BigQuery ingestion, the function must meet the following requirements:

  • The Cloud Functions instance must be created before you create the BigQuery node.
  • The Vertex AI Vision API expects to receive an AppendRowsRequest annotation returned from Cloud Functions.
  • You must set the proto_rows.writer_schema field for all Cloud Functions responses; write_stream can be ignored.
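Since every response must carry a writer schema, it can help to check a function's output locally before deploying it. The following sketch is hypothetical: the helper name is made up, and the JSON key names (annotations, annotation, protoRows, writerSchema) are taken from the example function later in this document, not from an API reference.

```python
def find_missing_writer_schema(response_body):
    """Return indexes of annotations whose protoRows has no writerSchema.

    response_body is the decoded JSON dict returned by the Cloud Function.
    Assumption: each entry looks like
    {"annotation": {"protoRows": {"writerSchema": {...}, "rows": {...}}}}.
    """
    missing = []
    for index, item in enumerate(response_body.get("annotations", [])):
        proto_rows = item.get("annotation", {}).get("protoRows", {})
        if "writerSchema" not in proto_rows:
            missing.append(index)
    return missing
```

An empty list means every annotation in the response includes a writer schema; the check says nothing about whether the schema itself matches the target table.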

Cloud Functions integration example

The following example shows how to parse occupancy count node output (OccupancyCountPredictionResult) and extract ingestion_time, person_count, and vehicle_count values for a table with a matching schema.

The result of the following sample is a BigQuery table with the schema:

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
  {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  }
]

Use the following code to create this table:

  1. Define a proto (for example, test_table_schema.proto) for table fields you want to write:

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. Compile the proto file to generate the protocol buffer Python file:

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. Import the generated Python file and write the cloud function.

    Python

    import base64
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      """Returns the TestTableSchema descriptor as a DescriptorProto."""
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      """Wraps one serialized row in an AppendRowsRequest-shaped dict."""
      append_row_request = {}
      append_row_request['protoRows'] = {
          # The writer schema must be set on every response.
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              # Serialized row bytes are base64-encoded for the JSON response.
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      """Converts occupancy count annotations into BigQuery rows."""
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          # Start each row with zero counts so both columns are always set.
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              # Tolerate a missing fullFrameCount list.
              for count in stats.get('fullFrameCount', []):
                label = count.get('entity', {}).get('labelString')
                # int() also accepts counts that arrive as JSON strings.
                if label == 'Person':
                  if 'count' in count:
                    row.person_count = int(count['count'])
                elif label == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = int(count['count'])
          payloads.append(bigquery_append_row_request(row))
      # Wrap each AppendRowsRequest payload as one annotation in the response.
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
    
  4. To include your dependencies in Cloud Functions, you must also upload the generated test_table_schema_pb2.py file and specify a requirements.txt file similar to the following:

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Deploy the cloud function and set the corresponding HTTP trigger in the BigQueryConfig.
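The example function copies ingestionTimeMicros into an int64 proto field that feeds a TIMESTAMP column. When debugging rows, it can be handy to turn such a value into a readable timestamp. The sketch below assumes the value is microseconds since the Unix epoch, as the field name suggests; the helper name is hypothetical.

```python
from datetime import datetime, timedelta, timezone

UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def micros_to_utc(micros) -> datetime:
    """Convert microseconds since the Unix epoch to a timezone-aware datetime.

    Integer arithmetic with timedelta avoids the rounding that a float-based
    conversion could introduce at microsecond precision.
    """
    return UNIX_EPOCH + timedelta(microseconds=int(micros))


# The request JSON may carry the value as a string, so both forms are accepted.
print(micros_to_utc("1652311931911378").isoformat())
```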