Java 태스크 패턴

전자 상거래 샘플 애플리케이션은 Dataflow를 사용해서 스트리밍 데이터 분석 및 실시간 AI를 구현하기 위한 권장사항을 보여줍니다. 이 예시에서는 Java 프로그래밍 태스크를 가장 잘 수행하는 방법을 보여주는 태스크 패턴이 포함되어 있습니다. 이러한 태스크는 일반적으로 전자상거래 애플리케이션을 만들기 위해 필요합니다.

애플리케이션에는 다음과 같은 Java 태스크 패턴이 포함됩니다.

Apache Beam 스키마를 사용하여 구조화된 데이터 작업

Apache Beam 스키마를 사용하여 구조화된 데이터를 더 쉽게 처리할 수 있습니다.

객체를 으로 전환하면 매우 깨끗한 자바 코드가 생성되어 방향성 비순환 그래프(DAG) 빌드를 더 쉽게 수행할 수 있습니다. 또한 메서드를 호출하는 대신 만들려는 애널리틱스 문 내에서 객체 속성을 필드로 참조할 수 있습니다.

예시

CountViewsPerProduct.java

JsonToRow를 사용하여 JSON 데이터 변환

Dataflow에서 JSON 문자열을 처리하는 것은 일반적인 요구사항입니다. 예를 들어 JSON 문자열은 웹 애플리케이션에서 캡처한 클릭스트림 정보를 스트리밍할 때 처리됩니다. JSON 문자열을 처리하려면 파이프라인을 처리하는 동안 이러한 문자열을 Row 또는 POJO(Plain Old Java Object)로 변환해야 합니다.

Apache Beam에서 기본 제공되는 변환 JsonToRow를 사용하여 JSON 문자열을 Row로 변환할 수 있습니다. 그러나 큐에서 실패한 메시지를 처리하려는 경우 별도로 빌드해야 합니다. 추가 분석을 위해 처리할 수 없는 데이터를 큐에 추가를 참조하세요.

AutoValue를 사용하여 JSON 문자열을 POJO로 변환해야 하는 경우 @DefaultSchema(AutoValueSchema.class) 주석을 사용하여 해당 유형의 스키마를 등록한 후 Convert 유틸리티 클래스를 사용합니다. 결과 코드는 다음과 비슷합니다.

PCollection<String> json = ...

PCollection<MyUserType>  = json
  .apply("Parse JSON to Beam Rows", JsonToRow.withSchema(expectedSchema))
  .apply("Convert to a user type with a compatible schema registered", Convert.to(MyUserType.class))

스키마를 추론할 수 있는 다양한 자바 유형을 비롯한 자세한 내용은 스키마 만들기를 참조하세요.

데이터에 JsonToRow를 사용할 수 없는 경우에는 Gson이 대안입니다. Gson은 데이터의 기본 처리 측면에서 상당히 완화되어 있으므로 데이터 변환 프로세스에 대한 유효성 검사가 더 필요할 수 있습니다.

Examples

AutoValue 코드 생성기를 사용하여 POJO를 생성합니다.

Apache Beam 스키마는 구조화된 데이터를 사용할 수 있기 때문에 보통은 파이프라인의 객체를 나타내는 가장 좋은 방법입니다. 그렇지만 키-값 객체를 다루거나 객체 상태를 처리하는 경우와 같이 POJO(Plain Old Java Object)가 필요한 경우도 있습니다. POJO를 직접 빌드하려면 equals()hashcode() 메서드의 재정의를 코딩해야 하므로 시간이 많이 걸리고 오류가 발생하기 쉽습니다. 잘못된 재정의로 인해 애플리케이션 동작 또는 데이터 손실이 일관되지 않을 수 있습니다.

POJO를 생성하려면 AutoValue 클래스 빌더를 사용합니다. 이 옵션을 사용하면 필수 재정의가 사용되고 잠재적 오류를 방지할 수 있습니다. AutoValue는 Apache Beam 코드베이스에서 많이 사용되므로 Java를 사용하여 Dataflow에서 Apache Beam 파이프라인을 개발할 때는 이 클래스 빌더에 익숙한 것이 좋습니다.

또한 @DefaultSchema(AutoValueSchema.class) 주석을 추가하는 경우 Apache Beam 스키마와 함께 AutoValue를 사용할 수 있습니다. 자세한 내용은 스키마 만들기를 참조하세요.

AutoValue에 대한 자세한 내용은 AutoValue?을 사용해야 하는 이유AutoValue 문서를 참조하세요.

예시

Clickstream.java

추가 분석을 위해 처리할 수 없는 데이터를 큐에 추가

프로덕션 시스템에서는 문제가 있는 데이터를 처리하는 것이 중요합니다. 가능하면 데이터를 인스트림으로 검증하고 수정합니다. 수정할 수 없으면 나중에 분석할 수 있도록 처리되지 않은 메시지 큐(데드 레터 큐)에 값을 기록합니다. JSON 문자열을 Row로 변환하는 경우와 같이 일반적으로 한 형식에서 다른 형식으로 데이터를 변환할 때 문제가 발생합니다.

