커스텀 소스 및 싱크

Dataflow SDK는 커스텀 데이터 소스 및 싱크를 생성하는 데 사용할 수 있는 확장 API를 제공합니다. 파이프라인이 Dataflow SDK가 기본 지원을 제공하지 않는 데이터 소스나 싱크에서 데이터를 읽도록(또는 데이터를 쓰도록) 하려면 커스텀 데이터 소스나 싱크를 만들어야 합니다.

BoundedSource 또는 UnboundedSource와 같은 Dataflow SDK의 추상 소스 하위 클래스를 확장하여 커스텀 소스를 만듭니다. Dataflow SDK의 추상 싱크 기본 클래스를 확장하여 커스텀 싱크를 만듭니다. 확장 API를 사용하여 제한된(배치) 또는 제한되지 않은(스트리밍) 데이터를 읽는 커스텀 소스를 만들고 제한된 데이터만 쓰는 커스텀 소스를 만들 수 있습니다.

Dataflow는 향후 출시 버전에서 제한되지 않은 데이터를 쓰는 커스텀 싱크를 추가 지원할 예정입니다.

커스텀 소스와 싱크의 기본 코드 요구 사항

Dataflow 서비스는 여러 작업자 인스턴스를 동시에 사용하여 데이터를 읽고 쓰기 위해 사용자가 제공하는 클래스를 사용합니다. 따라서 SourceSink 하위 클래스를 위해 사용자가 제공하는 코드는 일부 기본 요구 사항을 충족해야 합니다.

직렬화 가능성

제한 유무에 관계없이 사용자의 Source 또는 Sink 하위 클래스는 Serializable여야 합니다. Dataflow 서비스는 원활한 동시 읽기 또는 쓰기를 위해 여러 원격 작업자에게 보낼 Source 또는 Sink 하위 클래스의 인스턴스를 여러 개 만들 수 있습니다.

불변성

Source 또는 Sink 하위 클래스는 사실상 변경이 불가능해야 합니다. 모든 비공개 필드는 final로 선언하고 컬렉션 유형의 모든 비공개 변수는 효과적인 불변성을 가져야 합니다. 클래스에 setter 메소드가 있는 경우 이러한 메소드는 관련 필드가 수정된 객체의 독립된 복사본을 반환해야 합니다.

소스 구현에 필요한 고비용의 계산을 느리게 평가하는 경우에는 Source 또는 Sink 하위 클래스에서 가변 상태만 사용해야 합니다. 이 경우 모든 가변 인스턴스 변수 transient를 선언해야 합니다.

스레드 안전

Dataflow의 동적 작업 리밸런스 기능을 사용하기 위해 커스텀 소스를 빌드하는 경우 코드가 스레드 안전성을 갖도록 하는 것이 매우 중요합니다. 자바용 Dataflow SDK는 간편하게 이를 수행하도록 지원 클래스를 제공합니다. 자세한 내용은 아래의 동적 작업 리밸런스로 BoundedSource 사용을 참조하세요.

테스트 가능성

특히 Dataflow의 동적 작업 재균등화와 같은 고급 기능을 사용하여 작동하도록 클래스를 빌드하는 경우, 모든 SourceSink 하위 클래스에 대해 철저하게 단위 테스트를 수행하는 것이 중요합니다. 사소한 구현 오류로 인해 감지하기 어려울 수 있는 데이터 손상이나 데이터 손실(예: 레코드 건너뛰기 또는 중복)이 발생할 수 있습니다.

소스 테스트를 지원하기 위해 Dataflow SDK는 SourceTestUtils 클래스를 제공합니다. SourceTestUtils에는 BoundedSource 구현의 일부 속성을 자동으로 검사하기 위한 유틸리티가 포함되어 있습니다. SourceTestUtils를 사용하여 상대적으로 적은 코드 라인으로 광범위한 입력으로 구현 테스트 범위를 넓힐 수 있습니다.

커스텀 소스 만들기

