Apache Beam은 파이프라인에 추가할 수 있는 턴키 보강 변환을 제공하여 데이터 보강 워크플로를 단순화합니다. 이 페이지에서는 Apache Beam 보강 변환을 사용하여 스트리밍 데이터를 보강하는 방법을 설명합니다.
데이터를 보강할 때는 두 번째 소스의 관련 데이터를 추가하여 한 소스의 원시 데이터를 보강합니다. 추가 데이터는 Bigtable 또는 BigQuery와 같은 다양한 소스에서 가져올 수 있습니다. Apache Beam 보강 변환은 키-값 조회를 사용하여 추가 데이터를 원시 데이터에 연결합니다.
다음 예시에서는 데이터 보강이 유용한 몇 가지 사례를 제공합니다.
- 웹사이트 또는 앱에서 사용자 활동을 캡처하고 맞춤설정된 추천을 제공하는 전자상거래 파이프라인을 만들려고 합니다. 이 변환은 맞춤설정된 추천을 제공할 수 있도록 활동을 파이프라인 데이터에 통합합니다.
- 지역 기반 분석을 수행하기 위해 지리 데이터와 조인하려는 사용자 데이터가 있습니다.
- 원격 분석 이벤트를 전송하는 사물 인터넷(IOT) 기기에서 데이터를 수집하는 파이프라인을 만들려고 합니다.
혜택
보강 변환은 다음과 같은 이점이 있습니다.
- 복잡한 코드를 작성하거나 기본 라이브러리를 관리할 필요 없이 데이터를 변환합니다.
- 기본 제공 소스 핸들러를 제공합니다.
- 구성 세부정보를 전달하지 않고
BigTableEnrichmentHandler
핸들러를 사용하여 Bigtable 소스로 데이터를 보강합니다. - Vertex AI Feature Store 및 Bigtable 온라인 서빙과 함께
VertexAIFeatureStoreEnrichmentHandler
핸들러를 사용합니다.
- 구성 세부정보를 전달하지 않고
- 클라이언트 측 제한을 사용하여 요청 비율 제한을 관리합니다. 기본 재시도 전략을 통해 요청이 기하급수적으로 백오프됩니다. 사용 사례에 맞게 비율 제한을 구성할 수 있습니다.
지원 및 제한 사항
보강 변환에는 다음과 같은 요구사항이 있습니다.
- 일괄 및 스트리밍 파이프라인에 사용할 수 있습니다.
BigTableEnrichmentHandler
핸들러는 Apache Beam Python SDK 버전 2.54.0 이상에서 사용 가능합니다.VertexAIFeatureStoreEnrichmentHandler
핸들러는 Apache Beam Python SDK 버전 2.55.0 이상에서 사용 가능합니다.- Apache Beam Python SDK 버전 2.55.0 이상을 사용하는 경우 Redis용 Python 클라이언트도 설치해야 합니다.
- Dataflow 작업은 Runner v2를 사용해야 합니다.
보강 변환 사용
보강 변환을 사용하려면 파이프라인에 다음 코드를 포함합니다.
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
보강 변환은 기본적으로 교차 조인을 수행하므로 커스텀 조인을 설계하여 입력 데이터를 보강합니다. 이러한 설계에 따라 조인에는 지정된 필드만 포함됩니다.
다음 예시에서 left
는 보강 변환의 입력 요소이고 right
는 해당 입력 요소의 외부 서비스에서 가져온 데이터입니다.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
매개변수
보강 변환을 사용하려면 EnrichmentHandler
매개변수가 필요합니다.
또한 구성 매개변수를 사용하여 조인 함수, 제한 시간, 제한 또는 반복자(재시도 전략)의 lambda
함수를 지정할 수도 있습니다. 다음과 같은 구성 매개변수를 사용할 수 있습니다.
join_fn
: 사전을 입력으로 사용하고 보강된 행(Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)을 반환하는lambda
함수입니다. 보강된 행은 API에서 가져온 데이터를 조인하는 방법을 지정합니다. 기본값은 교차 조인입니다.timeout
: 시간 초과가 발생하기 전에 API에서 요청을 완료할 때까지 기다리는 시간(초)입니다. 기본값은 30초입니다.throttler
: 제한 메커니즘을 지정합니다. 지원되는 유일한 옵션은 기본 클라이언트 측 적응형 제한입니다.repeater
:TooManyRequests
및TimeoutException
과 같은 오류가 발생할 경우 재시도 전략을 지정합니다. 기본값은ExponentialBackOffRepeater
입니다.
다음 단계
- 더 많은 예시는 Apache Beam 변환 카탈로그의 보강 변환을 참조하세요.
- Apache Beam 및 Bigtable을 사용하여 데이터 보강
- Apache Beam 및 Vertex AI Feature Store를 사용하여 데이터 보강