Valori delle caratteristiche di importazione in batch

L'importazione in batch ti consente di importare in blocco i valori delle caratteristiche da un'origine dati valida. In una richiesta di importazione in batch, puoi importare i valori di un massimo di 100 caratteristiche per un tipo di entità. Tieni presente che è possibile eseguire un solo job di importazione in batch per ogni tipo di entità, per evitare collisioni.

In una richiesta di importazione in batch, specifica la posizione dei dati di origine e in che modo vengono mappati alle caratteristiche nell'archivio di caratteristiche. Poiché ogni richiesta di importazione in batch riguarda un singolo tipo di entità, anche i dati di origine devono riguardare un singolo tipo di entità.

Al termine dell'importazione, i valori delle funzionalità sono disponibili per le operazioni di lettura successive.

Prestazioni job di importazione

Vertex AI Feature Store (legacy) fornisce un'importazione elevata della velocità effettiva, ma la latenza minima può richiedere alcuni minuti. Ogni richiesta a Vertex AI Feature Store (legacy) avvia un job per completarlo. Il completamento di un job di importazione richiede qualche minuto, anche se stai importando un singolo record.

Se vuoi apportare modifiche alle prestazioni di un job, modifica le due variabili seguenti:

  • Il numero di nodi di pubblicazione online dell'archivio di caratteristiche.
  • Il numero di worker utilizzati per il job di importazione. I worker elaborano e scrivono dati nell'archivio di caratteristiche.

Il numero consigliato di worker è un worker ogni 10 nodi di gestione online nell'archivio di caratteristiche. Si può andare più in alto se il carico della pubblicazione online è basso. Puoi specificare un massimo di 100 worker. Per ulteriori indicazioni, consulta Monitorare e ottimizzare le risorse di conseguenza per ottimizzare l'importazione batch.

Se il provisioning del cluster di gestione online è insufficiente, il job di importazione potrebbe non riuscire. In caso di errore, riprova a eseguire la richiesta di importazione quando il carico della pubblicazione online è basso oppure aumenta il numero di nodi dell'archivio di caratteristiche e riprova a eseguire la richiesta.

Se l'archivio di caratteristiche non ha un archivio online (nessun nodo di pubblicazione online), il job di importazione scrive solo nell'archivio offline e le prestazioni del job dipendono esclusivamente dal numero di worker di importazione.

Coerenza dei dati

Se i dati di origine vengono modificati durante l'importazione, è possibile che si verifichino incoerenze. Assicurati che le modifiche ai dati di origine siano complete prima di avviare un job di importazione. Inoltre, i valori duplicati delle caratteristiche possono comportare la pubblicazione di valori diversi tra le richieste online e in batch. Assicurati di avere un valore di caratteristica per ogni coppia di ID entità e timestamp.

Se un'operazione di importazione non va a buon fine, l'archivio di caratteristiche potrebbe avere solo dati parziali, il che può comportare la restituzione di valori incoerenti tra le richieste di pubblicazione online e in batch. Per evitare questa incoerenza, riprova a eseguire la stessa richiesta di importazione e attendi che venga completata correttamente.

Valori null e array vuoti

Durante l'importazione, Vertex AI Feature Store (legacy) considera i valori scalari nulli o gli array vuoti come valori vuoti. Sono inclusi valori vuoti in una colonna CSV. Vertex AI Feature Store (legacy) non supporta valori null non scalabili, come un valore null in un array.

Durante la distribuzione online e recupero dati in batch, Vertex AI Feature Store (legacy) restituisce l'ultimo valore non nullo o non vuoto della caratteristica. Se non è disponibile un valore storico della caratteristica, Vertex AI Feature Store (legacy) restituisce null.

Valori NaN

Vertex AI Feature Store (legacy) supporta i valori NaN (Not a Number) in Double e DoubleArray. Durante l'importazione, puoi inserire NaN nel file CSV di input di pubblicazione per rappresentare un valore NaN. Durante la distribuzione online e recupero dati in batch, Vertex AI Feature Store (legacy) restituisce NaN per i valori NaN.

Importazione in batch

Importare valori in blocco in un archivio di caratteristiche per una o più caratteristiche di un singolo tipo di entità.

UI web

  1. Nella sezione Vertex AI della console Google Cloud, vai alla pagina Funzionalità.

    Vai alla pagina Funzionalità

  2. Seleziona una regione dall'elenco a discesa Regione.
  3. Nella tabella delle caratteristiche, visualizza la colonna Tipo di entità e individua il tipo di entità che contiene le caratteristiche per cui vuoi importare i valori.
  4. Fai clic sul nome del tipo di entità.
  5. Dalla barra delle azioni, fai clic su Importa valori.
  6. Per Origine dati, seleziona una delle seguenti opzioni:
    • File CSV Cloud Storage: seleziona questa opzione per importare dati da più file CSV da Cloud Storage. Specifica il percorso e il nome del file CSV. Per specificare altri file, fai clic su Aggiungi un altro file.
    • File AVRO di Cloud Storage: seleziona questa opzione per importare i dati da un file AVRO da Cloud Storage. Specifica il percorso e il nome del file AVRO.
    • Tabella BigQuery: seleziona questa opzione per importare i dati da una tabella BigQuery o da una vista BigQuery. Sfoglia e seleziona una tabella o una visualizzazione da utilizzare, che ha il seguente formato: PROJECT_ID.DATASET_ID.TABLE_ID
  7. Fai clic su Continua.
  8. Per Mappa colonna a caratteristiche, specifica quali colonne nei dati di origine mappano a entità e caratteristiche nell'archivio di caratteristiche.
    1. Specifica il nome della colonna nei dati di origine che contiene gli ID entità.
    2. Per il timestamp, specifica una colonna timestamp nei dati di origine oppure specifica un singolo timestamp associato a tutti i valori delle funzionalità che importi.
    3. Nell'elenco delle funzionalità, inserisci il nome della colonna dei dati di origine per ogni funzionalità. Per impostazione predefinita, Vertex AI Feature Store (legacy) presuppone che il nome della caratteristica e il nome della colonna corrispondano.
  9. Fai clic su Importa.

