특성 값 일괄 가져오기

일괄 가져오기를 사용하면 적합한 데이터 소스에서 대량으로 특성값을 가져올 수 있습니다. 일괄 가져오기 요청으로 항목 유형에 대해 최대 100개까지 특성 값을 가져올 수 있습니다. 충돌을 방지하기 위해 항목 유형당 일괄 가져오기 작업을 하나만 실행할 수 있습니다.

일괄 가져오기 요청에서 소스 데이터의 위치와 피처스토어의 특성에 매핑되는 방법을 지정합니다. 각 일괄 가져오기 요청은 단일 항목 유형에 대한 것이므로 소스 데이터도 단일 항목 유형에 대한 것이어야 합니다.

가져오기가 성공적으로 완료되면 이후 읽기 작업에서 특성 값을 사용할 수 있습니다.

가져오기 작업 성능

Vertex AI Feature Store(기존)는 높은 처리량 가져오기를 제공하지만 최소 지연 시간은 몇 분 정도 걸릴 수 있습니다. Vertex AI Feature Store(기존)에 대한 각 요청은 과제물을 완료하기 위한 작업을 시작합니다. 단일 기록을 가져오더라도 가져오기 작업을 완료하는 데 몇 분 정도 걸립니다.

작업 성능을 조정하려면 다음 두 변수를 변경하세요.

  • 피처스토어 온라인 서빙 노드 수
  • 가져오기 작업에 사용된 작업자 수. 작업자는 데이터를 처리하고 피처스토어에 데이터를 씁니다.

피처스토어의 온라인 서빙 노드 10개마다 작업자 1개를 사용하는 것이 좋습니다. 온라인 서빙 부하가 낮으면 더 높일 수 있습니다. 최대 100명의 작업자를 지정할 수 있습니다. 자세한 내용은 일괄 가져오기 최적화를 위한 리소스 모니터링 및 조정을 참조하세요.

온라인 서빙 클러스터가 프로비저닝되지 않은 경우 가져오기 작업이 실패할 수 있습니다. 실패할 경우 온라인 서빙 부하가 낮을 때 가져오기 요청을 다시 시도하거나 피처스토어의 노드 수를 늘린 후 요청을 다시 시도하세요.

피처스토어에 온라인 스토리지(온라인 서빙 노드 0개)가 없으면 가져오기 작업은 오프라인 스토리지에만 기록되며 작업 성능은 가져오기 작업자 수에 따라 달라집니다.

데이터 일관성

가져오는 동안 소스 데이터가 수정되면 불일치가 발생할 수 있습니다. 가져오기 작업을 시작하기 전에 소스 데이터 수정이 완료되었는지 확인합니다. 또한 중복된 특성 값으로 인해 온라인 및 일괄 요청 간에 다른 값이 제공될 수 있습니다. 각 항목 ID와 타임스탬프 쌍에 특성 값이 하나 있는지 확인합니다.

가져오기 작업이 실패할 경우 피처스토어에 부분 데이터만 포함될 수 있으며 그로 인해 온라인 서빙 요청과 일괄 제공 요청에서 반환되는 값이 일치하지 않을 수도 있습니다. 이러한 불일치를 방지하려면 동일한 가져오기 요청을 재시도하고 요청이 성공적으로 완료될 때까지 기다립니다.

null 값 및 빈 배열

가져오기 중에 Vertex AI Feature Store(기존)는 null 스칼라 값 또는 빈 배열을 빈 값으로 간주합니다. 여기에는 CSV 열의 빈 값이 포함됩니다. Vertex AI Feature Store(기존)는 배열의 null 값과 같은 비스칼라 null 값을 지원하지 않습니다.

온라인 서빙 및 일괄 서빙 중에 Vertex AI Feature Store(기존)는 null이 아니거나 비어 있지 않은 최신 특성 값을 반환합니다. 이 특성의 이전 값을 사용할 수 없으면 Vertex AI Feature Store(기존)는 null을 반환합니다.

