이 문서에서는 Apache Beam PubSubIO
I/O 커넥터를 사용하여 Dataflow의 텍스트 데이터를 Pub/Sub로 쓰는 방법을 설명합니다.
개요
Pub/Sub에 데이터를 쓰려면 PubSubIO
커넥터를 사용합니다. 입력 요소는 Pub/Sub 메시지나 단지 메시지 데이터일 수 있습니다.
입력 요소가 Pub/Sub 메시지이면 선택적으로 메시지마다 속성이나 순서 키를 설정할 수 있습니다.
다음과 같이 PubSubIO
커넥터의 Java, Python 또는 Go 버전을 사용할 수 있습니다.
자바
단일 주제에 쓰려면 PubsubIO.writeMessages
메서드를 호출합니다. 이 메서드는 PubsubMessage
객체의 입력 컬렉션을 사용합니다. 커넥터는 문자열, 바이너리로 인코딩된 Avro 메시지 또는 바이너리로 인코딩된 protobuf 메시지를 작성하기 위한 편의 메서드도 정의합니다. 이러한 메서드는 입력 컬렉션을 Pub/Sub 메시지로 변환합니다.
입력 데이터를 기반으로 동적 주제 세트에 쓰려면 writeMessagesDynamic
을 호출합니다. 메시지에서 PubsubMessage.withTopic
을 호출하여 각 메시지의 대상 주제를 지정합니다. 예를 들어 입력 데이터의 특정 필드 값에 따라 메시지를 여러 주제로 라우팅할 수 있습니다.
자세한 내용은 PubsubIO
참고 문서를 확인하세요.
Python
pubsub.WriteToPubSub
메서드를 호출합니다.
기본적으로 이 메서드는 메시지 페이로드를 나타내는 bytes
유형의 입력 컬렉션을 사용합니다. with_attributes
매개변수가 True
이면 메서드는 PubsubMessage
객체의 컬렉션을 사용합니다.
자세한 내용은 pubsub
모듈 참고 문서를 확인하세요.
Go
Pub/Sub에 데이터를 쓰려면 pubsubio.Write
메서드를 호출합니다. 이 메서드는 PubSubMessage
객체 또는 메시지 페이로드가 포함된 바이트 슬라이스의 입력 컬렉션을 사용합니다.
자세한 내용은 pubsubio
패키지 참고 문서를 확인하세요.
Pub/Sub 메시지에 대한 자세한 내용은 Pub/Sub 문서의 메시지 형식을 참조하세요.
타임스탬프
Pub/Sub는 메시지마다 타임스탬프를 설정합니다. 이 타임스탬프는 메시지가 Pub/Sub에 게시된 시간을 나타냅니다. 스트리밍 시나리오에서는 메시지 데이터가 생성된 시간인 이벤트 타임스탬프도 고려할 수 있습니다. Apache Beam 요소 타임스탬프를 사용하여 이벤트 시간을 나타낼 수 있습니다. 바인딩되지 않은 PCollection
을 만드는 소스에서 새 요소 각각에 이벤트 시간에 해당하는 타임스탬프를 할당하는 경우가 많습니다.
Java 및 Python의 경우 Pub/Sub I/O 커넥터는 각 요소의 타임스탬프를 Pub/Sub 메시지 속성으로 쓸 수 있습니다. 메시지 소비자는 이 속성을 사용하여 이벤트 타임스탬프를 가져올 수 있습니다.
자바
PubsubIO.Write<T>.withTimestampAttribute
를 호출하고 속성 이름을 지정합니다.
Python
WriteToPubSub
을 호출할 때 timestamp_attribute
매개변수를 지정합니다.
메시지 전달
Dataflow는 한 파이프라인 내에서 메시지를 단 한 번 처리할 수 있습니다. 그러나 Pub/Sub I/O 커넥터는 Pub/Sub을 통해 단 한 번 메시지 전송을 보장할 수 없습니다.
Java 및 Python의 경우 각 요소의 고유 ID를 메시지 속성으로 쓰도록 Pub/Sub I/O 커넥터를 구성할 수 있습니다. 그런 다음 메시지 소비자가 이 속성을 사용하여 메시지를 중복 삭제할 수 있습니다.
자바
PubsubIO.Write<T>.withIdAttribute
를 호출하고 속성 이름을 지정합니다.
Python
WriteToPubSub
을 호출할 때 id_label
매개변수를 지정합니다.
예시
다음 예시에서는 Pub/Sub 메시지의 PCollection
을 만들어 Pub/Sub 주제에 씁니다. 주제는 파이프라인 옵션으로 지정됩니다. 각 메시지에는 페이로드 데이터와 속성 집합이 포함됩니다.
Java
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.
Python
Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.