REST

Per importare i valori delle caratteristiche per le caratteristiche esistenti, invia una richiesta POST utilizzando il metodo featurestores.entityTypes.importFeatureValues. Tieni presente che se i nomi delle colonne dei dati di origine e degli ID funzionalità di destinazione sono diversi, includi il parametro sourceField.

Prima di utilizzare i dati della richiesta, effettua le seguenti sostituzioni:

  • LOCATION_ID: regione in cui è stato creato l'archivio di caratteristiche. Ad esempio, us-central1.
  • PROJECT_ID: il tuo ID progetto.
  • FEATURESTORE_ID: ID dell'archivio di caratteristiche.
  • ENTITY_TYPE_ID: ID del tipo di entità.
  • ENTITY_SOURCE_COLUMN_ID: ID della colonna di origine che contiene gli ID entità.
  • FEATURE_TIME_ID: ID della colonna di origine contenente i timestamp delle caratteristiche per i valori delle caratteristiche.
  • FEATURE_ID: ID di una caratteristica esistente nell'archivio di caratteristiche per cui importare i valori.
  • FEATURE_SOURCE_COLUMN_ID: ID della colonna di origine che contiene i valori delle caratteristiche per le entità.
  • SOURCE_DATA_DETAILS: la posizione dei dati di origine, che indica anche il formato, ad esempio "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } per una tabella BigQuery o una vista BigQuery.
  • WORKER_COUNT: il numero di worker da utilizzare per scrivere dati nell'archivio di caratteristiche.

Metodo HTTP e URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

Corpo JSON della richiesta:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

Per inviare la richiesta, scegli una delle seguenti opzioni:

curl

Salva il corpo della richiesta in un file denominato request.json ed esegui questo comando:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

Salva il corpo della richiesta in un file denominato request.json ed esegui questo comando:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

Dovresti vedere un output simile al seguente. Puoi utilizzare OPERATION_ID nella risposta per ottenere lo stato dell'operazione.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

Per scoprire come installare o aggiornare l'SDK Vertex AI per Python, vedi Installare l'SDK Vertex AI per Python. Per maggiori informazioni, consulta la documentazione di riferimento dell'API Python.

import datetime
from typing import List, Union

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

La libreria client per Vertex AI è inclusa quando installi l'SDK Vertex AI per Python. Per scoprire come installare l'SDK Vertex AI per Python, vedi Installare l'SDK Vertex AI per Python. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Vertex AI SDK per Python.

from google.cloud import aiplatform


def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

Prima di provare questo esempio, segui le istruzioni per la configurazione di Java nella guida rapida di Vertex AI sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Java Vertex AI.

Per eseguire l'autenticazione su Vertex AI, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

Prima di provare questo esempio, segui le istruzioni per la configurazione di Node.js nella guida rapida di Vertex AI sull'utilizzo delle librerie client. Per ulteriori informazioni, consulta la documentazione di riferimento dell'API Node.js Vertex AI.

Per eseguire l'autenticazione su Vertex AI, configura Credenziali predefinite dell'applicazione. Per maggiori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

Visualizza i job di importazione

Utilizza la console Google Cloud per visualizzare i job di importazione batch in un progetto Google Cloud.

UI web

  1. Nella sezione Vertex AI della console Google Cloud, vai alla pagina Funzionalità.

    Vai alla pagina Funzionalità

  2. Seleziona una regione dall'elenco a discesa Regione.
  3. Dalla barra delle azioni, fai clic su Visualizza job di importazione per elencare i job di importazione per tutti gli archivi di caratteristiche.
  4. Fai clic sull'ID di un job di importazione per visualizzarne i dettagli, ad esempio l'origine dati, il numero di entità di importazione e il numero di valori delle caratteristiche importati.

Sovrascrivi i dati esistenti in un archivio di caratteristiche

Puoi reimportare i valori per sovrascrivere i valori esistenti delle caratteristiche se hanno entrambi gli stessi timestamp. Non è necessario eliminare prima i valori delle caratteristiche esistenti. Ad esempio, puoi fare affidamento su un'origine dati sottostante modificata di recente. Per mantenere l'archivio di caratteristiche coerente con i dati sottostanti, importa di nuovo i valori delle caratteristiche. Se i timestamp non corrispondono, i valori importati vengono considerati univoci e i valori precedenti continuano a esistere (non vengono sostituiti).

Per garantire la coerenza tra le richieste online e di recupero dati in batch, attendi il completamento del job di importazione prima di effettuare qualsiasi richiesta di gestione.

Esegui backfill dei dati storici

Se esegui il backfill dei dati, da cui importi valori delle caratteristiche precedenti, disabilita la pubblicazione online per il job di importazione. La pubblicazione online consente di pubblicare solo gli ultimi valori delle funzionalità, che non sono inclusi nel backfill. La disattivazione della pubblicazione online è utile perché si elimina il carico sui nodi di pubblicazione online e si aumenta la velocità effettiva per il job di importazione, il che può ridurne il tempo di completamento.

Puoi disabilitare la pubblicazione online per i job di importazione quando utilizzi l'API o le librerie client. Per maggiori informazioni, consulta il campo disableOnlineServing per il metodo importFeatureValue.

Passaggi successivi