NaN 값

Vertex AI Feature Store(기존)는 DoubleDoubleArray의 NaN(숫자가 아님) 값을 지원합니다. 가져오기 중에 서빙 입력 CSV 파일에 NaN을 입력하여 NaN 값을 나타낼 수 있습니다. 온라인 서빙 및 일괄 서빙 중에 Vertex AI Feature Store(기존)는 NaN 값에 대해 NaN을 반환합니다.

일괄 가져오기

단일 항목 유형의 하나 이상의 특성에 대해 피처스토어로 값을 일괄적으로 가져옵니다.

웹 UI

  1. Google Cloud 콘솔의 Vertex AI 섹션에서 특성 페이지로 이동합니다.

    특성 페이지로 이동

  2. 리전 드롭다운 목록에서 리전을 선택합니다.
  3. 특성 테이블에서 항목 유형 열을 확인하고 값을 가져오려는 특성이 포함된 항목 유형을 찾습니다.
  4. 항목 유형 이름을 클릭합니다.
  5. 작업 모음에서 값 수집을 클릭합니다.
  6. 데이터 소스에 대해 다음 중 하나를 선택합니다.
    • Cloud Storage CSV 파일: Cloud Storage의 여러 CSV 파일에서 데이터를 가져오려면 이 옵션을 선택합니다. CSV 파일의 경로와 이름을 지정합니다. 추가 파일을 지정하려면 다른 파일 추가를 클릭합니다.
    • Cloud Storage AVRO 파일: Cloud Storage의 AVRO 파일에서 데이터를 가져오려면 이 옵션을 선택합니다. AVRO 파일의 경로와 이름을 지정합니다.
    • BigQuery 테이블: BigQuery 테이블 또는 BigQuery 뷰에서 데이터를 가져오려면 이 옵션을 선택합니다. 사용할 PROJECT_ID.DATASET_ID.TABLE_ID 형식의 테이블 또는 뷰를 찾아 선택합니다.
  7. 계속을 클릭합니다.
  8. 열을 특성에 매핑에서 피처스토어의 항목 및 특성에 매핑할 소스 데이터 열을 지정합니다.
    1. 항목 ID가 포함된 열 이름을 소스 데이터에 지정합니다.
    2. 타임스탬프의 경우 소스 데이터에 타임스탬프 열을 지정하거나 가져오는 모든 특성 값과 연관된 단일 타임스탬프를 지정합니다.
    3. 특성 목록에서 각 특성과 일치하는 소스 데이터 열 이름을 입력합니다. 기본적으로 Vertex AI Feature Store(기존)는 특성 이름과 열 이름이 일치한다고 가정합니다.
  9. 수집을 클릭합니다.

REST

기존 기능의 특성 값을 가져오려면 featurestores.entityTypes.importFeatureValues 메서드를 사용하여 POST 요청을 전송합니다. 소스 데이터 열과 대상 특성 ID의 이름이 다른 경우 sourceField 매개변수를 포함하세요.

요청 데이터를 사용하기 전에 다음을 바꿉니다.

  • LOCATION_ID: 피처스토어가 생성되는 리전. 예를 들면 us-central1입니다.
  • PROJECT_ID: 프로젝트 ID
  • FEATURESTORE_ID: 피처스토어의 ID
  • ENTITY_TYPE_ID: 항목 유형의 ID입니다.
  • ENTITY_SOURCE_COLUMN_ID: 항목 ID가 포함된 소스 열의 ID
  • FEATURE_TIME_ID: 특성 값의 특성 타임스탬프가 포함된 소스 열의 ID
  • FEATURE_ID: 값을 가져올 피처스토어의 기존 특성 ID
  • FEATURE_SOURCE_COLUMN_ID: 항목의 특성 값이 포함된 소스 열의 ID
  • SOURCE_DATA_DETAILS: 소스 데이터 위치. BigQuery 테이블 또는 BigQuery 뷰의 경우 "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } 같은 형식도 나타냅니다.
  • WORKER_COUNT: 피처스토어에 데이터를 쓰는 데 사용할 작업자의 수

