BigQuery에 데이터 연결 및 저장

Vertex AI Vision 앱에 BigQuery 커넥터를 추가하면 연결된 모든 앱 모델 출력이 대상 테이블에 처리됩니다.

앱에 BigQuery 커넥터를 추가할 때 자체 BigQuery 테이블을 만들고 해당 테이블을 지정하거나 Vertex AI Vision 앱 플랫폼에서 테이블을 자동으로 만들도록 할 수 있습니다.

자동 테이블 생성

Vertex AI Vision 앱 플랫폼에서 테이블을 자동으로 만들도록 허용하는 경우 BigQuery 커넥터 노드를 추가할 때 이 옵션을 지정할 수 있습니다.

자동 테이블 생성을 사용하려면 다음 데이터 세트 및 테이블 조건이 적용됩니다.

  • 데이터 세트: 자동으로 생성된 데이터 세트 이름은 visionai_dataset입니다.
  • 테이블: 자동으로 생성된 테이블 이름은 visionai_dataset.APPLICATION_ID입니다.
  • 오류 처리:

    • 동일한 데이터 세트에 동일한 이름의 테이블이 있는 경우 자동 생성되지 않습니다.

콘솔

  1. Vertex AI Vision 대시보드의 애플리케이션 탭을 엽니다.

    애플리케이션 탭으로 이동

  2. 목록에서 애플리케이션 이름 옆에 있는 앱 보기를 선택합니다.

  3. 애플리케이션 빌더 페이지의 커넥터 섹션에서 BigQuery를 선택합니다.

  4. BigQuery 경로 필드는 비워둡니다.

    UI에서 테이블 경로를 비워 두도록 지정

  5. 다른 설정을 변경합니다.

REST 및 명령줄

앱 플랫폼에서 테이블 스키마를 추론하도록 하려면 앱을 만들거나 업데이트할 때 BigQueryConfigcreateDefaultTableIfNotExists 필드를 사용하세요.

테이블을 수동으로 만들고 지정

출력 테이블을 수동으로 관리하려면 테이블에 필요한 스키마가 테이블 스키마의 하위 집합으로 포함되어야 합니다.

기존 테이블에 호환되지 않는 스키마가 있으면 배포가 거부됩니다.

기본 스키마 사용

모델 출력 테이블에 기본 스키마를 사용하는 경우 테이블에 다음과 같은 필수 열만 포함되어 있는지 확인합니다. BigQuery 테이블을 만들 때 다음 스키마 텍스트를 직접 복사할 수 있습니다. BigQuery 테이블 만들기에 관한 자세한 내용은 테이블 만들기 및 사용을 참고하세요. 테이블을 만들 때의 스키마 지정에 관한 자세한 내용은 스키마 지정을 참고하세요.

테이블을 만들 때 다음 텍스트를 사용하여 스키마를 설명합니다. JSON 열 유형("type": "JSON") 사용에 관한 자세한 내용은 표준 SQL에서 JSON 데이터 작업을 참고하세요. 주석 쿼리에는 JSON 열 유형을 사용하는 것이 좋습니다. "type" : "STRING"도 사용할 수 있습니다.

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
 {
   "name": "application",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "instance",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "node",
   "type": "STRING",
   "mode": "REQUIRED"
 },
 {
   "name": "annotation",
   "type": "JSON",
   "mode": "REQUIRED"
 }
]

Google Cloud 콘솔

  1. Google Cloud 콘솔에서 BigQuery 페이지로 이동합니다.

    BigQuery로 이동

  2. 프로젝트를 선택합니다.

  3. 옵션 더보기 를 선택합니다.

  4. 테이블 만들기를 클릭합니다.

  5. '스키마' 섹션에서 텍스트로 수정을 사용 설정합니다.

기본 스키마 이미지

gcloud