파이프라인의 커스텀 데이터 소스를 만들려면 입력 소스로부터 데이터를 읽는 방법과 여러 작업자 인스턴스가 데이터를 동시에 읽을 수 있도록 데이터 소스를 여러 부분으로 분할하는 방법을 Dataflow 서비스에 알려주는 형식별 논리를 제공해야 합니다. 제한이 없는 데이터를 읽는 커스텀 데이터 소스를 만드는 경우, 소스의 워터마크와 옵션 체크포인트를 관리하기 위한 추가 논리를 제공해야 합니다.

다음 클래스를 만들어 커스텀 소스에 대한 논리를 제공합니다.

  • 유한한(배치) 데이터 세트를 읽는 경우에는 BoundedSource의 하위 클래스 또는 무한한(스트리밍) 데이터 세트를 읽는 경우에는 UnboundedSource의 하위 클래스. 이러한 하위 클래스는 데이터의 위치와 매개변수(예: 읽을 데이터 양)를 포함하여 사용자가 읽으려는 데이터를 설명합니다.
  • Dataflow SDK 클래스 Source.Reader 하위 클래스. 각 Source에는 해당 Source로부터 읽기와 관련된 모든 상태를 포착하는 관련 Reader가 있어야 합니다. 여기에는 파일 핸들, RPC 연결, 사용자가 읽으려는 데이터 형식의 특정 요구 사항에 따라 결정되는 기타 매개변수 등이 포함될 수 있습니다.

    Reader 클래스 계층구조는 Source 계층구조를 미러링합니다. BoundedSource를 확장하는 경우에는 관련 BoundedReader를 제공해야 합니다. UnboundedSource를 확장하는 경우에는 관련 UnboundedReader를 제공해야 합니다.

소스 하위 클래스 구현

데이터가 유한한 배치 혹은 무한한 스트림인지 여부에 따라 BoundedSource 또는 UnboundedSource의 하위 클래스를 만들어야 합니다. 어느 경우에나 Source 하위 클래스는 상위 클래스에서 추상 메소드를 재정의해야 합니다. 커스텀 데이터 소스를 사용하는 경우 Dataflow 서비스는 이러한 메소드를 사용하여 데이터세트 크기를 예측하고 동시 읽기에 적합하게 분할합니다.

Source 하위 클래스는 위치와 같은 데이터 소스의 기본 정보도 관리해야 합니다. 예를 들어, Dataflow의 DatastoreIO 클래스에서 Source 구현 예에서는 Datastore에서 데이터를 가져오는 데 사용되는 인수로 host, datasetID, query를 취합니다.

BoundedSource

BoundedSource는 Dataflow 서비스가 동시에 읽을 수 있는 유한한 데이터세트를 나타냅니다. BoundedSource에는 여러 원격 작업자가 읽을 수 있도록 데이터세트를 분할하기 위해 서비스에서 사용되는 추상 메소드 조합이 포함됩니다.

BoundedSource를 구현하려면 하위 클래스가 다음 추상 메소드를 재정의해야 합니다.

  • splitIntoBundles: Dataflow 서비스는 이 메소드를 사용하여 유한한 데이터를 지정된 크기의 번들로 분할합니다.
  • getEstimatedSizeBytes: Dataflow 서비스는 이 메소드를 사용하여 데이터의 총 크기(바이트 단위)를 예측합니다.
  • producesSortedKeys: 소스가 키-값 쌍을 정렬된 순서로 생성하는지 여부를 Dataflow 서비스에 알려주는 메소드입니다. 소스가 키-값 쌍을 생성하지 않는 경우, 이 메소드를 구현하면 false가 반환되어야 합니다.
  • createReader: 이 BoundedSource에 대한 관련 BoundedReader를 만듭니다.

Dataflow SDK의 DatastoreIO 구현 예에서 BoundedSource와 필수 추상 메소드를 구현하는 방법에 대한 모델을 확인할 수 있습니다.

UnboundedSource

UnboundedSource는 Dataflow 서비스가 동시에 읽을 수 있는 무한한 데이터 스트림을 나타냅니다. UnboundedSource에는 동시 스트리밍 읽기를 지원하기 위해 서비스에서 사용되는 추상 메소드 조합이 포함됩니다. 여기에는 오류 복구를 위한 체크포인트, 데이터 중복을 방지하기 위한 레코드 ID, 파이프라인의 다운스트림 측에서 데이터의 완결성을 예측하기 위한 워터마크가 포함됩니다.