HTTP 메서드 및 URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

JSON 요청 본문:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

요청을 보내려면 다음 옵션 중 하나를 선택합니다.

curl

요청 본문을 request.json 파일에 저장하고 다음 명령어를 실행합니다.

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

요청 본문을 request.json 파일에 저장하고 다음 명령어를 실행합니다.

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

다음과 비슷한 출력이 표시됩니다. 응답의 OPERATION_ID를 사용하여 작업 상태를 가져올 수 있습니다.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

Python을 설치하거나 업데이트하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python API 참고 문서를 참조하세요.

import datetime
from typing import List, Union

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

Vertex AI 클라이언트 라이브러리는 Python용 Vertex AI SDK를 설치할 때 포함됩니다. Python용 Vertex AI SDK를 설치하는 방법은 Python용 Vertex AI SDK 설치를 참조하세요. 자세한 내용은 Python용 Vertex AI SDK API 참고 문서를 확인하세요.

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

이 샘플을 사용해 보기 전에 Vertex AI 빠른 시작: 클라이언트 라이브러리 사용Java 설정 안내를 따르세요. 자세한 내용은 Vertex AI Java API 참고 문서를 참조하세요.

Vertex AI에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

이 샘플을 사용해 보기 전에 Vertex AI 빠른 시작: 클라이언트 라이브러리 사용Node.js 설정 안내를 따르세요. 자세한 내용은 Vertex AI Node.js API 참고 문서를 참조하세요.

Vertex AI에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

가져오기 작업 보기

Google Cloud 콘솔을 사용하여 Google Cloud 프로젝트의 일괄 가져오기 작업을 봅니다.

웹 UI

  1. Google Cloud 콘솔의 Vertex AI 섹션에서 특성 페이지로 이동합니다.

    특성 페이지로 이동

  2. 리전 드롭다운 목록에서 리전을 선택합니다.
  3. 작업 모음에서 수집 작업 보기를 클릭하여 모든 피처스토어의 가져오기 작업을 나열합니다.
  4. 가져오기 작업의 ID를 클릭하여 데이터 소스, 가져오기 항목 수, 가져온 특성 값 등의 세부정보를 확인합니다.

피처스토어의 기존 데이터 덮어쓰기

타임스탬프가 둘 다 같으면 값을 다시 가져와서 기존 특성 값을 덮어쓸 수 있습니다. 기존 특성 값을 먼저 삭제할 필요가 없습니다. 예를 들어 최근에 변경된 기본 소스 데이터를 사용해야 할 수 있습니다. 피처스토어를 기본 데이터와 동일하게 유지하려면 특성 값을 다시 가져오세요. 타임스탬프가 일치하지 않으면 가져온 값이 고유한 것으로 간주되고 이전 값이 지속됩니다(덮어쓰지 않음).

온라인 및 일괄 서빙 요청 간의 일관성을 보장하려면 제공 요청을 수행하기 전에 먼저 가져오기 작업이 완료될 때까지 기다리세요.

이전 데이터 백필

이전 특성 값을 가져오는 데이터 백필을 수행하는 경우 가져오기 작업에 대해 온라인 서빙을 사용 중지합니다. 온라인 서빙은 백필에 포함되지 않는 최근 특성 값만 처리합니다. 온라인 서빙 노드에 대한 로드를 없애고 가져오기 작업의 처리량을 늘려서 완료 시간을 줄여줄 수 있기 때문에 온라인 서빙를 사용 중지하는 것이 좋습니다.

API 또는 클라이언트 라이브러리를 사용할 때 가져오기 작업에 대해 온라인 서빙을 사용 중지할 수 있습니다. 자세한 내용은 importFeatureValue 메서드disableOnlineServing 필드를 참조하세요.

다음 단계