Importar em lote valores de atributos

A importação em lote permite importar valores de atributos em massa de uma fonte de dados válida. Em uma solicitação de importação em lote, é possível importar valores de até 100 atributos para um tipo de entidade. Para evitar colisões, só é possível executar um job de importação em lote por tipo de entidade.

Em uma solicitação de importação em lote, especifique o local dos dados de origem e como ele é mapeado para os atributos no featurestore. Como cada solicitação de importação em lote é destina a apenas um tipo de entidade, os dados de origem também precisam ser.

Depois que a importação for concluída, os valores de recurso estarão disponíveis para operações de leitura subsequentes.

Desempenho do job de importação

O Vertex AI Feature Store (legado) fornece importação com alta capacidade de processamento, mas a latência mínima pode levar alguns minutos. Cada solicitação para a Vertex AI Feature Store inicia um job para concluir o trabalho. Um job de importação leva alguns minutos para ser concluído, mesmo que você esteja importando um único registro.

Para fazer ajustes no desempenho de um job, altere as duas variáveis a seguir:

  • O número de nós de veiculação on-line do featurestore.
  • O número de workers usados para o job de importação. Os workers processam e gravam dados no featurestore.

O número recomendado de workers é um worker a cada 10 nós de exibição on-line no featurestore. É possível aumentar esse número se a carga de exibição on-line estiver baixa. É possível especificar no máximo 100 workers. Para mais orientações, consulte Monitorar e ajustar os recursos adequadamente para otimizar a importação em lote.

Se o cluster de veiculação on-line estiver subprovisionado, o job de importação poderá falhar. Em caso de falha, tente fazer a solicitação de importação novamente quando a carga de veiculação on-line estiver baixa ou aumente a contagem de nós do featurestore e tente outra vez.

Se o featurestore não tiver um armazenamento on-line (nenhum nó de veiculação on-line), o job de importação só vai fazer gravações no armazenamento off-line, e o desempenho do job vai depender apenas do número de workers de importação.

Consistência de dados

Inconsistências podem ser introduzidas se os dados de origem forem modificados durante a importação. Confirme se todas as modificações nos dados de origem foram concluídas antes de iniciar um job de importação. Além disso, valores de atributos duplicados podem resultar em valores diferentes exibidos entre solicitações on-line e em lote. Confirme se você tem um valor de atributo para cada par de ID de entidade e carimbo de data/hora.

Se uma operação de importação falhar, o featurestore pode ter dados parciais, o que também pode fazer com que valores inconsistentes sejam retornados entre as solicitações de exibição on-line e em lote. Para evitar essa inconsistência, repita novamente a mesma solicitação de importação e aguarde até que a solicitação seja concluída com êxito.

Valores nulos e matrizes vazias

Durante a importação, o Vertex AI Feature Store (legado) considera valores escalares nulos ou matrizes vazias como valores vazios. Eles incluem valores vazios em uma coluna CSV. O Feature Store da Vertex AI (legado) não oferece suporte a valores nulos não escalares, como um valor null em uma matriz.

Durante a disponibilização on-line e em lote, a Feature Store da Vertex AI retorna o valor mais recente do atributo não nulo ou não vazio. Se um valor histórico do atributo não estiver disponível, o Feature Store da Vertex AI vai retornar null.

Valores NaN

O Feature Store da Vertex AI é compatível com valores NaN (não é um número) em Double e DoubleArray. Durante a importação, é possível inserir NaN no arquivo CSV de entrada de disponibilização para representar um valor NaN. Durante a disponibilizaçao on-line e em lote, a Feature Store da Vertex AI retorna NaN para valores NaN.

Importação em lote

Importe valores em massa para um featurestore para um ou mais atributos de um único tipo de entidade.

