이 문서에서는 Dataflow에서 Apache Kafka로 데이터를 쓰는 방법을 설명합니다.
Apache Beam Kafka I/O 커넥터(KafkaIO
)는 Java에서 기본적으로 사용할 수 있으며 Apache Beam 다국어 파이프라인 프레임워크를 사용하면 Python 및 Go에서도 사용할 수 있습니다.
Java 파이프라인의 경우 관리형 I/O 커넥터를 사용하여 Kafka에서 읽는 것이 좋습니다.
단 한 번 처리
기본적으로 KafkaIO
커넥터는 쓰기에 정확히 한 번 시맨틱스를 제공하지 않습니다. 즉, 데이터가 Kafka 주제에 여러 번 쓰일 수 있습니다. 정확히 한 번 쓰기를 사용 설정하려면 withEOS
메서드를 호출합니다. 정확히 한 번 쓰기는 데이터가 대상 Kafka 주제에 정확히 한 번만 쓰여지도록 보장합니다.
하지만 파이프라인 비용이 증가하고 처리량이 감소합니다.
정확히 한 번 시맨틱스에 대한 엄격한 요구사항이 없고 파이프라인의 로직이 중복 레코드를 처리할 수 있다면 전체 파이프라인에 적어도 한 번 모드를 사용 설정하여 비용을 절감하세요. 자세한 내용은 파이프라인 스트리밍 모드 설정을 참고하세요.
파이프라인 드레이닝
파이프라인을 드레이닝하면 정확히 한 번 시맨틱스가 보장되지 않습니다. 확인된 데이터가 손실되지 않는다는 점만 보장됩니다. 따라서 파이프라인이 드레이닝되는 동안 Kafka에 읽기 오프셋을 커밋하지 않고 일부 데이터가 처리될 수 있습니다. 파이프라인을 수정할 때 Kafka의 적어도 한 번 시맨틱스를 사용하려면 작업을 취소하고 새 작업을 시작하는 대신 파이프라인을 업데이트하세요.
적확히 한 번 시맨틱스에 맞게 Kafka 조정
transaction.max.timeout.ms
및 transactional.id.expiration.ms
를 조정하면 전반적인 내결함성 및 정확히 한 번 전송 전략을 보완할 수 있습니다.
하지만 서비스 중단 특성과 구체적인 구성에 따라 영향이 달라집니다. Kafka 브로커 중단으로 인한 데이터 중복을 방지하려면 transaction.max.timeout.ms
를 Kafka 주제의 보관 기간에 가깝게 설정합니다.
(네트워크 파티션 또는 노드 장애 등으로 인해) Kafka 브로커를 일시적으로 사용할 수 없게 되고 프로듀서에 진행 중인 트랜잭션이 있으면 이러한 트랜잭션의 시간이 초과될 수 있습니다. transaction.max.timeout.ms
값을 늘리면 브로커가 복구된 후 트랜잭션을 완료하는 데 더 많은 시간이 주어지므로 트랜잭션을 다시 시작하고 메시지를 다시 전송할 필요가 없을 수 있습니다. 이 완화 조치는 트랜잭션 다시 시작으로 인한 중복 메시지 발생 가능성을 줄여 정확히 한 번 시맨틱스를 유지하는 데 간접적으로 도움이 됩니다. 반면 만료 시간을 줄이면 비활성 트랜잭션 ID를 더 빠르게 정리하여 잠재적인 리소스 사용량을 줄일 수 있습니다.
네트워킹 구성
기본적으로 Dataflow는 기본 Virtual Private Cloud(VPC) 네트워크 내에서 인스턴스를 실행합니다. Kafka 구성에 따라 Dataflow에 대해 다른 네트워크 및 서브넷을 구성해야 할 수도 있습니다. 자세한 내용은 네트워크 및 서브네트워크 지정을 참조하세요. 네트워크를 구성할 때 Dataflow 작업자 머신이 Kafka 브로커에 도달할 수 있도록 방화벽 규칙을 만듭니다.
VPC 서비스 제어를 사용하는 경우 Kafka 클러스터를 VPC 서비스 제어 경계 내에 배치하거나 승인된 VPN 또는 Cloud Interconnect로 경계를 확장합니다.
Kafka 클러스터가 Google Cloud 외부에 배포된 경우 Dataflow와 Kafka 클러스터 간에 네트워크 연결을 만들어야 합니다. 각각의 장단점에 따라 몇 가지 네트워킹 옵션이 있습니다.
- 다음 중 하나를 사용하여 공유 RFC 1918 주소 공간을 사용하여 연결합니다.
- 다음 중 하나를 사용하여 공개 IP 주소를 통해 외부에 호스팅된 Kafka 클러스터에 연결합니다.
예측 가능한 성능과 안정성을 위해 가장 좋은 옵션은 Dedicated Interconnect이지만, 이 옵션은 제3자가 새로운 회로를 프로비저닝해야 하므로 설정하는 데 오래 걸릴 수 있습니다. 공개 IP 기반 토폴로지에서는 네트워킹 작업이 거의 필요하지 않으므로 빠른 시작이 가능합니다.
다음 두 섹션에서는 이러한 옵션을 자세히 설명합니다.
공유 RFC 1918 주소 공간
Dedicated Interconnect와 IPsec VPN 모두 Virtual Private Cloud(VPC)의 RFC 1918 IP 주소에 직접 액세스하여 Kafka 구성을 단순화할 수 있습니다. VPN 기반 토폴로지를 사용하고 있다면 처리량이 높은 VPN 설정을 고려할 수 있습니다.
기본적으로 Dataflow는 기본 VPC 네트워크에서 인스턴스를 실행합니다. Google Cloud의 서브네트워크를 해당 Kafka 클러스터에 연결하는 경로가 Cloud Router에 명시적으로 정의된 비공개 네트워크 토폴로지에서는 Dataflow 인스턴스를 어디에 둘지를 더 자세히 제어해야 합니다. Dataflow를 사용하여 network
및 subnetwork
실행 매개변수를 구성할 수 있습니다.
Dataflow가 수평 확장을 시도함에 따라 인스턴스를 실행하기에 충분한 IP 주소가 해당 서브네트워크에 있는지 확인합니다. 또한 Dataflow 인스턴스를 실행하기 위한 별도의 네트워크를 만들 때 프로젝트의 모든 가상 머신 간에 TCP 트래픽을 사용 설정하는 방화벽 규칙이 있는지 확인합니다. 기본 네트워크에는 이 방화벽 규칙이 이미 구성되어 있습니다.
공개 IP 주소 공간
이 아키텍처는 전송 계층 보안(TLS)을 사용하여 외부 클라이언트와 Kafka 간의 트래픽을 보호하고 브로커 간 통신에 암호화되지 않은 트래픽을 사용합니다. Kafka 리스너가 내부 및 외부 통신에 모두 사용되는 네트워크 인터페이스에 바인딩되는 경우에는 리스너 구성이 간편합니다. 그러나 많은 경우에 클러스터에 속한 Kafka 브로커의 외부 공지 주소는 Kafka가 사용하는 내부 네트워크 인터페이스와 다릅니다. 이러한 경우 advertised.listeners
속성을 사용할 수 있습니다.
# Configure protocol map listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093
외부 클라이언트는 'SSL' 채널을 통해 포트 9093으로 연결하고, 내부 클라이언트는 일반 텍스트 채널을 통해 포트 9092로 연결합니다. advertised.listeners
에 주소를 지정할 때는 외부 및 내부 트래픽에서 동일한 인스턴스로 확인되는 DNS 이름(이 샘플에서는 kafkabroker-n.mydomain.com
)을 사용합니다. 공개 IP 주소는 내부 트래픽에서 확인에 실패할 수 있으므로 사용하지 못할 가능성이 높습니다.
로깅
KafkaIO
에서 로깅하면 상당히 상세한 로그가 생성될 수 있습니다. 다음과 같이 프로덕션에서 로깅 수준을 낮추는 것이 좋습니다.
sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.
자세한 내용은 파이프라인 작업자 로그 수준 설정을 참고하세요.