Este documento mostra como criar um pipeline de extração, transformação e carregamento (ETL) do AlloyDB para PostgreSQL com o Dataflow. Google Cloud O Dataflow é um serviço totalmente gerido Google Cloud para desenvolver e executar pipelines de processamento de dados.
Pode usar as instruções no documento, que se baseiam no Vector Embedding Ingestion with Apache Beam and AlloyDB do Colab, que usa o Python para criar o pipeline de carregamento basic_ingestion_pipeline.py
. Alguns dos exemplos de utilização em que pode aplicar as informações neste documento são a pesquisa semântica ou a geração aumentada de recuperação (RAG).
Estas instruções descrevem os seguintes componentes do pipeline do Dataflow:
- Configurar uma ligação do AlloyDB e do Dataflow
- Gerar incorporações no AlloyDB for PostgreSQL com o controlador
VertexAITextEmbeddings
do Apache Beam e o modelo de incorporação de texto da Vertex AI - Criar um pipeline de streaming no Dataflow
Antes de começar
Antes de criar o pipeline do Dataflow com o Colab, conclua estes pré-requisitos:
- Configure o seu ambiente para criar um pipeline do Dataflow.
Ative o AlloyDB para PostgreSQL e outras APIs necessárias:
gcloud services enable alloydb.googleapis.com cloudresourcemanager.googleapis.com \ servicenetworking.googleapis.com
Crie um cluster e uma instância principal do AlloyDB para PostgreSQL.
Instale a extensão de vetores do AlloyDB para PostgreSQL na sua base de dados.
Conceda a função AlloyDB Admin (roles/alloydb.admin) à conta de utilizador do Dataflow.
Configure a instância do AlloyDB for PostgreSQL e os componentes do pipeline
Primeiro, configure o pipeline para se ligar a uma instância do AlloyDB para PostgreSQL. Esta configuração inclui a definição do Google Cloud ID do projeto, do URI da instância do AlloyDB for PostgreSQL, do utilizador e da palavra-passe para estabelecer ligação através do conetor de linguagem do AlloyDB. Para mais informações sobre a configuração da associação, consulte o artigo Configuração da base de dados.
Os módulos Apache Beam específicos da geração aumentada de obtenção (RAG) fornecem classes para as seguintes tarefas:
- Carregar dados do AlloyDB for PostgreSQL
- Gerar incorporações
- Escrever estas incorporações de vetores novamente no AlloyDB for PostgreSQL
Importe as classes necessárias para o código do pipeline antes de criar a lógica do pipeline. Para mais informações sobre os componentes do pipeline, consulte o artigo Importar componentes do pipeline.
Crie dados de amostra
O Colab Vector Embedding Ingestion with Apache Beam and AlloyDB fornece dados de products_data
amostra para executar o pipeline. O pipeline usa estes dados de exemplo como entrada, juntamente com o modelo de incorporação, para gerar incorporações.
Para mais informações, consulte o artigo Crie dados de amostra.
Crie uma tabela para armazenar incorporações
O pipeline armazena as incorporações geradas na tabela default_dataflow_product_embeddings
. Para mais informações sobre como criar o esquema da tabela, consulte o artigo Crie uma tabela com o esquema predefinido.
Opcional: prepare os dados para o carregamento de incorporações
Com base no seu conjunto de dados, pode dividir os dados em metadados e texto que o modelo de incorporação tem de converter em incorporações. As classes MLTransform()
e VectorDatabaseWriteTransform()
processam os dados de entrada num tamanho suportado pelo modelo de incorporação. Inclua os metadados e formate os dados de entrada de acordo com as especificações do modelo de incorporação que está a usar.
Para mais informações sobre a preparação de dados, consulte o artigo Mapeie os dados dos produtos para blocos.
Configure o controlador de incorporação para gerar incorporações
A classe VertexAITextEmbeddings()
define o modelo de incorporação de texto que cria incorporações de vetores. Este modelo de incorporação converte os dados divididos em blocos em incorporações.
Para mais informações, consulte o artigo Configure o controlador de incorporação.
Também pode usar um modelo pré-preparado criado com a framework SentenceTransformers da Huggingface para gerar incorporações de vetores. Para mais informações, consulte o artigo Gere incorporações com o HuggingFace.
Crie um pipeline de carregamento
O pipeline basic_ingestion_pipeline.py
, fornecido no Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, incorpora as configurações das secções anteriores, incluindo a configuração do AlloyDB para PostgreSQL, o carregamento de dados para o AlloyDB para PostgreSQL, a divisão opcional de dados e a configuração do controlador de incorporação.
O pipeline de carregamento faz o seguinte:
- Cria tabelas de dados de produtos
- Converte dados em blocos
- Gera incorporações
- Escreve as incorporações convertidas na tabela
products_data
no AlloyDB para PostgreSQL
Pode executar este pipeline com um executor local direto ou um executor baseado na nuvem, como o Dataflow.
Para mais informações sobre como criar o pipeline de carregamento, consulte o artigo Guarde o nosso pipeline num ficheiro Python.
Execute o pipeline do Dataflow
Pode executar um pipeline do Dataflow a partir da linha de comandos. Transmita credenciais, como o ID do projeto, os detalhes da ligação do AlloyDB para PostgreSQL, a localização do contentor do Cloud Storage, os detalhes do ambiente de execução, as informações da rede e o nome do pipeline de carregamento (basic_ingestion_pipeline.py
).
No Colab Vector Embedding Ingestion with Apache Beam and AlloyDB, a instância do AlloyDB para PostgreSQL e as tarefas do Dataflow são executadas na mesma rede VPC e sub-rede.
Para mais informações sobre a execução de um pipeline no Dataflow, consulte o artigo Execute um pipeline no Dataflow.
Na Google Cloud consola, no painel de controlo do Dataflow, pode ver gráficos de execução, registos e métricas enquanto o pipeline é executado.
Opcional: execute o pipeline do Dataflow de streaming
Para dados que se espera que mudem com frequência, como pesquisas de semelhanças ou motores de recomendações, considere criar um pipeline de streaming com o Dataflow e o Pub/Sub.
Em vez de processar um lote de dados, este pipeline lê continuamente as mensagens recebidas de um tópico do Pub/Sub, converte as mensagens em blocos, gera incorporações através de um modelo especificado (como o Hugging Face ou o Vertex AI) e atualiza a tabela do AlloyDB for PostgreSQL.
Para mais informações, consulte o artigo Streaming de atualizações de incorporações a partir do Pub/Sub.
Valide as incorporações de vetores no AlloyDB para PostgreSQL
Após a execução do pipeline, verifique se o pipeline escreveu as incorporações na sua base de dados do AlloyDB para PostgreSQL.
Para mais informações, consulte o artigo Valide as incorporações escritas.
O que se segue?
- Saiba como realizar a importação de incorporações de vetores com o Apache Beam, o Dataflow e o AlloyDB para PostgreSQL.