Este modelo cria um pipeline de streaming para transmitir registros de alteração de dados do Bigtable e gravá-los na Vertex AI Vector Search usando o Dataflow Runner V2.
Requisitos de pipeline
- A instância de origem do Bigtable precisa existir.
- A tabela de origem do Bigtable precisa existir e a tabela precisa ter os fluxos de alterações ativados.
- O perfil do aplicativo Bigtable precisa existir.
- O caminho do índice da pesquisa de vetores precisa existir.
Parâmetros do modelo
Parâmetro | Descrição |
---|---|
embeddingColumn |
Nome da coluna totalmente qualificada em que os embeddings são armazenados. No formato cf:col. |
embeddingByteSize |
O tamanho do byte de cada entrada na matriz de embeddings. Use 4 para ponto flutuante e 8 para duplo. O valor padrão é 4 . |
vectorSearchIndex |
O índice de pesquisa de vetor em que as alterações serão transmitidas, no formato
"projects/{projectID}/locations/{region}/indexes/{indexID}". (sem espaços à esquerda ou à direita). Exemplo:
projects/123/locations/us-east1/indexes/456 . |
bigtableChangeStreamAppProfile |
O perfil do aplicativo usado para distinguir cargas de trabalho no Bigtable. |
bigtableReadInstanceId |
O ID da instância do Bigtable que contém a tabela. |
bigtableReadTableId |
A tabela do Bigtable em que a leitura será feita. |
bigtableMetadataTableTableId |
Opcional: ID da tabela de metadados que é criada. Se não for definido, o Bigtable vai gerar um ID. |
crowdingTagColumn |
Opcional: o nome da coluna totalmente qualificado em que a tag de distanciamento está armazenada, no formato cf:col . |
allowRestrictsMappings |
Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como
allow restritos, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias . |
denyRestrictsMappings |
Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como
deny restritos, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias . |
intNumericRestrictsMappings |
Opcional: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como
numeric_restricts inteiros, com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias . |
floatNumericRestrictsMappings |
Opcional: os nomes de coluna totalmente qualificados e separados por vírgulas das colunas a serem usadas como pontos flutuantes (4 bytes)
numeric_restricts , com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias |
doubleNumericRestrictsMappings |
Opcional: os nomes de colunas totalmente qualificados e separados por vírgulas das colunas a serem usadas como numeric_restricts duplos (8 bytes)
com os respectivos aliases. Cada nome de coluna precisa estar no formato cf:col->alias |
upsertMaxBatchSize |
Opcional: o número máximo de inserções a serem armazenadas em buffer antes de atualizar o lote no índice da Pesquisa de vetor. Os lotes são enviados quando
há upsertBatchSize registros prontos.
Exemplo: 10 . |
upsertMaxBufferDuration |
Opcional: o atraso máximo antes do envio de um lote de inserções para a Pesquisa de vetor. Os lotes são enviados quando há
upsertBatchSize registros prontos. Formatos permitidos
são: Ns para segundos (exemplo: 5s), Nm para
minutos (exemplo: 12m) e Nh para horas (exemplo:
2h). Padrão: 10s . |
deleteMaxBatchSize |
Opcional: o número máximo de exclusões no buffer antes de
excluir o lote do índice da Pesquisa de vetor.
Os lotes são enviados quando há
deleteBatchSize registros prontos.
Por exemplo, 10 . |
deleteMaxBufferDuration |
Opcional: o atraso máximo antes do envio de um lote de exclusões
à Pesquisa de vetor. Os lotes são enviados quando
há deleteBatchSize registros prontos. Formatos permitidos
são: Ns para segundos (exemplo: 5s), Nm para
minutos (exemplo: 12m) e Nh para horas (exemplo:
2h). Padrão: 10s . |
dlqDirectory |
Opcional: o caminho do arquivo para armazenar todos os registros não processados com o motivo da falha de processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é apropriado para a maioria dos cenários. |
bigtableChangeStreamMetadataInstanceId |
Opcional: a instância do Bigtable a ser usada para a tabela de metadados do conector de fluxos de alteração. O padrão é vazio. |
bigtableChangeStreamMetadataTableTableId |
Opcional: o ID da tabela de metadados do conector de fluxos de alteração do Bigtable a ser usado. Se não for informado, uma tabela de metadados do conector de streams de alteração do Bigtable será criada automaticamente durante o fluxo do pipeline. O padrão é vazio. |
bigtableChangeStreamCharset |
Opcional: a alteração do Bigtable faz o stream do nome do conjunto de caracteres ao ler valores e qualificadores de coluna. O padrão é UTF-8. |
bigtableChangeStreamStartTimestamp |
Opcional: a DateTime inicial, inclusive, a ser usada em fluxos de alterações de leitura (https://tools.ietf.org/html/rfc3339). Por exemplo, 2022-05-05T07:59:59Z. O padrão é o carimbo de data/hora quando o pipeline é iniciado. |
bigtableChangeStreamIgnoreColumnFamilies |
Opcional: uma lista separada por vírgulas de mudanças nos nomes dos grupos de colunas que não serão capturadas. O padrão é vazio. |
bigtableChangeStreamIgnoreColumns |
Opcional: uma lista separada por vírgulas de alterações de nomes de colunas que não serão capturadas. O padrão é vazio. |
bigtableChangeStreamName |
Opcional: um nome exclusivo para o pipeline do cliente. Esse parâmetro permite que você retome o processamento do ponto em que um pipeline que estava em execução parou. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado. |
bigtableChangeStreamResume |
Opcional: quando definido como verdadeiro, um novo pipeline retoma o processamento a partir do ponto em que um pipeline em execução anteriormente com o mesmo valor de nome é interrompido. Se um pipeline com esse
nome nunca foi executado no passado, o novo pipeline não será iniciado.
Use o parâmetro Quando ela é definida como falsa, um novo pipeline é iniciado. Se um pipeline
com o mesmo nome de O padrão é "false". |
bigtableReadProjectId |
Opcional: projeto em que os dados do Bigtable serão lidos. O padrão para esse parâmetro é o projeto em que o pipeline do Dataflow está sendo executado. |
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Bigtable Change Streams to Vector Search template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
CLI da gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Substitua:
JOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
EMBEDDING_COLUMN
: a coluna de embeddingEMBEDDING_BYTE_SIZE
: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX
: o caminho do índice de pesquisa de vetoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: o ID do perfil do aplicativo Bigtable.BIGTABLE_READ_INSTANCE_ID
: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID
: o ID da tabela de origem do Bigtable
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Substitua:
PROJECT_ID
: o ID do projeto do Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
EMBEDDING_COLUMN
: a coluna de embeddingEMBEDDING_BYTE_SIZE
: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX
: o caminho do índice de pesquisa de vetoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: o ID do perfil do aplicativo Bigtable.BIGTABLE_READ_INSTANCE_ID
: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID
: o ID da tabela de origem do Bigtable