다음 예에서는 먼저 요청 JSON 파일을 만든 다음 gcloud alpha bq tables create 명령어를 사용합니다.

  1. 먼저 요청 JSON 파일을 만듭니다.

    echo "{
    \"schema\": [
        {
          \"name\": \"ingestion_time\",
          \"type\": \"TIMESTAMP\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"application\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"instance\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"node\",
          \"type\": \"STRING\",
          \"mode\": \"REQUIRED\"
        },
        {
          \"name\": \"annotation\",
          \"type\": \"JSON\",
          \"mode\": \"REQUIRED\"
        }
    ]
    }
    " >> bigquery_schema.json
  2. gcloud 명령어를 전송합니다. 다음을 바꿉니다.

    • TABLE_NAME: 테이블의 ID 또는 테이블의 정규화된 식별자입니다.

    • DATASET: BigQuery 데이터 세트의 ID입니다.

    gcloud alpha bq tables create TABLE_NAME \
    --dataset=DATASET \
    --schema-file=./bigquery_schema.json
    

Vertex AI Vision 앱에서 생성된 샘플 BigQuery 행:

ingestion_time 애플리케이션 인스턴스 노드 주석
2022-05-11 23:3211.911378 UTC my_application 5 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE1Eg5teV9hcHBsaWNhdGlvbgjS+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911338 UTC my_application 1 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgExEg5teV9hcHBsaWNhdGlvbgiq+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3211.911313 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgiR+YnOzdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}
2022-05-11 23:3212.235327 UTC my_application 4 just-one-node {"bytesFields": ["Ig1qdXN0LW9uZS1ub2RIGgE0Eg5teV9hcHBsaWNhdGlvbgi/3J3Ozdj3Ag=="],"displayNames":["hello","world"],"ids":["12345","34567"]}

맞춤설정된 스키마 사용

사용 사례에 기본 스키마가 적합하지 않은 경우 Cloud Run 함수를 사용하여 사용자 정의 스키마로 BigQuery 행을 생성할 수 있습니다. 맞춤 스키마를 사용하는 경우 BigQuery 테이블 스키마에 대한 기본 요건이 없습니다.

BigQuery 노드가 선택된 앱 그래프

BigQuery에 연결된 앱 그래프

BigQuery 커넥터는 동영상 또는 프로토 기반 주석을 출력하는 모든 모델에 연결할 수 있습니다.

  • 동영상 입력의 경우 BigQuery 커넥터는 스트림 헤더에 저장된 메타데이터 데이터만 추출하고 이 데이터를 다른 모델 주석 출력으로 BigQuery에 처리합니다. 동영상 자체는 저장되지 않습니다.
  • 스트림에 메타데이터가 없으면 BigQuery에 아무것도 저장되지 않습니다.

테이블 데이터 쿼리

기본 BigQuery 테이블 스키마를 사용하면 테이블에 데이터가 채워진 후 강력한 분석을 실행할 수 있습니다.

샘플 쿼리

BigQuery에서 다음 샘플 쿼리를 사용하여 Vertex AI Vision 모델에서 유용한 정보를 얻을 수 있습니다.

예를 들어 다음 쿼리를 사용하여 BigQuery에서 사람 / 차량 감지기 모델의 데이터를 사용하여 분당 감지된 최대 인원 수에 관한 시간 기반 곡선을 그릴 수 있습니다.

