Enriqueça os dados de streaming

O Apache Beam simplifica o fluxo de trabalho de enriquecimento de dados, fornecendo uma transformação de enriquecimento pronta a usar que pode adicionar ao seu pipeline. Esta página explica como usar a transformação de enriquecimento do Apache Beam para enriquecer os seus dados de streaming.

Quando enriquece os dados, aumenta os dados não processados de uma origem adicionando dados relacionados de uma segunda origem. Os dados adicionais podem provir de várias origens, como o Bigtable ou o BigQuery. A transformação de enriquecimento do Apache Beam usa uma pesquisa de chave/valor para associar os dados adicionais aos dados não processados.

Os exemplos seguintes apresentam alguns casos em que o enriquecimento de dados é útil:

  • Quer criar um pipeline de comércio eletrónico que capte as atividades dos utilizadores de um Website ou uma app e forneça recomendações personalizadas. A transformação incorpora as atividades nos dados do pipeline para que possa fornecer as recomendações personalizadas.
  • Tem dados do utilizador que quer associar a dados geográficos para fazer análise com base na geografia.
  • Quer criar um pipeline que recolha dados de dispositivos da Internet das Coisas (IOT) que enviam eventos de telemetria.

Vantagens

A transformação de enriquecimento tem as seguintes vantagens:

  • Transforma os seus dados sem que tenha de escrever código complexo nem gerir bibliotecas subjacentes.
  • Fornece controladores de origem integrados.
  • Usa a limitação do lado do cliente para gerir a limitação de velocidade dos pedidos. As solicitações são retiradas exponencialmente com uma estratégia de nova tentativa predefinida. Pode configurar a limitação de taxa para se adequar ao seu exemplo de utilização.

Apoio técnico e limitações

A transformação de enriquecimento tem os seguintes requisitos:

  • Disponível para pipelines de streaming e em lote.
  • O controlador BigTableEnrichmentHandler está disponível nas versões 2.54.0 e posteriores do SDK Python do Apache Beam.
  • O controlador BigQueryEnrichmentHandler está disponível nas versões 2.57.0 e posteriores do SDK Python do Apache Beam.
  • O controlador VertexAIFeatureStoreEnrichmentHandler está disponível nas versões 2.55.0 e posteriores do SDK Python do Apache Beam.
  • Quando usar as versões 2.55.0 e posteriores do SDK Python do Apache Beam, também tem de instalar o cliente Python para Redis.
  • As tarefas do Dataflow têm de usar o Runner v2.

Use a transformação de enriquecimento

Para usar a transformação de enriquecimento, inclua o seguinte código no seu pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Uma vez que a transformação de enriquecimento executa uma união cruzada por predefinição, crie a união personalizada para enriquecer os dados de entrada. Este design garante que a junção inclui apenas os campos especificados.

No exemplo seguinte, left é o elemento de entrada da transformação de enriquecimento e right são os dados obtidos de um serviço externo para esse elemento de entrada.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parâmetros

Para usar a transformação de enriquecimento, o parâmetro EnrichmentHandler é obrigatório.

Também pode usar um parâmetro de configuração para especificar uma função lambda para uma função de junção, um limite de tempo, um limitador ou um repetidor (estratégia de repetição). Estão disponíveis os seguintes parâmetros de configuração:

  • join_fn: uma função lambda que recebe dicionários como entrada e devolve uma linha enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). A linha enriquecida especifica como juntar os dados obtidos da API. A predefinição é uma união cruzada.
  • timeout: o número de segundos a aguardar que o pedido seja concluído pela API antes de atingir o limite de tempo. A predefinição são 30 segundos.
  • throttler: especifica o mecanismo de limitação. A única opção suportada é a limitação adaptativa do lado do cliente predefinida.
  • repeater: especifica a estratégia de repetição quando ocorrem erros como TooManyRequests e TimeoutException. A predefinição é ExponentialBackOffRepeater.

O que se segue?