Este modelo cria um pipeline de streaming para transmitir registros de alteração de dados do Bigtable e gravá-los na Vertex AI Vector Search usando o Dataflow Runner V2.
Requisitos de pipeline
- A instância de origem do Bigtable precisa existir.
- A tabela de origem do Bigtable precisa existir e a tabela precisa ter os fluxos de alterações ativados.
- O perfil do aplicativo Bigtable precisa existir.
- O caminho do índice da pesquisa de vetores precisa existir.
Parâmetros do modelo
Parâmetros obrigatórios
- embeddingColumn: nome da coluna totalmente qualificada em que os embeddings são armazenados. No formato cf:col.
- embeddingByteSize: o tamanho do byte de cada entrada na matriz de embeddings. Use 4 para ponto flutuante e 8 para duplo. O padrão é 4.
- vectorSearchIndex: o índice de pesquisa de vetor em que as mudanças serão transmitidas, no formato "projects/{projectID}/locations/{region}/indexes/{indexID}" (sem espaços à esquerda ou à direita). Por exemplo,
projects/123/locations/us-east1/indexes/456
. - bigtableChangeStreamAppProfile: o ID do perfil do aplicativo do Bigtable. O perfil do aplicativo precisa usar roteamento de cluster único e permitir transações de linha única.
- bigtableReadInstanceId: o ID da instância de origem do Bigtable.
- bigtableReadTableId: o ID da tabela de origem do Bigtable.
Parâmetros opcionais
- bigtableMetadataTableTableId: ID da tabela usado para criar a tabela de metadados.
- crowdingTagColumn: o nome da coluna totalmente qualificado em que a tag de distanciamento está armazenada. No formato cf:col.
- allowRestrictsMappings: os nomes de coluna totalmente qualificados, separados por vírgulas, das colunas que devem ser usadas como restrições
allow
, com o alias. No formato cf:col->alias. - denyRestrictsMappings: os nomes de coluna totalmente qualificados, separados por vírgulas das colunas a serem usadas como restrições
deny
, com os respectivos aliases. No formato cf:col->alias. - intNumericRestrictsMappings: os nomes de coluna totalmente qualificados e separados por vírgulas das colunas a serem usadas como
numeric_restricts
inteiros, com os respectivos aliases. No formato cf:col->alias. - floatNumericRestrictsMappings: os nomes de coluna totalmente qualificados e separados por vírgulas das colunas que devem ser usadas como
numeric_restricts
de ponto flutuante (4 bytes), com os respectivos aliases. No formato cf:col->alias. - doubleNumericRestrictsMappings: os nomes de coluna totalmente qualificados e separados por vírgulas das colunas a serem usadas como
numeric_restricts
duplos (8 bytes), com os respectivos aliases. No formato cf:col->alias. - upsertMaxBatchSize: o número máximo de inserções a serem armazenadas em buffer antes de atualizar o lote no índice da Pesquisa de vetor. Os lotes serão enviados quando houver registros upsertBatchSize prontos ou quando qualquer registro estiver aguardando o tempo upsertBatchDelay. Por exemplo,
10
. O padrão é 10. - upsertMaxBufferDuration: o atraso máximo antes do envio de um lote de inserções para a Pesquisa de vetor.Os lotes serão enviados quando houver registros upsertBatchSize prontos ou quando qualquer registro estiver aguardando o tempo upsertBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: "5s"), Nm (para minutos, exemplo: "12m") e Nh (para horas, exemplo: "2h"). Por exemplo,
10s
. O padrão é 10 segundos. - deleteMaxBatchSize: o número máximo de exclusões no buffer antes de excluir o lote do índice da Pesquisa de vetor. Os lotes serão enviados quando houver registros deleteBatchSize prontos ou quando qualquer registro tiver aguardado o tempo deleteBatchDelay. Por exemplo,
10
. O padrão é 10. - deleteMaxBufferDuration: o atraso máximo antes do envio de um lote de exclusões à Pesquisa de vetor.Os lotes serão enviados quando houver registros deleteBatchSize prontos ou quando qualquer registro tiver aguardado o tempo deleteBatchDelay. Os formatos permitidos são: Ns (para segundos, exemplo: "5s"), Nm (para minutos, exemplo: "12m") e Nh (para horas, exemplo: "2h"). Por exemplo,
10s
. O padrão é 10 segundos. - dlqDirectory: o caminho para armazenar todos os registros não processados com o motivo da falha de processamento. O padrão é um diretório no local temporário do job do Dataflow. O valor padrão é suficiente na maioria das condições.
- bigtableChangeStreamMetadataInstanceId: a alteração do Bigtable faz streaming do ID da instância de metadados. O padrão é vazio.
- bigtableChangeStreamMetadataTableTableId: o ID da tabela de metadados do conector de fluxo de alterações do Bigtable. Se não for fornecida, uma tabela de metadados do conector de streams de alterações do Bigtable será criada automaticamente durante a execução do pipeline. O padrão é vazio.
- bigtableChangeStreamCharset: o nome do conjunto de caracteres fluxo de alterações do Bigtable. O padrão é UTF-8.
- bigtableChangeStreamStartTimestamp: o carimbo de data/hora inicial (https://tools.ietf.org/html/rfc3339, link em inglês), inclusive, para uso na leitura de fluxo de alterações. Por exemplo,
2022-05-05T07:59:59Z
. O padrão é o carimbo de data/hora do horário de início do pipeline. - bigtableChangeStreamIgnoreColumnFamilies: uma lista separada por vírgulas com o nome do grupo de colunas é ignorado. O padrão é vazio.
- bigtableChangeStreamIgnoreColumns: uma lista separada por vírgulas do nome da coluna é ignorada. Exemplo: "cf1:col1,cf2:col2". O padrão é vazio.
- bigtableChangeStreamName: um nome exclusivo para o pipeline do cliente. Permite retomar o processamento a partir do ponto em que um pipeline anteriormente em execução foi interrompido. O padrão é um nome gerado automaticamente. Consulte os registros do job do Dataflow para ver o valor usado.
- bigtableChangeStreamResume: quando definido como
true
, um novo pipeline retoma o processamento a partir do ponto em que um pipeline anteriormente em execução com o mesmo valor debigtableChangeStreamName
foi interrompido. Se o pipeline com o valorbigtableChangeStreamName
fornecido nunca tiver sido executado, um novo pipeline não será iniciado. Quando definido comofalse
, um novo pipeline é iniciado. Se um pipeline com o mesmo valorbigtableChangeStreamName
já tiver sido executado para a origem especificada, um novo pipeline não será iniciado. O padrão éfalse
. - bigtableReadChangeStreamTimeoutMs: o tempo limite para solicitações ReadChangeStream do Bigtable em milissegundos.
- bigtableReadProjectId: o ID do projeto do Bigtable. O padrão é o projeto do job do Dataflow.
Executar o modelo
Console
- Acesse a página Criar job usando um modelo do Dataflow. Acesse Criar job usando um modelo
- No campo Nome do job, insira um nome exclusivo.
- Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é
us-central1
.Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.
- No menu suspenso Modelo do Dataflow, selecione the Bigtable Change Streams to Vector Search template.
- Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
- Cliquem em Executar job.
CLI da gcloud
No shell ou no terminal, execute o modelo:
gcloud dataflow flex-template run JOB_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search \ --project=PROJECT_ID \ --region=REGION_NAME \ --parameters \ embeddingColumn=EMBEDDING_COLUMN,\ embeddingByteSize=EMBEDDING_BYTE_SIZE,\ vectorSearchIndex=VECTOR_SEARCH_INDEX,\ bigtableChangeStreamAppProfile=BIGTABLE_CHANGE_STREAM_APP_PROFILE,\ bigtableReadInstanceId=BIGTABLE_READ_INSTANCE_ID,\ bigtableReadTableId=BIGTABLE_READ_TABLE_ID,\
Substitua:
JOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
EMBEDDING_COLUMN
: a coluna de embeddingEMBEDDING_BYTE_SIZE
: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX
: o caminho do índice de pesquisa de vetoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: o ID do perfil do aplicativo Bigtable.BIGTABLE_READ_INSTANCE_ID
: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID
: o ID da tabela de origem do Bigtable
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launchParameter": { "jobName": "JOB_NAME", "parameters": { "embeddingColumn": "EMBEDDING_COLUMN", "embeddingByteSize": "EMBEDDING_BYTE_SIZE", "vectorSearchIndex": "VECTOR_SEARCH_INDEX", "bigtableChangeStreamAppProfile": "BIGTABLE_CHANGE_STREAM_APP_PROFILE", "bigtableReadInstanceId": "BIGTABLE_READ_INSTANCE_ID", "bigtableReadTableId": "BIGTABLE_READ_TABLE_ID", }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Bigtable_Change_Streams_to_Vector_Search", "environment": { "maxWorkers": "10" } } }
Substitua:
PROJECT_ID
: o ID do projeto Google Cloud em que você quer executar o job do DataflowJOB_NAME
: um nome de job de sua escolhaVERSION
: a versão do modelo que você quer usarUse estes valores:
latest
para usar a versão mais recente do modelo, disponível na pasta mãe não datada no bucket: gs://dataflow-templates-REGION_NAME/latest/- o nome da versão, como
2023-09-12-00_RC00
, para usar uma versão específica do modelo, que pode ser encontrada aninhada na respectiva pasta mãe datada no bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: a região em que você quer implantar o job do Dataflow, por exemplo,us-central1
EMBEDDING_COLUMN
: a coluna de embeddingEMBEDDING_BYTE_SIZE
: o tamanho do byte da matriz de embeddings. Pode ser 4 ou 8.VECTOR_SEARCH_INDEX
: o caminho do índice de pesquisa de vetoresBIGTABLE_CHANGE_STREAM_APP_PROFILE
: o ID do perfil do aplicativo Bigtable.BIGTABLE_READ_INSTANCE_ID
: o ID da instância do Bigtable de origemBIGTABLE_READ_TABLE_ID
: o ID da tabela de origem do Bigtable