UnboundedSource를 구현하려면 하위 클래스가 다음 추상 메소드를 재정의해야 합니다.

  • generateInitialSplits: Dataflow 서비스는 이 메소드를 사용하여 서비스가 동시에 읽어야 하는 하위 스트림 인스턴스 수를 나타내는 UnboundedSource 객체 목록을 생성합니다.
  • getCheckpointMarkCoder: Dataflow 서비스는 이 메소드를 사용하여 소스 체크포인트(있는 경우)의 Coder를 가져옵니다.
  • requiresDeduping: Dataflow 서비스는 이 메소드를 사용하여 데이터가 중복 레코드를 명시적 삭제해야 하는지 여부를 결정합니다. 이 메소드가 true를 반환하면 Dataflow 서비스는 소스 출력에서 중복을 삭제하기 위한 단계를 자동으로 삽입합니다.
  • createReader: 이 UnboundedSource에 대한 관련 UnboundedReader를 만듭니다.

판독기 하위 클래스 구현

소스 하위 클래스의 createReader 메소드가 반환하는 BoundedReader 또는 UnboundedReader의 하위 클래스를 만들어야 합니다. Dataflow 서비스는 Reader(제한 유무에 관계없음)의 메소드를 사용하여 실제로 데이터세트를 읽습니다.

BoundedReaderUnboundedReader는 기본 인터페이스가 비슷하며, 사용자가 이들을 정의해야 합니다. 또한 제한되지 않은 데이터로 작업하기 위해 구현해야 하는 UnboundedReader에 고유한 몇 가지 추가 메소드와 BoundedReader가 Dataflow의 동적 작업 재균등화 기능을 사용할 수 있도록 하려는 경우에 구현할 수 있는 옵션 메소드가 있습니다. 또한 UnboundedReader를 사용하면 start()advance()의 의미 체계에서 약간의 차이가 있습니다.

BoundedReader와 UnboundedReader 모두의 공통 판독기 메소드

Dataflow는 다음 메소드를 사용하여 BoundedReader 또는 UnboundedReader로 데이터를 읽습니다.

  • start: Reader를 초기화하고 읽을 첫 번째 레코드로 이동합니다. 이 메소드는 Dataflow가 데이터를 읽기 시작하면 정확히 한 번 호출되며 초기화에 필요한 값 비싼 작업을 배치하기에 좋은 위치입니다.
  • advance: 다음 유효한 레코드로 판독기를 진행합니다. 더 이상 사용할 수 있는 입력이 없으면 이 메소드는 false를 반환해야 합니다. advance가 false를 반환하면 BoundedReader는 읽기를 중단하지만 스트림에서 데이터를 추가로 사용할 수 있으면 이후 호출에서 UnboundedReadertrue를 반환할 수 있습니다.
  • getCurrent: 현재 위치에서 start 또는 advance에 의해 마지막으로 읽은 데이터 레코드를 반환합니다.
  • getCurrentTimestamp: 현재 데이터 레코드의 타임스탬프를 반환합니다. 소스가 내장 타임스탬프를 가진 데이터를 읽는 경우, getCurrentTimestamp만 재정의해야 합니다. Dataflow 서비스는 이 값을 사용하여 결과 출력 PCollection의 각 요소에 대한 내장 타임스탬프를 설정합니다.

UnboundedReader에 고유한 판독기 메소드

