Fluxos de alterações do Bigtable para o modelo de pesquisa de vetor

Este modelo cria um pipeline de streaming para transmitir registros de alteração de dados do Bigtable e gravá-los na Vertex AI Vector Search usando o Dataflow Runner V2.

Requisitos de pipeline

  • A instância de origem do Bigtable precisa existir.
  • A tabela de origem do Bigtable precisa existir e a tabela precisa ter os fluxos de alterações ativados.
  • O perfil do aplicativo Bigtable precisa existir.
  • O caminho do índice da pesquisa de vetores precisa existir.

Parâmetros do modelo

Parâmetro Descrição
embeddingColumn Nome da coluna totalmente qualificada em que os embeddings são armazenados. No formato cf:col.
embeddingByteSize O tamanho do byte de cada entrada na matriz de embeddings. Use 4 para ponto flutuante e 8 para duplo. O valor padrão é 4.
vectorSearchIndex O índice de pesquisa de vetor em que as alterações serão transmitidas, no formato "projects/{projectID}/locations/{region}/indexes/{indexID}". (sem espaços à esquerda ou à direita). Exemplo: projects/123/locations/us-east1/indexes/456.
bigtableChangeStreamAppProfile O perfil do aplicativo usado para distinguir cargas de trabalho no Bigtable.
bigtableReadInstanceId O ID da instância do Bigtable que contém a tabela.
bigtableReadTableId A tabela do Bigtable em que a leitura será feita.
bigtableMetadataTableTableId Opcional: ID da tabela de metadados que é criada. Se não for definido, o Bigtable vai gerar um ID.
crowdingTagColumn Opcional: o nome da coluna totalmente qualificado em que a tag de distanciamento está armazenada, no formato cf:col.
allowRestrictsMappings Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como allow restritos, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias.
denyRestrictsMappings Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como deny restritos, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias.
intNumericRestrictsMappings Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como numeric_restricts inteiros, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias.
floatNumericRestrictsMappings Opcional: os nomes de coluna totalmente qualificados e separados por vírgulas das colunas a serem usadas como pontos flutuantes (4 bytes) numeric_restricts, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias
doubleNumericRestrictsMappings Opcional: os nomes de colunas totalmente qualificados e separados por vírgulas das colunas a serem usadas como numeric_restricts duplos (8 bytes) com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias
upsertMaxBatchSize Opcional: o número máximo de inserções a serem armazenadas em buffer antes de atualizar o lote no índice da Pesquisa de vetor. Os lotes são enviados quando há upsertBatchSize registros prontos. Exemplo: 10.
upsertMaxBufferDuration Opcional: o atraso máximo antes do envio de um lote de inserções para a Pesquisa de vetor. Os lotes são enviados quando há upsertBatchSize registros prontos. Formatos permitidos são: Ns para segundos (exemplo: 5s), Nm para minutos (exemplo: 12m) e Nh para horas (exemplo: 2h). Padrão: 10s.
deleteMaxBatchSize Opcional: o número máximo de exclusões no buffer antes de excluir o lote do índice da Pesquisa de vetor. Os lotes são enviados quando há deleteBatchSize registros prontos. Por exemplo, 10.
deleteMaxBufferDuration Opcional: o atraso máximo antes do envio de um lote de exclusões à Pesquisa de vetor. Os lotes são enviados quando há deleteBatchSize registros prontos. Formatos permitidos são: Ns para segundos (exemplo: 5s), Nm para minutos (exemplo: 12m) e Nh para horas (exemplo: 2h). Padrão: 10s.
dlqDirectory Opcional: o caminho do arquivo para armazenar todos os registros não processados com o motivo da falha de processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é apropriado para a maioria dos cenários.
bigtableChangeStreamMetadataInstanceId Opcional: a instância do Bigtable a ser usada para a tabela de metadados do conector de fluxos de alteração. O padrão é vazio.
bigtableChangeStreamMetadataTableTableId Opcional: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable a ser usado. Se não for informado, uma tabela de metadados do conector de streams de alteração do Bigtable será criada automaticamente durante o fluxo do pipeline. O padrão é vazio.
bigtableChangeStreamCharset Opcional: a alteração do Bigtable faz o stream do nome do conjunto de caracteres ao ler valores e qualificadores de coluna. O padrão é UTF-8.
bigtableChangeStreamStartTimestamp Opcional: a DateTime inicial, inclusive, a ser usada em fluxos de alterações de leitura (https://tools.ietf.org/html/rfc3339). Por exemplo, 2022-05-05T07:59:59Z. O padrão é o carimbo de data/hora quando o pipeline é iniciado.
bigtableChangeStreamIgnoreColumnFamilies Opcional: uma lista separada por vírgulas de mudanças nos nomes dos grupos de colunas que não serão capturadas. O padrão é vazio.
bigtableChangeStreamIgnoreColumns Opcional: uma lista separada por vírgulas de alterações de nomes de colunas que não serão capturadas. O padrão é vazio.
bigtableChangeStreamName Opcional: um nome exclusivo para o pipeline do cliente. Esse parâmetro permite que você retome o processamento do ponto em que um pipeline que estava em execução parou. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
bigtableChangeStreamResume

Opcional: quando definido como verdadeiro, um novo pipeline retoma o processamento a partir do ponto em que um pipeline em execução anteriormente com o mesmo valor de nome é interrompido. Se um pipeline com esse nome nunca foi executado no passado, o novo pipeline não será iniciado. Use o parâmetro bigtableChangeStreamName para especificar a linha do pipeline.

Quando ela é definida como falsa, um novo pipeline é iniciado. Se um pipeline com o mesmo nome de bigtableChangeStreamName já foi executado para determinada origem, o novo pipeline não é iniciado.

O padrão é "false".

bigtableReadProjectId Opcional: projeto em que os dados do Bigtable serão lidos. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado.

Executar o modelo

Console

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Bigtable Change Streams to Vector Search template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

CLI da gcloud

No shell ou no terminal, execute o modelo:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       embeddingColumn=EMBEDDING_COLUMN,\
       embeddingByteSize=EMBEDDING_BYTE_SIZE,\
       vectorSearchIndex=VECTOR_SEARCH_INDEX,\
       bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\
       bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\
       bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • EMBEDDING_COLUMN: a coluna de embedding
  • EMBEDDING_BYTE_SIZE: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.
  • VECTOR_SEARCH_INDEX: o caminho do índice de pesquisa de vetores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil do aplicativo Bigtable.
  • BIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origem
  • BIGTABLE_READ_TABLE_ID: o ID da tabela de origem do Bigtable

API

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "embeddingColumn": "EMBEDDING_COLUMN",
       "embeddingByteSize": "EMBEDDING_BYTE_SIZE",
       "vectorSearchIndex": "VECTOR_SEARCH_INDEX",
       "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE",
       "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID",
       "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search",
     "environment": { "maxWorkers": "10" }
  }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • EMBEDDING_COLUMN: a coluna de embedding
  • EMBEDDING_BYTE_SIZE: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.
  • VECTOR_SEARCH_INDEX: o caminho do índice de pesquisa de vetores
  • BIGTABLE_CHANGE_STREAM_APP_PROFILE: o ID do perfil do aplicativo Bigtable.
  • BIGTABLE_READ_INSTANCE_ID: o ID da instância do Bigtable de origem
  • BIGTABLE_READ_TABLE_ID: o ID da tabela de origem do Bigtable