Apache Beam은 파이프라인에 추가할 수 있는 턴키 보강 변환을 제공하여 데이터 보강 워크플로를 간소화합니다. 이 페이지에서는 Apache Beam 보강 변환을 사용하여 스트리밍 데이터를 보강하는 방법을 설명합니다.
데이터를 보강할 때는 두 번째 소스의 관련 데이터를 추가하여 한 소스의 원시 데이터를 보강할 수 있습니다. Bigtable 또는 BigQuery와 같은 다양한 소스에서 추가 데이터를 가져올 수 있습니다. Apache Beam 보강 변환은 키-값 조회를 사용하여 추가 데이터를 원시 데이터에 연결합니다.
다음 예시에서는 데이터 보강이 유용한 몇 가지 사례를 제공합니다.
- 웹사이트 또는 앱에서 사용자 활동을 캡처하고 맞춤형 추천을 제공하는 전자상거래 파이프라인을 만들려고 합니다. 이 변환은 맞춤설정된 추천을 제공할 수 있도록 활동을 파이프라인 데이터에 통합합니다.
- 지역 기반 분석을 위해 지리 데이터와 조인할 사용자 데이터가 있습니다.
- 원격 분석 이벤트를 전송하는 사물 인터넷(IOT) 기기에서 데이터를 수집하는 파이프라인을 만들려고 합니다.
이점
보강 변환에는 다음과 같은 이점이 있습니다.
- 복잡한 코드를 작성하거나 기본 라이브러리를 관리할 필요 없이 데이터를 변환합니다.
- 기본 제공 소스 핸들러를 제공합니다.
- 구성 세부정보를 전달하지 않고
BigTableEnrichmentHandler
핸들러를 사용하여 Bigtable 소스로 데이터를 보강합니다. - 구성 세부정보를 전달하지 않고
BigQueryEnrichmentHandler
핸들러를 사용하여 BigQuery 소스로 데이터를 보강합니다. - Vertex AI Feature Store 및 Bigtable 온라인 서빙과 함께
VertexAIFeatureStoreEnrichmentHandler
핸들러를 사용합니다.
- 구성 세부정보를 전달하지 않고
- 클라이언트 측 제한을 사용하여 요청 비율 제한을 관리합니다. 기본 재시도 전략을 통해 요청이 기하급수적으로 백오프됩니다. 사용 사례에 맞게 비율 제한을 구성할 수 있습니다.
지원 및 제한 사항
보강 변환에는 다음과 같은 요구사항이 있습니다.
- 일괄 및 스트리밍 파이프라인에 사용할 수 있습니다.
BigTableEnrichmentHandler
핸들러는 Apache Beam Python SDK 버전 2.54.0 이상에서 사용 가능합니다.BigQueryEnrichmentHandler
핸들러는 Apache Beam Python SDK 버전 2.57.0 이상에서 사용 가능합니다.VertexAIFeatureStoreEnrichmentHandler
핸들러는 Apache Beam Python SDK 버전 2.55.0 이상에서 사용 가능합니다.- Apache Beam Python SDK 버전 2.55.0 이상을 사용하는 경우 Redis용 Python 클라이언트도 설치해야 합니다.
- Dataflow 작업은 Runner v2를 사용해야 합니다.
보강 변환 사용
보강 변환을 사용하려면 파이프라인에 다음 코드를 포함합니다.
import apache_beam as beam
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
bigtable_handler = BigTableEnrichmentHandler(...)
with beam.Pipeline() as p:
output = (p
...
| "Create" >> beam.Create(data)
| "Enrich with Bigtable" >> Enrichment(bigtable_handler)
...
)
보강 변환은 기본적으로 교차 조인을 실행하므로 입력 데이터를 보강하도록 맞춤 조인을 설계합니다. 이러한 설계에 따라 조인에는 지정된 필드만 포함됩니다.
다음 예시에서 left
는 보강 변환의 입력 요소이고 right
는 해당 입력 요소의 외부 서비스에서 가져온 데이터입니다.
def custom_join(left: Dict[str, Any], right: Dict[str, Any]):
enriched = {}
enriched['FIELD_NAME'] = left['FIELD_NAME']
...
return beam.Row(**enriched)
매개변수
보강 변환을 사용하려면 EnrichmentHandler
매개변수가 필요합니다.
또한 구성 매개변수를 사용하여 조인 함수, 제한 시간, 제한 또는 반복자(재시도 전략)의 lambda
함수를 지정할 수도 있습니다. 다음과 같은 구성 매개변수를 사용할 수 있습니다.
join_fn
: 사전을 입력으로 사용하고 보강된 행(Callable[[Dict[str, Any], Dict[str, Any]], beam.Row]
)을 반환하는lambda
함수입니다. 보강된 행은 API에서 가져온 데이터를 조인하는 방법을 지정합니다. 기본값은 교차 조인입니다.timeout
: 시간 초과가 발생하기 전에 API에서 요청을 완료할 때까지 기다리는 시간(초)입니다. 기본값은 30초입니다.throttler
: 제한 메커니즘을 지정합니다. 지원되는 유일한 옵션은 기본 클라이언트 측 적응형 제한입니다.repeater
:TooManyRequests
,TimeoutException
등의 오류가 발생할 때 사용할 재시도 전략을 지정합니다. 기본값은ExponentialBackOffRepeater
입니다.
다음 단계
- 더 많은 예시는 Apache Beam 변환 카탈로그의 보강 변환을 참조하세요.
- Apache Beam 및 Bigtable을 사용하여 데이터 보강
- Apache Beam 및 BigQuery를 사용하여 데이터 보강
- Apache Beam 및 Vertex AI Feature Store를 사용하여 데이터 보강