Crie um pipeline de incorporação de vetores em tempo real para o AlloyDB com o Dataflow

Este documento mostra como criar um pipeline de extração, transformação e carregamento (ETL) do AlloyDB para PostgreSQL com o Dataflow. Google Cloud O Dataflow é um serviço totalmente gerido Google Cloud para desenvolver e executar pipelines de processamento de dados.

Pode usar as instruções no documento, que se baseiam no Vector Embedding Ingestion with Apache Beam and AlloyDB do Colab, que usa o Python para criar o pipeline de carregamento basic_ingestion_pipeline.py. Alguns dos exemplos de utilização em que pode aplicar as informações neste documento são a pesquisa semântica ou a geração aumentada de recuperação (RAG).

Estas instruções descrevem os seguintes componentes do pipeline do Dataflow:

  • Configurar uma ligação do AlloyDB e do Dataflow
  • Gerar incorporações no AlloyDB for PostgreSQL com o controlador VertexAITextEmbeddings do Apache Beam e o modelo de incorporação de texto da Vertex AI
  • Criar um pipeline de streaming no Dataflow

Antes de começar

Antes de criar o pipeline do Dataflow com o Colab, conclua estes pré-requisitos:

Configure a instância do AlloyDB for PostgreSQL e os componentes do pipeline

Primeiro, configure o pipeline para se ligar a uma instância do AlloyDB para PostgreSQL. Esta configuração inclui a definição do Google Cloud ID do projeto, do URI da instância do AlloyDB for PostgreSQL, do utilizador e da palavra-passe para estabelecer ligação através do conetor de linguagem do AlloyDB. Para mais informações sobre a configuração da associação, consulte o artigo Configuração da base de dados.

Os módulos Apache Beam específicos da geração aumentada de obtenção (RAG) fornecem classes para as seguintes tarefas:

  • Carregar dados do AlloyDB for PostgreSQL
  • Gerar incorporações
  • Escrever estas incorporações de vetores novamente no AlloyDB for PostgreSQL

Importe as classes necessárias para o código do pipeline antes de criar a lógica do pipeline. Para mais informações sobre os componentes do pipeline, consulte o artigo Importar componentes do pipeline.

Crie dados de amostra

O Colab Vector Embedding Ingestion with Apache Beam and AlloyDB fornece dados de products_data amostra para executar o pipeline. O pipeline usa estes dados de exemplo como entrada, juntamente com o modelo de incorporação, para gerar incorporações.

Para mais informações, consulte o artigo Crie dados de amostra.

Crie uma tabela para armazenar incorporações

O pipeline armazena as incorporações geradas na tabela default_dataflow_product_embeddings. Para mais informações sobre como criar o esquema da tabela, consulte o artigo Crie uma tabela com o esquema predefinido.

Opcional: prepare os dados para o carregamento de incorporações

Com base no seu conjunto de dados, pode dividir os dados em metadados e texto que o modelo de incorporação tem de converter em incorporações. As classes MLTransform() e VectorDatabaseWriteTransform() processam os dados de entrada num tamanho suportado pelo modelo de incorporação. Inclua os metadados e formate os dados de entrada de acordo com as especificações do modelo de incorporação que está a usar.

Para mais informações sobre a preparação de dados, consulte o artigo Mapeie os dados dos produtos para blocos.

Configure o controlador de incorporação para gerar incorporações

A classe VertexAITextEmbeddings() define o modelo de incorporação de texto que cria incorporações de vetores. Este modelo de incorporação converte os dados divididos em blocos em incorporações.

Para mais informações, consulte o artigo Configure o controlador de incorporação.

Também pode usar um modelo pré-preparado criado com a framework SentenceTransformers da Huggingface para gerar incorporações de vetores. Para mais informações, consulte o artigo Gere incorporações com o HuggingFace.

Crie um pipeline de carregamento

O pipeline basic_ingestion_pipeline.py, fornecido no Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, incorpora as configurações das secções anteriores, incluindo a configuração do AlloyDB para PostgreSQL, o carregamento de dados para o AlloyDB para PostgreSQL, a divisão opcional de dados e a configuração do controlador de incorporação.

O pipeline de carregamento faz o seguinte:

  • Cria tabelas de dados de produtos
  • Converte dados em blocos
  • Gera incorporações
  • Escreve as incorporações convertidas na tabela products_data no AlloyDB para PostgreSQL

Pode executar este pipeline com um executor local direto ou um executor baseado na nuvem, como o Dataflow.

Para mais informações sobre como criar o pipeline de carregamento, consulte o artigo Guarde o nosso pipeline num ficheiro Python.

Execute o pipeline do Dataflow

Pode executar um pipeline do Dataflow a partir da linha de comandos. Transmita credenciais, como o ID do projeto, os detalhes da ligação do AlloyDB para PostgreSQL, a localização do contentor do Cloud Storage, os detalhes do ambiente de execução, as informações da rede e o nome do pipeline de carregamento (basic_ingestion_pipeline.py).

No Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, a instância do AlloyDB para PostgreSQL e as tarefas do Dataflow são executadas na mesma rede VPC e sub-rede.

Para mais informações sobre a execução de um pipeline no Dataflow, consulte o artigo Execute um pipeline no Dataflow.

Na Google Cloud consola, no painel de controlo do Dataflow, pode ver gráficos de execução, registos e métricas enquanto o pipeline é executado.

Opcional: execute o pipeline do Dataflow de streaming

Para dados que se espera que mudem com frequência, como pesquisas de semelhanças ou motores de recomendações, considere criar um pipeline de streaming com o Dataflow e o Pub/Sub.

Em vez de processar um lote de dados, este pipeline lê continuamente as mensagens recebidas de um tópico do Pub/Sub, converte as mensagens em blocos, gera incorporações através de um modelo especificado (como o Hugging Face ou o Vertex AI) e atualiza a tabela do AlloyDB for PostgreSQL.

Para mais informações, consulte o artigo Streaming de atualizações de incorporações a partir do Pub/Sub.

Valide as incorporações de vetores no AlloyDB para PostgreSQL

Após a execução do pipeline, verifique se o pipeline escreveu as incorporações na sua base de dados do AlloyDB para PostgreSQL.

Para mais informações, consulte o artigo Valide as incorporações escritas.

O que se segue?