기본 Reader 인터페이스 외에 UnboundedReader에는 제한되지 않은 데이터 소스로부터 읽기를 관리하기 위한 몇 가지 추가 메소드가 있습니다.

  • getCurrentRecordId: 현재 레코드에 대한 고유 식별자를 반환합니다. Dataflow 서비스는 이러한 레코드 ID를 사용하여 중복 레코드를 필터링합니다. 각 레코드에 존재하는 논리 ID가 데이터에 있으면 이 메소드가 이를 반환하도록 할 수 있습니다. 그렇지 않으면 최소한 128비트 해시를 사용하여 레코드 콘텐츠의 해시를 반환할 수 있습니다. (32비트 해시는 일반적으로 충돌을 예방하기에 충분하지 않으므로, 자바의 Object.hashCode()를 사용하는 것은 좋지 않습니다.)
  • 참고: 소스가 각 레코드를 고유하게 식별하는 체크포인트 체계를 사용하는 경우, getCurrentRecordId 구현은 선택사항입니다. 그러나 소스에 데이터를 쓰는 업스트림 시스템이 간혹 중복 레코드를 생성하고 소스가 이를 읽을 수 있는 경우에는 레코드 ID가 여전히 유용할 수 있습니다.

  • getWatermark: Reader가 제공하는 워터마크를 반환합니다. 워터마크는 Reader가 읽을 향후 요소의 타임스탬프에 대한 대략적인 하한입니다. Dataflow 서비스는 데이터 완결성을 예측하는 수단으로 워터마크를 사용합니다. 워터마크는 Dataflow의 윈도우트리거 기능에서 사용됩니다.
  • getCheckpointMark: Dataflow 서비스는 이 메소드를 사용하여 데이터 스트림에서 체크포인트를 만듭니다. 체크포인트는 UnboundedReader의 진행 상태를 나타내며 오류 복구에 사용될 수 있습니다. 데이터 스트림마다 다양한 체크포인트 메소드를 사용할 수 있습니다. 일부 소스는 수신된 레코드를 확인해야 할 수 있고, 또 다른 소스는 위치 체크포인트를 사용할 수 있습니다. 이 메소드를 가장 적합한 체크포인트 체계로 맞춤설정해야 합니다. 예를 들어, 이 메소드가 최근 확인된 레코드를 반환하도록 할 수 있습니다.
  • 참고: getCheckpointMark는 선택사항입니다. 데이터에 의미 있는 체크포인트가 없으면 구현할 필요가 없습니다. 그러나 소스에 체크포인트를 구현하지 않으면 데이터 소스가 오류 발생 시 레코드를 다시 전송하려고 시도하는지 여부에 따라 파이프라인에서 데이터가 중복 또는 손실될 수 있습니다.

동적 작업 리밸런스로 BoundedSource 사용

소스가 제한된 데이터를 제공하는 경우, 메소드 splitAtFraction를 구현하여 BoundedReader가 Dataflow 서비스의 동적 작업 리밸런스 기능과 함께 작동하도록 할 수 있습니다. Dataflow 서비스는 Source의 나머지 데이터를 분할하여 다른 작업자에게 재분배할 수 있도록 특정 판독기에서 start 또는 advance와 동시적으로 splitAtFraction을 호출할 수 있습니다.

splitAtFraction을 구현하면 코드는 이러한 분할의 통합이 전체 데이터세트와 일치하는 상호 배타적인 분할 집합을 생성해야 합니다.

편리한 소스 및 판독기 기본 클래스

Dataflow SDK에는 파일과 같은 일반적인 데이터 저장소 형식으로 작동하는 SourceReader 클래스를 간편하게 만들도록 편리한 추상 기본 클래스가 포함되어 있습니다.

FileBasedSource

데이터 소스가 파일을 사용하는 경우, 자바용 Dataflow SDK의 FileBasedSourceFileBasedReader 추상 기본 클래스에서 SourceReader 클래스를 유도할 수 있습니다. FileBasedSource는 다음과 같이 파일과 상호작용하는 Dataflow 소스에 공통적인 코드를 구현하는 제한된 소스 하위 클래스입니다.

  • 파일 패턴 확장
  • 순차 레코드 읽기
  • 분할 지점

XmlSource

데이터 소스가 XML 형식 파일을 사용하는 경우, 자바용 Dataflow SDK의 XmlSource 추상 기본 클래스에서 Source 클래스를 유도할 수 있습니다. XmlSourceFileBasedSource를 확장하고, 파일 루트와 파일의 개별 레코드를 지정하는 XML 요소를 설정하는 등 XML 파일을 파싱하는 추가 메소드를 제공합니다.

커스텀 소스에서 읽기

파이프라인의 커스텀 소스에서 데이터를 읽으려면 SDK 일반 Read 변환을 적용하고 .from 연산자를 사용하여 커스텀 소스를 매개변수로 전달해야 합니다.