IU da Web

  1. Na seção "Vertex AI" do Console do Google Cloud, acesse a página Recursos.

    Acessar a página Recursos

  2. Selecione uma região na lista suspensa Região.
  3. Na tabela de atributos, visualize a coluna Tipo de entidade e encontre o tipo de entidade que contém os atributos para os quais você quer importar valores.
  4. Clique no nome do tipo de entidade.
  5. Na barra de ações, clique em Processar valores.
  6. Em Origem de dados, selecione uma das seguintes opções:
    • Arquivo CSV do Cloud Storage: selecione esta opção para importar dados de vários arquivos CSV do Cloud Storage. Especifique o caminho e o nome do arquivo CSV. Para especificar arquivos adicionais, clique em Adicionar outro arquivo.
    • Arquivo AVRO do Cloud Storage: selecione esta opção para ingerir dados de um arquivo AVRO do Cloud Storage. Especifique o caminho e o nome do arquivo AVRO.
    • Tabela do BigQuery: selecione esta opção para ingerir dados de uma tabela ou visualização do BigQuery. Procure e selecione uma tabela ou visualização para usar, que está no seguinte formato: PROJECT_ID.DATASET_ID.TABLE_ID
  7. Clique em Continuar.
  8. Em Mapear coluna para recursos, especifique quais colunas nos dados de origem são mapeadas para entidades e recursos na featurestore.
    1. Especifique o nome da coluna nos dados de origem que contém os IDs de entidade.
    2. Para o carimbo de data/hora, especifique uma coluna nos dados de origem ou um único carimbo de data/hora associado a todos os valores de atributos importados.
    3. Na lista de elementos, insira o nome da coluna de dados de origem que é mapeado para cada elemento. Por padrão, o Feature Store da Vertex AI pressupõe que o nome do atributo e o nome da coluna correspondem.
  9. Clique em Ingerir.

REST

Para importar valores de recursos para os recursos atuais, envie uma solicitação POST usando o método featurestores.entityTypes.importFeatureValues. Se os nomes das colunas de dados de origem e os IDs de recurso de destino forem diferentes, inclua o parâmetro sourceField.

Antes de usar os dados da solicitação abaixo, faça as substituições a seguir:

  • LOCATION_ID: região onde a featurestore é criada. Por exemplo, us-central1.
  • PROJECT_ID: o ID do projeto.
  • FEATURESTORE_ID: ID do featurestore.
  • ENTITY_TYPE_ID: código do tipo de entidade.
  • ENTITY_SOURCE_COLUMN_ID: ID da coluna de origem que contém IDs de entidade.
  • FEATURE_TIME_ID: ID da coluna de origem que contém os carimbos de data/hora do recurso para os valores do recurso.
  • FEATURE_ID: código de um recurso existente na featurestore para o qual os valores serão importados.
  • FEATURE_SOURCE_COLUMN_ID: ID da coluna de origem que contém valores de recursos para as entidades.
  • SOURCE_DATA_DETAILS: o local dos dados de origem, que também indica o formato, como "bigquerySource": { "inputUri": "bq://test.dataset.sourcetable" } para uma tabela ou visualização do BigQuery.
  • WORKER_COUNT: o número de workers a serem usados para gravar dados no featurestore.

Método HTTP e URL:

POST https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues

Corpo JSON da solicitação:

{
  "entityIdField": "ENTITY_SOURCE_COLUMN_ID",
  "featureTimeField": "FEATURE_TIME_ID",
  SOURCE_DATA_DETAILS,
  "featureSpecs": [{
    "id": "FEATURE_ID",
    "sourceField": "FEATURE_SOURCE_COLUMN_ID"
  }],
  "workerCount": WORKER_COUNT
}

Para enviar a solicitação, escolha uma destas opções:

curl

Salve o corpo da solicitação em um arquivo com o nome request.json e execute o comando a seguir:

curl -X POST \
-H "Authorization: Bearer $(gcloud auth print-access-token)" \
-H "Content-Type: application/json; charset=utf-8" \
-d @request.json \
"https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues"

PowerShell

Salve o corpo da solicitação em um arquivo com o nome request.json e execute o comando a seguir:

$cred = gcloud auth print-access-token
$headers = @{ "Authorization" = "Bearer $cred" }