이 문제를 해결하려면 멀티 출력 변환을 사용하여 처리되지 않은 데이터가 포함된 요소를 다른 PCollection으로 셔틀한 후 추가 분석을 수행합니다. 이러한 처리는 파이프라인의 여러 위치에서 사용할 수 있는 일반적인 작업입니다. 여러 위치에서 사용할 수 있도록 변환을 충분히 일반화하도록 시도합니다. 먼저 원래 데이터를 포함하여 공통 속성을 래핑하도록 오류 객체를 만듭니다. 다음으로 대상에 대한 옵션이 여러 개 있는 싱크 변환을 만듭니다.

Examples

데이터 검증 변환 직렬 적용

외부 시스템에서 수집된 데이터를 정리해야 할 경우가 많습니다. 가능한 경우 문제가 있는 데이터를 인스트림에 수정할 수 있도록 파이프라인을 구성합니다. 필요한 경우 추가 분석을 위한 큐에 데이터를 전송합니다.

수정이 필요한 여러 문제로부터 단일 메시지가 발생할 수 있으므로 필요한 방향성 순환 그래프(DAG)를 계획합니다. 요소에 여러 결함의 데이터가 포함된 경우 요소가 적절한 변환을 거치는지 확인해야 합니다.

예를 들어 어느 하나도 null이 아니어야 하는 다음 값이 있는 요소를 상상해 보세요.

{"itemA": null,"itemB": null}

두 가지 잠재적인 문제를 모두 해결하는 변환을 통해 요소가 전달되는지 확인합니다.

badElements.apply(fixItemA).apply(fixItemB)

파이프라인에 더 많은 직렬 단계가 포함될 수 있지만 퓨전을 사용하면 이로 인해 발생하는 처리 오버헤드를 최소화할 수 있습니다.

예시

ValidateAndCorrectCSEvt.java

DoFn.StartBundle을 사용하여 외부 서비스에 대한 마이크로 배치 호출

외부 API를 파이프라인의 일부로 호출해야 할 수 있습니다. 파이프라인이 여러 컴퓨팅 리소스 간에 작업을 분산하므로 시스템을 통과하는 각 요소 흐름에 단일 호출을 수행하더라도 외부 서비스 엔드포인트에 부담을 줄 수 있습니다. 이 문제는 감소 함수를 적용하지 않은 경우에 특히 일반적입니다.

이 문제를 방지하려면 외부 시스템 호출을 일괄 처리합니다.

GroupByKey 변환 또는 Apache Beam Timer API를 사용하여 호출을 일괄 처리할 수 있습니다. 하지만 이 두 접근 방식에는 셔플링이 필요하므로 일부 처리 오버헤드와 키 공간을 결정하기 위한 마법수가 필요합니다.

대신 StartBundleFinishBundle 수명 주기 요소를 사용하여 데이터를 일괄 처리합니다. 이 옵션을 사용하면 셔플이 필요하지 않습니다.

이 옵션의 한 가지 단점은 번들의 크기가 현재 파이프라인 내부에서 진행되고 있는 작업과 해당 작업자를 기반으로 실행기의 구현에 따라 동적으로 결정된다는 점입니다. 스트림 모드에서 번들은 크기가 작은 경우가 많습니다. Dataflow 번들링은 샤딩 사용 범위, 특정 키에 사용할 수 있는 데이터 양, 파이프라인 처리량과 같은 백엔드 요소의 영향을 받습니다.

예시

EventItemCorrectionService.java

데이터 보강을 위한 적절한 부 입력 패턴 사용

스트리밍 분석 애플리케이션에서는 추가 분석에 유용할 수 있는 부가 정보를 사용하여 데이터를 보강하는 경우가 많습니다. 예를 들어 트랜잭션의 스토어 ID가 있으면 매장 위치에 대한 정보를 추가할 수 있습니다. 이러한 추가 정보는 보통 요소를 가져오고 참고표에서 정보를 가져오는 방식으로 추가됩니다.

변화 속도가 느리고 크기가 작은 참고표는 테이블을 Map<K,V> 인터페이스를 구현하는 싱글톤 클래스 형태로 파이프라인으로 가져오는 데 적합합니다. 이 옵션을 사용하면 각 요소가 조회를 위해 API 호출을 실행하지 않도록 할 수 있습니다. 파이프라인에 테이블의 복사본을 포함한 후에는 정기적으로 업데이트하여 최신 상태로 유지해야 합니다.

업데이트 속도가 느린 부 입력을 처리하려면 Apache Beam 부 입력 패턴을 사용합니다.

캐싱

부차 입력은 메모리에 로드되어 자동으로 캐시됩니다.

--setWorkerCacheMb 옵션을 사용하여 캐시 크기를 설정할 수 있습니다.

DoFn 인스턴스에서 캐시를 공유하고 외부 트리거를 사용하여 캐시를 새로고침할 수 있습니다.

예시

SlowMovingStoreLocationDimension.java