Apache Beam simplifica el flujo de trabajo de enriquecimiento de datos, ya que proporciona una transformación de enriquecimiento lista para usar que puedes agregar a tu canalización. En esta página, se explica cómo usar la transformación de enriquecimiento de Apache Beam para enriquecer los datos de transmisión.
Cuando enriqueces los datos, aumentas los datos sin procesar de una fuente agregando datos relacionados de una segunda fuente. Los datos adicionales pueden provenir de una variedad de fuentes, como Bigtable o BigQuery. La transformación de enriquecimiento de Apache Beam usa una búsqueda de par clave-valor para conectar los datos adicionales con los datos sin procesar.
En los siguientes ejemplos, se proporcionan algunos casos en los que el enriquecimiento de datos es útil:
- Cuando quieres crear una canalización de comercio electrónico que capture las actividades de los usuarios de un sitio web o una app y proporcione recomendaciones personalizadas. La transformación incorpora las actividades en tus datos de canalización para que puedas proporcionar las recomendaciones personalizadas.
- Cuando tienes datos del usuario que quieres unir con datos geográficos para realizar estadísticas basadas en la ubicación geográfica.
- Cuando quieres crear una canalización que recopile datos de dispositivos de la Internet de las cosas (IOT) que envíen eventos de telemetría.
Ventajas
La transformación de enriquecimiento tiene los siguientes beneficios:
- Transforma los datos sin que tengas que escribir código complejo o administrar las bibliotecas subyacentes.
- Proporciona controladores de código fuente integrados.
- Usa el controlador
BigTableEnrichmentHandler
para enriquecer tus datos mediante una fuente de Bigtable sin pasar los detalles de configuración. - Usa el controlador
VertexAIFeatureStoreEnrichmentHandler
con Vertex AI Feature Store y la entrega en línea de Bigtable.
- Usa el controlador
- Usa la limitación del cliente para administrar el límite de frecuencia de las solicitudes. Las solicitudes se respaldan de manera exponencial con una estrategia de reintento predeterminada. Puedes configurar el límite de frecuencia para que se adapte a tu caso práctico.
Asistencia y limitaciones
La transformación de enriquecimiento tiene los siguientes requisitos:
- Disponible para canalizaciones por lotes y de transmisión.
- El controlador
BigTableEnrichmentHandler
está disponible en las versiones 2.54.0 y posteriores del SDK de Apache Beam para Python. - El controlador
VertexAIFeatureStoreEnrichmentHandler
está disponible en las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python. - Cuando usas las versiones 2.55.0 y posteriores del SDK de Apache Beam para Python, también debes instalar el cliente de Python para Redis.
- Los trabajos de Dataflow deben usar Runner v2.
Usa la transformación de enriquecimiento
Para usar la transformación de enriquecimiento, incluye el siguiente código en tu canalización:
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
Debido a que la transformación de enriquecimiento realiza una unión cruzada de forma predeterminada, diseña la unión personalizada para enriquecer los datos de entrada. Este diseño garantiza que la unión incluya solo los campos especificados.
En el siguiente ejemplo, left
es el elemento de entrada de la transformación de enriquecimiento y right
son los datos recuperados de un servicio externo para ese elemento de entrada.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
Parámetros
Para usar la transformación de enriquecimiento, se requiere el parámetro EnrichmentHandler
.
También puedes usar un parámetro de configuración para especificar una función lambda
para una función de unión, un tiempo de espera, un regulador o un repetidor (estrategia de reintento). Están disponibles los siguientes parámetros de configuración:
join_fn
: Es una funciónlambda
que toma diccionarios como entrada y muestra una fila enriquecida (Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
). La fila enriquecida especifica cómo unir los datos recuperados de la API. La configuración predeterminada es una unión cruzada.timeout
: Es la cantidad de segundos que se espera hasta que la API complete la solicitud antes de que se agote el tiempo de espera. La configuración predeterminada es de 30 segundos.throttler
: Especifica el mecanismo de limitación. La única opción compatible es la limitación predeterminada del cliente.repeater
: Especifica la estrategia de reintento cuando se producen errores comoTooManyRequests
yTimeoutException
. La configuración predeterminada esExponentialBackOffRepeater
.
¿Qué sigue?
- Para obtener más ejemplos, consulta Transformación de enriquecimiento en el catálogo de transformaciones de Apache Beam.
- Usa Apache Beam y Bigtable para enriquecer los datos.
- Usa Apache Beam y Vertex AI Feature Store para enriquecer los datos.