스트리밍 데이터 보강

Apache Beam은 파이프라인에 추가할 수 있는 턴키 보강 변환을 제공하여 데이터 보강 워크플로를 단순화합니다. 이 페이지에서는 Apache Beam 보강 변환을 사용하여 스트리밍 데이터를 보강하는 방법을 설명합니다.

데이터를 보강할 때는 두 번째 소스의 관련 데이터를 추가하여 한 소스의 원시 데이터를 보강합니다. 추가 데이터는 Bigtable 또는 BigQuery와 같은 다양한 소스에서 가져올 수 있습니다. Apache Beam 보강 변환은 키-값 조회를 사용하여 추가 데이터를 원시 데이터에 연결합니다.

다음 예시에서는 데이터 보강이 유용한 몇 가지 사례를 제공합니다.

  • 웹사이트 또는 앱에서 사용자 활동을 캡처하고 맞춤설정된 추천을 제공하는 전자상거래 파이프라인을 만들려고 합니다. 이 변환은 맞춤설정된 추천을 제공할 수 있도록 활동을 파이프라인 데이터에 통합합니다.
  • 지역 기반 분석을 수행하기 위해 지리 데이터와 조인하려는 사용자 데이터가 있습니다.
  • 원격 분석 이벤트를 전송하는 사물 인터넷(IOT) 기기에서 데이터를 수집하는 파이프라인을 만들려고 합니다.

혜택

보강 변환은 다음과 같은 이점이 있습니다.

  • 복잡한 코드를 작성하거나 기본 라이브러리를 관리할 필요 없이 데이터를 변환합니다.
  • 기본 제공 소스 핸들러를 제공합니다.
  • 클라이언트 측 제한을 사용하여 요청 비율 제한을 관리합니다. 기본 재시도 전략을 통해 요청이 기하급수적으로 백오프됩니다. 사용 사례에 맞게 비율 제한을 구성할 수 있습니다.

지원 및 제한 사항

보강 변환에는 다음과 같은 요구사항이 있습니다.

  • 일괄 및 스트리밍 파이프라인에 사용할 수 있습니다.
  • BigTableEnrichmentHandler 핸들러는 Apache Beam Python SDK 버전 2.54.0 이상에서 사용 가능합니다.
  • VertexAIFeatureStoreEnrichmentHandler 핸들러는 Apache Beam Python SDK 버전 2.55.0 이상에서 사용 가능합니다.
  • Apache Beam Python SDK 버전 2.55.0 이상을 사용하는 경우 Redis용 Python 클라이언트도 설치해야 합니다.
  • Dataflow 작업은 Runner v2를 사용해야 합니다.

보강 변환 사용

보강 변환을 사용하려면 파이프라인에 다음 코드를 포함합니다.

import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler

bigtable_handler = BigTableEnrichmentHandler(...)

with beam.Pipeline() as p:
  output = (p
            ...
            | "Create" >> beam.Create(data)
            | "Enrich with Bigtable" >> Enrichment(bigtable_handler)
            ...
            )

보강 변환은 기본적으로 교차 조인을 수행하므로 커스텀 조인을 설계하여 입력 데이터를 보강합니다. 이러한 설계에 따라 조인에는 지정된 필드만 포함됩니다.

다음 예시에서 left는 보강 변환의 입력 요소이고 right는 해당 입력 요소의 외부 서비스에서 가져온 데이터입니다.

def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
  enriched = {}
  enriched['FIELD_NAME'] = left['FIELD_NAME']
  ...
  return beam.Row(**enriched)

매개변수

보강 변환을 사용하려면 EnrichmentHandler 매개변수가 필요합니다.

또한 구성 매개변수를 사용하여 조인 함수, 제한 시간, 제한 또는 반복자(재시도 전략)의 lambda 함수를 지정할 수도 있습니다. 다음과 같은 구성 매개변수를 사용할 수 있습니다.

  • join_fn: 사전을 입력으로 사용하고 보강된 행(Callable[[Dict[str, Any], Dict[str, Any]], beam.Row])을 반환하는 lambda 함수입니다. 보강된 행은 API에서 가져온 데이터를 조인하는 방법을 지정합니다. 기본값은 교차 조인입니다.
  • timeout: 시간 초과가 발생하기 전에 API에서 요청을 완료할 때까지 기다리는 시간(초)입니다. 기본값은 30초입니다.
  • throttler: 제한 메커니즘을 지정합니다. 지원되는 유일한 옵션은 기본 클라이언트 측 적응형 제한입니다.
  • repeater: TooManyRequestsTimeoutException과 같은 오류가 발생할 경우 재시도 전략을 지정합니다. 기본값은 ExponentialBackOffRepeater입니다.

다음 단계