자바

  MySource source = new MySource(false, file.getPath(), 64, null);
  p.apply("ReadFileData", Read.from(source))

커스텀 싱크 만들기

파이프라인의 커스텀 데이터 싱크를 만들려면 파이프라인의 PCollection에서 제한된 데이터를 디렉토리나 파일 시스템, 데이터베이스 테이블 등과 같은 출력 싱크에 쓰는 방식을 Dataflow 서비스에 알려주는 형식별 논리를 제공해야 합니다.

참고: Dataflow는 현재 커스텀 출력 싱크에 제한된 데이터 쓰기만 지원합니다.

다음 클래스를 만들어 쓰기 논리를 공급합니다.

  • Dataflow SDK 추상 기본 클래스 싱크의 하위 클래스. Sink는 파이프라인이 동시에 쓸 수 있는 위치나 리소스를 설명합니다. Sink 하위 클래스에는 리소스나 파일 위치, 데이터베이스 테이블 이름 등과 같은 필드가 포함될 수 있습니다.
  • Sink.WriteOperation의 하위 클래스. Sink.WriteOperationSink에 설명된 출력 위치로의 단일 병렬 쓰기 작업 상태를 나타냅니다. WriteOperation 하위 클래스는 병렬 쓰기의 초기화와 최종화 프로세스를 정의해야 합니다.
  • Sink.Writer의 하위 클래스. Sink.Writer는 입력 PCollection의 요소 번들을 지정된 데이터 싱크에 씁니다.

싱크 하위 클래스 구현

Sink 하위 클래스는 파이프라인이 출력을 쓰는 위치나 리소스를 설명합니다. 여기에는 파일 시스템 위치, 데이터베이스 테이블이나 데이터세트의 이름 등이 포함될 수 있습니다. Sink 하위 클래스는 출력 위치에 쓸 수 있는지를 확인하고 해당 출력 위치에 데이터를 쓰는 방법을 정의하는 WriteOperation을 만듭니다.

Sink를 구현하려면 하위 클래스가 다음 추상 메소드를 재정의해야 합니다.

  • validate: 이 메소드는 파이프라인 데이터의 출력 위치가 유효하고 쓸 수 있는지 확인합니다. validate는 파일을 열 수 있고 출력 디렉토리가 존재하며 사용자가 데이터베이스 테이블 등에 액세스 권한을 가지고 있는지 확인합니다. Dataflow 서비스는 파이프라인 생성 시에 validate를 호출합니다.
  • createWriteOperation: 이 메소드는 출력 위치에 쓰는 방식을 정의하는 Sink.WriteOperation 객체를 만듭니다.

WriteOperation 하위 클래스 구현

WriteOperation 하위 클래스는 요소 번들을 Sink에 정의된 출력 위치에 쓰는 방식을 정의합니다. WriteOperation은 병렬 쓰기에 필요한 초기화최종화를 수행합니다.

WriteOperation을 구현하려면 하위 클래스가 다음 추상 메소드를 재정의해야 합니다.

  • initialize: 이 메소드는 출력 위치에 쓰기 전에 필요한 초기화를 수행합니다. Dataflow 서비스는 쓰기를 시작하기 전에 이 메소드를 호출합니다. 예를 들어, initialize를 사용하여 임시 출력 디렉토리를 만들 수 있습니다.
  • finalize: 이 메소드는 Writer 클래스가 수행한 쓰기 결과를 처리합니다. finalize 구현은 실패한 쓰기 또는 성공적으로 다시 시도된 쓰기에서 삭제를 수행하고 실패한 쓰기에 의해 쓰여진 임시 또는 부분 출력을 찾을 수 있어야 합니다.

    finalize는 오류 또는 다시 시도 시에 여러 번 호출될 수 있으므로, 가장 좋은 방법은 finalize 구현을 원자적으로 만드는 것입니다. 이것이 불가능하다면 finalize를 멱등적으로 구현해야 합니다.
  • createWriter: 이 메소드는 Sink.Writer 객체를 만듭니다. 이 객체는 데이터 번들을 Sink에 정의된 출력 위치에 씁니다.

