Enriquece los datos de transmisión

Apache Beam simplifica el flujo de trabajo de enriquecimiento de datos, ya que proporciona una transformación de enriquecimiento lista para usar que puedes agregar a tu canalización. En esta página, se explica cómo usar la transformación de enriquecimiento de Apache Beam para enriquecer los datos de transmisión.

Cuando enriqueces los datos, aumentas los datos sin procesar de una fuente agregando datos relacionados de una segunda fuente. Los datos adicionales pueden provenir de una variedad de fuentes, como Bigtable o BigQuery. La transformación de enriquecimiento de Apache Beam usa una búsqueda de par clave-valor para conectar los datos adicionales con los datos sin procesar.

En los siguientes ejemplos, se proporcionan algunos casos en los que el enriquecimiento de datos es útil:

  • Cuando quieres crear una canalización de comercio electrónico que capture las actividades de los usuarios de un sitio web o una app y proporcione recomendaciones personalizadas. La transformación incorpora las actividades en tus datos de canalización para que puedas proporcionar las recomendaciones personalizadas.
  • Cuando tienes datos del usuario que quieres unir con datos geográficos para realizar estadísticas basadas en la ubicación geográfica.
  • Cuando quieres crear una canalización que recopile datos de dispositivos de la Internet de las cosas (IOT) que envíen eventos de telemetría.

Ventajas

La transformación de enriquecimiento tiene los siguientes beneficios:

  • Transforma los datos sin que tengas que escribir código complejo o administrar las bibliotecas subyacentes.
  • Proporciona controladores de código fuente integrados.
  • Usa la limitación del cliente para administrar el límite de frecuencia de las solicitudes. Las solicitudes se respaldan de manera exponencial con una estrategia de reintento predeterminada. Puedes configurar el límite de frecuencia para que se adapte a tu caso práctico.

Asistencia y limitaciones

La transformación de enriquecimiento tiene los siguientes requisitos:

  • Disponible para canalizaciones por lotes y de transmisión.
  • El controlador BigTableEnrichmentHandler está disponible en las versiones 2.54.0 y posteriores del SDK de Apache Beam para Python.
  • El controlador VertexAIFeatureStoreEnrichmentHandler está disponible en las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python.
  • Cuando usas las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python, también debes instalar el cliente de Python para Redis.
  • Los trabajos de Dataflow deben usar Runner v2.

Usa la transformación de enriquecimiento

Para usar la transformación de enriquecimiento, incluye el siguiente código en tu canalización:

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

Debido a que la transformación de enriquecimiento realiza una unión cruzada de forma predeterminada, diseña la unión personalizada para enriquecer los datos de entrada. Este diseño garantiza que la unión incluya solo los campos especificados.

En el siguiente ejemplo, left es el elemento de entrada de la transformación de enriquecimiento y right son los datos recuperados de un servicio externo para ese elemento de entrada.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

Parámetros

Para usar la transformación de enriquecimiento, se requiere el parámetro EnrichmentHandler.

También puedes usar un parámetro de configuración para especificar una función lambda para una función de unión, un tiempo de espera, un regulador o un repetidor (estrategia de reintento). Están disponibles los siguientes parámetros de configuración:

  • join_fn: Es una función lambda que toma diccionarios como entrada y muestra una fila enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]). La fila enriquecida especifica cómo unir los datos recuperados de la API. La configuración predeterminada es una unión cruzada.
  • timeout: Es la cantidad de segundos que se espera hasta que la API complete la solicitud antes de que se agote el tiempo de espera. La configuración predeterminada es de 30 segundos.
  • throttler: Especifica el mecanismo de limitación. La única opción compatible es la limitación predeterminada del cliente.
  • repeater: Especifica la estrategia de reintento cuando se producen errores como TooManyRequests y TimeoutException. La configuración predeterminada es ExponentialBackOffRepeater.

¿Qué sigue?