Invoke-WebRequest `
-Method POST `
-Headers $headers `
-ContentType: "application/json; charset=utf-8" `
-InFile request.json `
-Uri "https://LOCATION_ID-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID:importFeatureValues" | Select-Object -Expand Content

Será exibido um código semelhante a este. Use OPERATION_ID na resposta para ver o status da operação.

{
  "name": "projects/PROJECT_NUMBER/locations/LOCATION_ID/featurestores/FEATURESTORE_ID/entityTypes/ENTITY_TYPE_ID/operations/OPERATION_ID",
  "metadata": {
    "@type": "type.googleapis.com/google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata",
    "genericMetadata": {
      "createTime": "2021-03-02T00:04:13.039166Z",
      "updateTime": "2021-03-02T00:04:13.039166Z"
    }
  }
}

Python

Para saber como instalar o SDK da Vertex AI para Python, consulte Instalar o SDK da Vertex AI para Python. Para mais informações, consulte a documentação de referência da API Python.

import datetime
from typing import List, Union

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    location: str,
    entity_type_id: str,
    featurestore_id: str,
    feature_ids: List[str],
    feature_time: Union[str, datetime.datetime],
    gcs_source_uris: Union[str, List[str]],
    gcs_source_type: str,
):

    aiplatform.init(project=project, location=location)

    my_entity_type = aiplatform.featurestore.EntityType(
        entity_type_name=entity_type_id, featurestore_id=featurestore_id
    )

    my_entity_type.ingest_from_gcs(
        feature_ids=feature_ids,
        feature_time=feature_time,
        gcs_source_uris=gcs_source_uris,
        gcs_source_type=gcs_source_type,
    )

Python

A biblioteca de cliente da Vertex AI está incluída quando você instala o SDK da Vertex AI para Python. Para saber como instalar o SDK da Vertex AI para Python, consulte Instalar o SDK da Vertex AI para Python. Saiba mais na documentação de referência da API SDK da Vertex AI para Python.

from google.cloud import aiplatform

def import_feature_values_sample(
    project: str,
    featurestore_id: str,
    entity_type_id: str,
    avro_gcs_uri: str,
    entity_id_field: str,
    feature_time_field: str,
    worker_count: int = 2,
    location: str = "us-central1",
    api_endpoint: str = "us-central1-aiplatform.googleapis.com",
    timeout: int = 300,
):
    # The AI Platform services require regional API endpoints, which need to be
    # in the same region or multi-region overlap with the Feature Store location.
    client_options = {"api_endpoint": api_endpoint}
    # Initialize client that will be used to create and send requests.
    # This client only needs to be created once, and can be reused for multiple requests.
    client = aiplatform.gapic.FeaturestoreServiceClient(client_options=client_options)
    entity_type = f"projects/{project}/locations/{location}/featurestores/{featurestore_id}/entityTypes/{entity_type_id}"
    avro_source = aiplatform.gapic.AvroSource(
        gcs_source=aiplatform.gapic.GcsSource(uris=[avro_gcs_uri])
    )
    feature_specs = [
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="age"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="gender"),
        aiplatform.gapic.ImportFeatureValuesRequest.FeatureSpec(id="liked_genres"),
    ]
    import_feature_values_request = aiplatform.gapic.ImportFeatureValuesRequest(
        entity_type=entity_type,
        avro_source=avro_source,
        feature_specs=feature_specs,
        entity_id_field=entity_id_field,
        feature_time_field=feature_time_field,
        worker_count=worker_count,
    )
    lro_response = client.import_feature_values(request=import_feature_values_request)
    print("Long running operation:", lro_response.operation.name)
    import_feature_values_response = lro_response.result(timeout=timeout)
    print("import_feature_values_response:", import_feature_values_response)

Java

Antes de testar essa amostra, siga as instruções de configuração para Java Guia de início rápido da Vertex AI: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Vertex AI para Java.

Para autenticar na Vertex AI, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.


import com.google.api.gax.longrunning.OperationFuture;
import com.google.cloud.aiplatform.v1.AvroSource;
import com.google.cloud.aiplatform.v1.EntityTypeName;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceClient;
import com.google.cloud.aiplatform.v1.FeaturestoreServiceSettings;
import com.google.cloud.aiplatform.v1.GcsSource;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesOperationMetadata;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesRequest.FeatureSpec;
import com.google.cloud.aiplatform.v1.ImportFeatureValuesResponse;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class ImportFeatureValuesSample {

  public static void main(String[] args)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    // TODO(developer): Replace these variables before running the sample.
    String project = "YOUR_PROJECT_ID";
    String featurestoreId = "YOUR_FEATURESTORE_ID";
    String entityTypeId = "YOUR_ENTITY_TYPE_ID";
    String entityIdField = "YOUR_ENTITY_FIELD_ID";
    String featureTimeField = "YOUR_FEATURE_TIME_FIELD";
    String gcsSourceUri = "YOUR_GCS_SOURCE_URI";
    int workerCount = 2;
    String location = "us-central1";
    String endpoint = "us-central1-aiplatform.googleapis.com:443";
    int timeout = 300;
    importFeatureValuesSample(
        project,
        featurestoreId,
        entityTypeId,
        gcsSourceUri,
        entityIdField,
        featureTimeField,
        workerCount,
        location,
        endpoint,
        timeout);
  }

  static void importFeatureValuesSample(
      String project,
      String featurestoreId,
      String entityTypeId,
      String gcsSourceUri,
      String entityIdField,
      String featureTimeField,
      int workerCount,
      String location,
      String endpoint,
      int timeout)
      throws IOException, InterruptedException, ExecutionException, TimeoutException {
    FeaturestoreServiceSettings featurestoreServiceSettings =
        FeaturestoreServiceSettings.newBuilder().setEndpoint(endpoint).build();

    // Initialize client that will be used to send requests. This client only needs to be created
    // once, and can be reused for multiple requests. After completing all of your requests, call
    // the "close" method on the client to safely clean up any remaining background resources.
    try (FeaturestoreServiceClient featurestoreServiceClient =
        FeaturestoreServiceClient.create(featurestoreServiceSettings)) {
      List<FeatureSpec> featureSpecs = new ArrayList<>();

      featureSpecs.add(FeatureSpec.newBuilder().setId("title").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("genres").build());
      featureSpecs.add(FeatureSpec.newBuilder().setId("average_rating").build());
      ImportFeatureValuesRequest importFeatureValuesRequest =
          ImportFeatureValuesRequest.newBuilder()
              .setEntityType(
                  EntityTypeName.of(project, location, featurestoreId, entityTypeId).toString())
              .setEntityIdField(entityIdField)
              .setFeatureTimeField(featureTimeField)
              .addAllFeatureSpecs(featureSpecs)
              .setWorkerCount(workerCount)
              .setAvroSource(
                  AvroSource.newBuilder()
                      .setGcsSource(GcsSource.newBuilder().addUris(gcsSourceUri)))
              .build();
      OperationFuture<ImportFeatureValuesResponse, ImportFeatureValuesOperationMetadata>
          importFeatureValuesFuture =
              featurestoreServiceClient.importFeatureValuesAsync(importFeatureValuesRequest);
      System.out.format(
          "Operation name: %s%n", importFeatureValuesFuture.getInitialFuture().get().getName());
      System.out.println("Waiting for operation to finish...");
      ImportFeatureValuesResponse importFeatureValuesResponse =
          importFeatureValuesFuture.get(timeout, TimeUnit.SECONDS);
      System.out.println("Import Feature Values Response");
      System.out.println(importFeatureValuesResponse);
      featurestoreServiceClient.close();
    }
  }
}

Node.js

Antes de testar essa amostra, siga as instruções de configuração para Node.js Guia de início rápido da Vertex AI: como usar bibliotecas de cliente. Para mais informações, consulte a documentação de referência da API Vertex AI para Node.js.

Para autenticar na Vertex AI, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

/**
 * TODO(developer): Uncomment these variables before running the sample.\
 * (Not necessary if passing values as arguments)
 */

// const project = 'YOUR_PROJECT_ID';
// const featurestoreId = 'YOUR_FEATURESTORE_ID';
// const entityTypeId = 'YOUR_ENTITY_TYPE_ID';
// const avroGcsUri = 'AVRO_FILE_IN_THE_GCS_URI';
// const entityIdField = 'ENTITY_ID_FIELD_IN_AVRO';
// const featureTimeField = 'TIMESTAMP_FIELD_IN_AVRO';
// const workerCount = <NO_OF_WORKERS_FOR_INGESTION_JOB>;
// const location = 'YOUR_PROJECT_LOCATION';
// const apiEndpoint = 'YOUR_API_ENDPOINT';
// const timeout = <TIMEOUT_IN_MILLI_SECONDS>;

// Imports the Google Cloud Featurestore Service Client library
const {FeaturestoreServiceClient} = require('@google-cloud/aiplatform').v1;

// Specifies the location of the api endpoint
const clientOptions = {
  apiEndpoint: apiEndpoint,
};

// Instantiates a client
const featurestoreServiceClient = new FeaturestoreServiceClient(
  clientOptions
);

async function importFeatureValues() {
  // Configure the entityType resource
  const entityType = `projects/${project}/locations/${location}/featurestores/${featurestoreId}/entityTypes/${entityTypeId}`;

  const avroSource = {
    gcsSource: {
      uris: [avroGcsUri],
    },
  };

  const featureSpecs = [{id: 'age'}, {id: 'gender'}, {id: 'liked_genres'}];

  const request = {
    entityType: entityType,
    avroSource: avroSource,
    entityIdField: entityIdField,
    featureSpecs: featureSpecs,
    featureTimeField: featureTimeField,
    workerCount: Number(workerCount),
  };

  // Import Feature Values Request
  const [operation] = await featurestoreServiceClient.importFeatureValues(
    request,
    {timeout: Number(timeout)}
  );
  const [response] = await operation.promise();

  console.log('Import feature values response');
  console.log('Raw response:');
  console.log(JSON.stringify(response, null, 2));
}
importFeatureValues();

Conferir jobs de importação

Use o console do Google Cloud para visualizar os jobs de importação em lote em um projeto do Google Cloud.

IU da Web

  1. Na seção "Vertex AI" do Console do Google Cloud, acesse a página Recursos.

    Acessar a página Recursos

  2. Selecione uma região na lista suspensa Região.
  3. Na barra de ações, clique em Exibir jobs de ingestão para listar os jobs de importação de todos os featurestores.
  4. Clique no ID de um job de importação para exibir os detalhes, como a origem de dados, o número de entidades de importação e o número de valores de atributos importados.

Substituir dados existentes em uma featurestore

É possível reimportar valores para substituir os valores de atributos atuais, caso ambos tenham os mesmos carimbos de data/hora. Não é necessário excluir os valores de recursos existentes primeiro. Por exemplo, você pode confiar em dados de origem subjacentes que foram alterados recentemente. Para manter o featurestore consistente com esses dados subjacentes, importe os valores do recurso novamente. Se você tiver carimbos de data/hora incompatíveis, os valores importados serão considerados exclusivos, e os valores antigos continuarão a existir (não serão substituídos).

Para garantir a consistência entre as solicitações de disponibilização on-line e em lote, aguarde até que o job de importação seja concluído antes de fazer qualquer solicitação de disponibilização.

Preenchimento de dados históricos

Se você estiver preenchendo dados onde estiver importando valores de recurso anteriores, desative a veiculação on-line do job de importação. A veiculação on-line serve apenas para exibir os valores de recurso mais recentes, sem o preenchimento. Desativar a veiculação on-line é útil porque você elimina qualquer carga nos nós de veiculação on-line e aumenta a capacidade de processamento do job de importação, o que pode diminuir o tempo de conclusão.

É possível desativar a veiculação on-line para jobs de importação usando a API ou as bibliotecas de cliente. Para mais informações, consulte o campo disableOnlineServing para o método importFeatureValue.

A seguir