기록기 하위 클래스 구현

Writer 하위 클래스는 Sink에 정의된 출력 위치에 단일 레코드 번들을 쓰기 위한 논리를 구현합니다. Dataflow 서비스는 동일한 작업자의 여러 스레드에서 Writer의 인스턴스 여러 개를 인스턴스화할 수 있습니다. 따라서 정적 구성원 또는 메소드에 대한 액세스는 스레드 안전이어야 합니다.

Writer를 구현하려면 하위 클래스가 다음 추상 메소드를 재정의해야 합니다.

  • open: 이 메소드는 쓰기를 위한 임시 파일을 만드는 등 써야 하는 레코드 번들을 초기화합니다. Dataflow 서비스는 쓰기 시작 시 이 메소드를 한 번 호출하고 여기에 쓰려는 레코드 번들의 고유 번들 ID를 전달합니다.
  • write: 이 메소드는 출력 위치에 단일 레코드를 씁니다. Dataflow 서비스는 번들의 각 값에 대해 write를 호출합니다.
  • close: 이 메소드는 쓰기를 마치고 번들 쓰기에 사용된 리소스를 닫습니다. close는 바깥쪽 WriteOperation이 성공적 쓰기를 확인하는 데 사용하는 기록기 결과를 반환해야 합니다. 쓰기가 끝나면 Dataflow 서비스는 이 메소드를 한 번 호출합니다.

번들 ID 처리

서비스가 Writer.open을 호출하면 쓰려는 레코드의 고유 번들 ID가 전달됩니다. Writer는 이 번들 ID를 사용하여 출력이 동시에 만들어졌을 수 있는 다른 Writer 인스턴스의 출력과 간섭을 일으키지 않는지 확인해야 합니다. 이는 Dataflow 서비스가 오류 발생 시 쓰기를 여러 번 다시 시도할 수 있으므로 매우 중요합니다.

예를 들어 Sink의 출력이 파일 기반인 경우, Writer 클래스는 번들 ID를 파일 이름 접미사로 사용하여 Writer가 다른 Writer에서 사용되지 않는 고유한 출력 파일에 레코드를 쓰도록 할 수 있습니다. 그런 다음 Writerclose 메소드가 해당 파일 위치를 쓰기 결과의 일부로 반환하도록 할 수 있습니다.

Dataflow SDK의 DatastoreIO 구현 예에서 필요한 추상 메소드와 함께 Sink, WriteOperation, Writer를 구현하는 방법에 대한 모델을 확인할 수 있습니다.

편리한 싱크 및 기록기 기본 클래스

Dataflow SDK에는 파일과 같은 일반적인 데이터 저장소 형식으로 작동하는 SourceReader 클래스를 간편하게 만들도록 편리한 추상 기본 클래스가 포함되어 있습니다.

FileBasedSink

데이터 소스가 파일을 사용하는 경우, 자바용 Dataflow SDK의 FileBasedSink, FileBasedWriteOperation, FileBasedWriter 추상 기본 클래스에서 Sink, WriteOperation, Writer 클래스를 유도할 수 있습니다. 이러한 클래스는 다음과 같이 파일과 상호작용하는 Dataflow 소스에 공통적인 코드를 구현합니다.

  • 파일 머리글 및 바닥글 설정
  • 순차적 레코드 쓰기
  • 출력 MIME 유형 설정

FileBasedSink 및 하위 클래스는 로컬 파일과 Google Cloud Storage의 파일 모두에 쓰기를 지원합니다. 자세한 내용은 자바용 Dataflow SDK에서 XmlSink라 하는 FileBasedSink 구현 예를 참조하세요.

커스텀 싱크에 쓰기

파이프라인의 커스텀 싱크에 데이터를 쓰려면 SDK 일반 Write 변환을 적용하고 .to 연산자를 사용하여 커스텀 싱크를 매개변수로 전달합니다.

자바

  p.apply("WriteResults", Write.to(new MySink()));
이 페이지가 도움이 되었나요? 평가를 부탁드립니다.

다음에 대한 의견 보내기...

도움이 필요하시나요? 지원 페이지를 방문하세요.