O Apache Beam simplifica o fluxo de trabalho de enriquecimento de dados, fornecendo uma transformação de enriquecimento pronta a usar que pode adicionar ao seu pipeline. Esta página explica como usar a transformação de enriquecimento do Apache Beam para enriquecer os seus dados de streaming.
Quando enriquece os dados, aumenta os dados não processados de uma origem adicionando dados relacionados de uma segunda origem. Os dados adicionais podem provir de várias origens, como o Bigtable ou o BigQuery. A transformação de enriquecimento do Apache Beam usa uma pesquisa de chave/valor para associar os dados adicionais aos dados não processados.
Os exemplos seguintes apresentam alguns casos em que o enriquecimento de dados é útil:
- Quer criar um pipeline de comércio eletrónico que capte as atividades dos utilizadores de um Website ou uma app e forneça recomendações personalizadas. A transformação incorpora as atividades nos dados do pipeline para que possa fornecer as recomendações personalizadas.
- Tem dados do utilizador que quer associar a dados geográficos para fazer análise com base na geografia.
- Quer criar um pipeline que recolha dados de dispositivos da Internet das Coisas (IOT) que enviam eventos de telemetria.
Vantagens
A transformação de enriquecimento tem as seguintes vantagens:
- Transforma os seus dados sem que tenha de escrever código complexo nem gerir bibliotecas subjacentes.
- Fornece controladores de origem integrados.
- Use o controlador
BigTableEnrichmentHandler
para enriquecer os seus dados através de uma origem do Bigtable sem transmitir detalhes de configuração. - Use o controlador
BigQueryEnrichmentHandler
para enriquecer os seus dados através de uma origem do BigQuery sem transmitir detalhes de configuração. - Use o controlador
VertexAIFeatureStoreEnrichmentHandler
com Vertex AI Feature Store e serviço online do Bigtable.
- Use o controlador
- Usa a limitação do lado do cliente para gerir a limitação de velocidade dos pedidos. As solicitações são retiradas exponencialmente com uma estratégia de nova tentativa predefinida. Pode configurar a limitação de taxa para se adequar ao seu exemplo de utilização.
Apoio técnico e limitações
A transformação de enriquecimento tem os seguintes requisitos:
- Disponível para pipelines de streaming e em lote.
- O controlador
BigTableEnrichmentHandler
está disponível nas versões 2.54.0 e posteriores do SDK Python do Apache Beam. - O controlador
BigQueryEnrichmentHandler
está disponível nas versões 2.57.0 e posteriores do SDK Python do Apache Beam. - O controlador
VertexAIFeatureStoreEnrichmentHandler
está disponível nas versões 2.55.0 e posteriores do SDK Python do Apache Beam. - Quando usar as versões 2.55.0 e posteriores do SDK Python do Apache Beam, também tem de instalar o cliente Python para Redis.
- As tarefas do Dataflow têm de usar o Runner v2.
Use a transformação de enriquecimento
Para usar a transformação de enriquecimento, inclua o seguinte código no seu pipeline:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Uma vez que a transformação de enriquecimento executa uma união cruzada por predefinição, crie a união personalizada para enriquecer os dados de entrada. Este design garante que a junção inclui apenas os campos especificados.
No exemplo seguinte, left
é o elemento de entrada da transformação de enriquecimento e right
são os dados obtidos de um serviço externo para esse elemento de entrada.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parâmetros
Para usar a transformação de enriquecimento, o parâmetro EnrichmentHandler
é obrigatório.
Também pode usar um parâmetro de configuração para especificar uma função lambda
para uma função de junção, um limite de tempo, um limitador ou um repetidor (estratégia de repetição). Estão disponíveis os seguintes parâmetros de configuração:
join_fn
: uma funçãolambda
que recebe dicionários como entrada e devolve uma linha enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). A linha enriquecida especifica como juntar os dados obtidos da API. A predefinição é uma união cruzada.timeout
: o número de segundos a aguardar que o pedido seja concluído pela API antes de atingir o limite de tempo. A predefinição são 30 segundos.throttler
: especifica o mecanismo de limitação. A única opção suportada é a limitação adaptativa do lado do cliente predefinida.repeater
: especifica a estratégia de repetição quando ocorrem erros comoTooManyRequests
eTimeoutException
. A predefinição éExponentialBackOffRepeater
.
O que se segue?
- Para mais exemplos, consulte a Transformação de enriquecimento no catálogo de transformações do Apache Beam.
- Use o Apache Beam e o Bigtable para enriquecer os dados.
- Use o Apache Beam e o BigQuery para enriquecer dados.
- Use o Apache Beam e o Vertex AI Feature Store para enriquecer os dados.