WITH
 nested3 AS(
 WITH
   nested2 AS (
   WITH
     nested AS (
     SELECT
       t.ingestion_time AS ingestion_time,
       JSON_QUERY_ARRAY(t.annotation.stats["fullFrameCount"]) AS counts
     FROM
       `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
   SELECT
     ingestion_time,
     e
   FROM
     nested,
     UNNEST(nested.counts) AS e)
 SELECT
   STRING(TIMESTAMP_TRUNC(nested2.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested2.e["count"]), 0) AS person_count
 FROM
   nested2
 WHERE
   JSON_VALUE(nested2.e["entity"]["labelString"])="Person")
SELECT
 time,
 MAX(person_count)
FROM
 nested3
GROUP BY
 time

마찬가지로 BigQuery와 점유 분석 모델의 교차선 개수 기능을 사용하여 분당 교차선을 통과하는 총 차량 수를 집계하는 쿼리를 만들 수 있습니다.

WITH
 nested4 AS (
 WITH
   nested3 AS (
   WITH
     nested2 AS (
     WITH
       nested AS (
       SELECT
         t.ingestion_time AS ingestion_time,
         JSON_QUERY_ARRAY(t.annotation.stats["crossingLineCounts"]) AS lines
       FROM
         `PROJECT_ID.DATASET_NAME.TABLE_NAME` AS t)
     SELECT
       nested.ingestion_time,
       JSON_QUERY_ARRAY(line["positiveDirectionCounts"]) AS entities
     FROM
       nested,
       UNNEST(nested.lines) AS line
     WHERE
       JSON_VALUE(line.annotation.id) = "LINE_ANNOTATION_ID")
   SELECT
     ingestion_time,
     entity
   FROM
     nested2,
     UNNEST(nested2.entities) AS entity )
 SELECT
   STRING(TIMESTAMP_TRUNC(nested3.ingestion_time, MINUTE, "America/Los_Angeles"),"America/Los_Angeles") AS time,
   IFNULL(INT64(nested3.entity["count"]), 0) AS vehicle_count
 FROM
   nested3
 WHERE
   JSON_VALUE(nested3.entity["entity"]["labelString"])="Vehicle" )
SELECT
 time,
 SUM(vehicle_count)
FROM
 nested4
GROUP BY
 time

쿼리 실행

Google 표준 SQL 쿼리의 형식을 지정한 후 콘솔을 사용하여 쿼리를 실행할 수 있습니다.

콘솔

  1. Google Cloud 콘솔에서 BigQuery 페이지를 엽니다.

    BigQuery로 이동

  2. 데이터 세트 이름 옆에 있는 펼치기를 선택하고 테이블 이름을 선택합니다.

  3. 테이블 세부정보 보기에서 새 쿼리 작성을 클릭합니다.

    새 쿼리 작성

  4. 쿼리 편집기 텍스트 영역에 Google 표준 SQL 쿼리를 입력합니다. 쿼리 예시는 샘플 쿼리를 참고하세요.

  5. 선택사항: 데이터 처리 위치를 변경하려면 더보기, 쿼리 설정을 차례로 클릭합니다. 처리 위치에서 자동 선택을 클릭하고 데이터의 위치를 선택합니다. 마지막으로 저장을 클릭하여 쿼리 설정을 업데이트합니다.

  6. 실행을 클릭합니다.

그러면 임시 테이블에 출력을 쓰는 쿼리 작업이 생성됩니다.

Cloud Run 함수 통합

맞춤설정된 BigQuery 처리로 추가 데이터 처리를 트리거하는 Cloud Run 함수를 사용할 수 있습니다. 맞춤설정된 BigQuery 처리에 Cloud Run 함수를 사용하려면 다음 단계를 따르세요.

  • Google Cloud 콘솔을 사용하는 경우 연결된 각 모델의 드롭다운 메뉴에서 해당하는 Cloud 함수를 선택합니다.

    Cloud 함수 이미지 선택

  • Vertex AI Vision API를 사용할 때는 BigQuery 노드의 BigQueryConfig cloud_function_mapping 필드에 키-값 쌍을 하나 추가합니다. 키는 BigQuery 노드 이름이고 값은 타겟 함수의 HTTP 트리거입니다.

맞춤설정된 BigQuery 처리와 함께 Cloud Run 함수를 사용하려면 함수가 다음 요구사항을 충족해야 합니다.

  • BigQuery 노드를 만들기 전에 Cloud Run 함수 인스턴스를 만들어야 합니다.
  • Vertex AI Vision API는 Cloud Run 함수에서 반환된 AppendRowsRequest 주석을 수신할 것으로 예상합니다.
  • 모든 CloudFunction 응답에 proto_rows.writer_schema 필드를 설정해야 합니다. write_stream는 무시해도 됩니다.

Cloud Run 함수 통합 예

다음 예에서는 숙박 인원 노드 출력(OccupancyCountPredictionResult)을 파싱하고 여기에서 ingestion_time, person_count, vehicle_count 테이블 스키마를 추출하는 방법을 보여줍니다.

다음 샘플의 결과는 다음과 같은 스키마가 있는 BigQuery 테이블입니다.

[
  {
    "name": "ingestion_time",
    "type": "TIMESTAMP",
    "mode": "REQUIRED"
  },
  {
    "name": "person_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
      {
    "name": "vehicle_count",
    "type": "INTEGER",
    "mode": "NULLABLE"
  },
]

다음 코드를 사용하여 이 테이블을 만듭니다.

  1. 쓰려는 테이블 필드의 proto (예: test_table_schema.proto)를 정의합니다.

    syntax = "proto3";
    
    package visionai.testing;
    
    message TestTableSchema {
      int64 ingestion_time = 1;
      int32 person_count = 2;
      int32 vehicle_count = 3;
    }
    
  2. proto 파일을 컴파일하여 프로토콜 버퍼 Python 파일을 생성합니다.

    protoc -I=./ --python_out=./ ./test_table_schema.proto
    
  3. 생성된 Python 파일을 가져오고 Cloud 함수를 작성합니다.

    Python

    import base64
    import sys
    
    from flask import jsonify
    import functions_framework
    from google.protobuf import descriptor_pb2
    from google.protobuf.json_format import MessageToDict
    import test_table_schema_pb2
    
    def table_schema():
      schema = descriptor_pb2.DescriptorProto()
      test_table_schema_pb2.DESCRIPTOR.message_types_by_name[
          'TestTableSchema'].CopyToProto(schema)
      return schema
    
    def bigquery_append_row_request(row):
      append_row_request = {}
      append_row_request['protoRows'] = {
          'writerSchema': {
              'protoDescriptor': MessageToDict(table_schema())
          },
          'rows': {
              'serializedRows':
                  base64.b64encode(row.SerializeToString()).decode('utf-8')
          }
      }
      return append_row_request
    
    @functions_framework.http
    def hello_http(request):
      request_json = request.get_json(silent=False)
      annotations = []
      payloads = []
      if request_json and 'annotations' in request_json:
        for annotation_with_timestamp in request_json['annotations']:
          row = test_table_schema_pb2.TestTableSchema()
          row.person_count = 0
          row.vehicle_count = 0
          if 'ingestionTimeMicros' in annotation_with_timestamp:
            row.ingestion_time = int(
                annotation_with_timestamp['ingestionTimeMicros'])
          if 'annotation' in annotation_with_timestamp:
            annotation = annotation_with_timestamp['annotation']
            if 'stats' in annotation:
              stats = annotation['stats']
              for count in stats['fullFrameCount']:
                if count['entity']['labelString'] == 'Person':
                  if 'count' in count:
                    row.person_count = count['count']
                elif count['entity']['labelString'] == 'Vehicle':
                  if 'count' in count:
                    row.vehicle_count = count['count']
          payloads.append(bigquery_append_row_request(row))
      for payload in payloads:
        annotations.append({'annotation': payload})
      return jsonify(annotations=annotations)
  4. Cloud Run 함수에 종속 항목을 포함하려면 생성된 test_table_schema_pb2.py 파일을 업로드하고 다음과 같이 requirements.txt를 지정해야 합니다.

    functions-framework==3.*
    click==7.1.2
    cloudevents==1.2.0
    deprecation==2.1.0
    Flask==1.1.2
    gunicorn==20.0.4
    itsdangerous==1.1.0
    Jinja2==2.11.2
    MarkupSafe==1.1.1
    pathtools==0.1.2
    watchdog==1.0.2
    Werkzeug==1.0.1
    protobuf==3.12.2
    
  5. Cloud 함수를 배포하고 BigQueryConfig에서 상응하는 HTTP 트리거를 설정합니다.