Aprimorar dados de streaming

O Apache Beam simplifica o fluxo de trabalho de enriquecimento de dados fornecendo uma transformação de aprimoramento pronta para uso que pode ser adicionada ao pipeline. Nesta página, explicamos como usar a transformação do Apache Beam para aprimorar dados de streaming.

Ao aprimorar dados, você aumenta a quantidade de dados brutos de uma fonte, adicionando dados relacionados de uma segunda fonte. Os dados adicionais podem vir de várias fontes, como o Bigtable ou o BigQuery. A transformação de aprimoramento do Apache Beam usa uma pesquisa de chave-valor para conectar os dados adicionais aos dados brutos.

Os exemplos a seguir mostram alguns casos em que o aprimoramento de dados é útil:

  • Você quer criar um pipeline de e-commerce que capture as atividades do usuário em um site ou app e forneça recomendações personalizadas. A transformação incorpora as atividades nos dados do pipeline para que você possa fornecer as recomendações personalizadas.
  • Você tem dados do usuários que gostaria de combinar com dados geográficos para fazer análises com base na região geográfica.
  • Você quer criar um pipeline que reúna dados de dispositivos da Internet das Coisas (IOT, na sigla em inglês) que enviam eventos de telemetria.

Benefícios

A transformação de aprimoramento oferece os seguintes benefícios:

  • Transforma seus dados sem exigir que você escreva códigos complexos ou gerencie bibliotecas subjacentes.
  • Fornece gerenciadores de origem integrados.
  • Usa a limitação do lado do cliente para gerenciar a limitação de taxa das solicitações. As solicitações são desativadas exponencialmente com uma estratégia de repetição padrão. É possível configurar a limitação de taxa de acordo com seu caso de uso.

Suporte e limitações

A transformação de aprimoramento tem os seguintes requisitos:

  • Disponível para pipelines de lote e streaming.
  • O gerenciador BigTableEnrichmentHandler está disponível nas versões 2.54.0 e posteriores do SDK do Apache Beam para Python.
  • O gerenciador VertexAIFeatureStoreEnrichmentHandler está disponível nas versões 2.55.0 e posteriores do SDK do Apache Beam para Python.
  • Ao usar a versão 2.55.0 e posteriores do SDK do Apache Beam para Python, também é necessário instalar o cliente Python para Redis.
  • Os jobs do Dataflow precisam usar o Runner v2.

Usar a transformação de aprimoramento

Para usar a transformação de aprimoramento, inclua o seguinte código no pipeline:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Como a transformação de aprimoramento executa uma correlação por padrão, projete a junção personalizada para aprimorar os dados de entrada. Com isso, a junção incluirá apenas os campos especificados.

No exemplo abaixo, left é o elemento de entrada da transformação de aprimoramento, e right corresponde aos dados buscados em um serviço externo para esse elemento de entrada.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parâmetros

Para usar a transformação de aprimoramento, o parâmetro EnrichmentHandler é obrigatório.

Também é possível usar um parâmetro de configuração para especificar uma função lambda para uma função de junção, um tempo limite, um limitador ou um repetidor (estratégia de nova tentativa). Os seguintes parâmetros de configuração estão disponíveis:

  • join_fn: uma função lambda que usa dicionários como entrada e retorna uma linha aprimorada (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). A linha aprimorada especifica como mesclar os dados encontrados pela API. O padrão é uma correlação.
  • timeout: quantos segundos se passaram até a solicitação ser concluída pela API antes de expirar. O padrão é de 30 segundos.
  • throttler: especifica o mecanismo de limitação. A única opção aceita é a limitação adaptável padrão do lado do cliente.
  • repeater: especifica a estratégia de repetição quando erros como TooManyRequests e TimeoutException ocorrem. O padrão é ExponentialBackOffRepeater.

A seguir