Apache Kafka에서 Dataflow로 읽기

이 문서에서는 Apache Kafka에서 Dataflow로 데이터를 읽는 방법을 설명합니다.

Apache Beam Kafka I/O 커넥터(KafkaIO)는 Java에서 기본적으로 사용할 수 있으며 Apache Beam 다국어 파이프라인 프레임워크를 사용하면 PythonGo에서도 사용할 수 있습니다.

Java 파이프라인의 경우 관리형 I/O 커넥터를 사용하여 Kafka에서 읽는 것이 좋습니다.

동시 로드

동시 로드는 최대 작업자 수(max_num_workers)와 Kafka 파티션 수라는 두 가지 요소에 의해 제한됩니다. Dataflow는 기본적으로 4 x max_num_workers의 동시 로드 팬아웃으로 설정됩니다. 그러나 팬아웃은 파티션 수로 제한됩니다. 예를 들어 사용 가능한 vCPU가 100개이지만 파이프라인이 10개의 Kafka 파티션에서만 읽는 경우 최대 동시 로드는 10입니다.

동시 로드를 최대화하려면 최소한 4 x max_num_workers개의 Kafka 파티션을 사용하는 것이 좋습니다. 작업에서 Runner v2를 사용하는 경우 동시 로드를 더 높게 설정하는 것이 좋습니다. 시작하려면 파티션 수를 작업자 vCPU 수의 두 배로 설정하는 것이 좋습니다.

파티션 수를 늘릴 수 없는 경우 Kafka 읽기 단계 뒤에 Reshuffle 또는 Redistribute 단계를 삽입해 보세요. 이 단계를 통해 Dataflow는 데이터를 더 효율적으로 재배포하고 동시 로드할 수 있지만 셔플 단계를 실행하는 데 추가 오버헤드가 발생합니다. 자세한 내용은 동시 로드에 영향을 미치는 요인을 참조하세요.

파티션 간의 로드가 상대적으로 균등하고 왜곡되지 않도록 합니다. 로드가 왜곡되면 작업자 사용률이 저하될 수 있습니다. 로드가 가벼운 파티션에서 읽는 작업자는 상대적으로 유휴 상태일 수 있지만, 로드가 큰 파티션에서 읽는 작업자는 뒤처질 수 있습니다. Dataflow는 파티션별 백로그에 대한 측정항목을 제공합니다.

로드가 왜곡된 경우 동적 작업 분산을 사용하면 작업을 분산하는 데 도움이 됩니다. 예를 들어 Dataflow는 볼륨이 작은 여러 파티션에서 읽도록 한 작업자를 할당하고 볼륨이 큰 단일 파티션에서 읽도록 다른 작업자를 할당할 수 있습니다. 그러나 두 작업자가 동일한 파티션에서 읽을 수 없으므로 로드가 많은 파티션으로 인해 파이프라인이 지연될 수 있습니다.

권장사항

이 섹션에는 Kafka에서 Dataflow로 읽어오는 방법에 관한 권장사항이 포함되어 있습니다.

볼륨이 적은 주제

일반적인 시나리오는 적은 양의 여러 주제를 동시에 읽는 것입니다(예: 고객당 하나의 주제). 각 주제에 대해 별도의 Dataflow 작업을 만드는 것은 비용 효율적이지 않습니다. 각 작업에는 전체 작업자가 한 명 이상 필요하기 때문입니다. 대신 다음 옵션을 고려해 보세요.

  • 주제 병합. 주제를 Dataflow로 수집하기 전에 결합합니다. 소수의 대용량 주제를 수집하는 것이 다수의 소규모 주제를 수집하는 것보다 훨씬 효율적입니다. 각 대용량 주제는 작업자를 최대한 활용하는 단일 Dataflow 작업으로 처리할 수 있습니다.

  • 여러 주제 읽기. 주제를 Dataflow로 수집하기 전에 결합할 수 없는 경우 여러 주제에서 읽는 파이프라인을 만드는 것이 좋습니다. 이 접근 방식을 사용하면 Dataflow가 동일한 작업자에게 여러 주제를 할당할 수 있습니다. 이 접근 방식을 구현하는 방법에는 두 가지가 있습니다.

    • 단일 읽기 단계. KafkaIO 커넥터의 단일 인스턴스를 만들고 여러 주제를 읽도록 구성합니다. 그런 다음 주제 이름으로 필터링하여 주제별로 다른 로직을 적용합니다. 코드 예시는 여러 주제에서 읽기를 참조하세요. 모든 주제가 동일한 클러스터에 배치된 경우 이 옵션을 고려하세요. 단점은 단일 싱크 또는 변환에 문제가 있으면 모든 주제의 백로그가 누적될 수 있다는 점입니다.

      고급 사용 사례의 경우 읽을 주제를 지정하는 KafkaSourceDescriptor 객체 집합을 전달합니다. KafkaSourceDescriptor를 사용하면 필요한 경우 나중에 주제 목록을 업데이트할 수 있습니다. 이 기능을 사용하려면 Runner v2를 사용하는 Java가 필요합니다.

    • 여러 읽기 단계. 서로 다른 클러스터에 있는 주제에서 읽으려면 파이프라인에 여러 개의 KafkaIO 인스턴스를 포함할 수 있습니다. 작업이 실행되는 동안 변환 매핑을 사용하여 개별 소스를 업데이트할 수 있습니다. 새 주제 또는 클러스터를 설정하는 기능은 Runner v2를 사용할 때만 지원됩니다. 이 접근 방식에서는 파이프라인 수준 측정항목을 사용하는 대신 각 개별 읽기 변환을 모니터링해야 하므로 관측 가능성 문제가 발생할 수 있습니다.

Kafka에 다시 커밋

기본적으로 KafkaIO 커넥터는 Kafka 오프셋을 사용하여 진행 상황을 추적하지 않으며 Kafka에 다시 커밋하지 않습니다. commitOffsetsInFinalize를 호출하면 커넥터는 Dataflow에서 레코드가 커밋된 후 Kafka에 다시 커밋하기 위해 최선을 다합니다. Dataflow에서 커밋된 레코드는 완전히 처리되지 않을 수 있으므로 파이프라인을 취소하면 레코드가 완전히 처리되지 않고 오프셋이 커밋될 수 있습니다.

enable.auto.commit=True를 설정하면 Dataflow에서 처리하지 않고 Kafka에서 읽자마자 오프셋이 커밋되므로 이 옵션은 사용하지 않는 것이 좋습니다. enable.auto.commit=FalsecommitOffsetsInFinalize=True를 모두 설정하는 것이 좋습니다. enable.auto.commitTrue로 설정한 경우 처리 중에 파이프라인이 중단되면 데이터가 손실될 수 있습니다. Kafka에 이미 커밋된 레코드가 삭제될 수 있습니다.

워터마크

기본적으로 KafkaIO 커넥터는 현재 처리 시간을 사용하여 출력 워터마크 및 이벤트 시간을 할당합니다. 이 동작을 변경하려면 withTimestampPolicyFactory를 호출하고 TimestampPolicy를 할당합니다. Beam은 Kafka의 로그 추가 시간 또는 메시지 생성 시간을 기반으로 워터마크를 계산하는 TimestampPolicy 구현을 제공합니다.

러너 고려사항

KafkaIO 커넥터에는 Kafka 읽기에 관한 두 가지 기본 구현, 즉 이전 ReadFromKafkaViaUnbounded와 최신 ReadFromKafkaViaSDF가 있습니다. Dataflow는 SDK 언어 및 작업 요구사항에 따라 작업에 가장 적합한 구현을 자동으로 선택합니다. 해당 구현에서만 사용할 수 있는 특정 기능이 필요하지 않은 한 러너 또는 Kafka 구현을 명시적으로 요청하지 마세요. 러너 선택에 관한 자세한 내용은 Dataflow Runner v2 사용을 참조하세요.

파이프라인에서 withTopic 또는 withTopics를 사용하는 경우 이전 구현은 파이프라인 생성 시 사용 가능한 파티션에 대해 Kafka에 쿼리합니다. 파이프라인을 만드는 머신에 Kafka에 연결할 권한이 있어야 합니다. 권한 오류가 발생하면 로컬에서 Kafka에 연결할 권한이 있는지 확인합니다. 파이프라인 구성 시 Kafka에 연결되지 않는 withTopicPartitions를 사용하면 이 문제를 방지할 수 있습니다.

프로덕션에 배포

프로덕션에 솔루션을 배포할 때는 Flex 템플릿을 사용하는 것이 좋습니다. Flex 템플릿을 사용하면 일관된 환경에서 파이프라인이 실행되므로 로컬 구성 문제를 완화하는 데 도움이 됩니다.

KafkaIO에서 로깅하면 상당히 장황해질 수 있습니다. 다음과 같이 프로덕션에서 로깅 수준을 낮추는 것이 좋습니다.

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

자세한 내용은 파이프라인 작업자 로그 수준 설정을 참조하세요.

네트워킹 구성

기본적으로 Dataflow는 기본 Virtual Private Cloud(VPC) 네트워크 내에서 인스턴스를 실행합니다. Kafka 구성에 따라 Dataflow에 대해 다른 네트워크 및 서브넷을 구성해야 할 수도 있습니다. 자세한 내용은 네트워크 및 서브네트워크 지정을 참조하세요. 네트워크를 구성할 때 Dataflow 작업자 머신이 Kafka 브로커에 도달할 수 있도록 방화벽 규칙을 만듭니다.

VPC 서비스 제어를 사용하는 경우 Kafka 클러스터를 VPC 서비스 제어 경계 내에 배치하거나 승인된 VPN 또는 Cloud Interconnect로 경계를 확장합니다.

Kafka 클러스터가 Google Cloud 외부에 배포된 경우 Dataflow와 Kafka 클러스터 간에 네트워크 연결을 만들어야 합니다. 각각의 장단점에 따라 몇 가지 네트워킹 옵션이 있습니다.

예측 가능한 성능과 안정성을 위해 가장 좋은 옵션은 Dedicated Interconnect이지만, 이 옵션은 제3자가 새로운 회로를 프로비저닝해야 하므로 설정하는 데 오래 걸릴 수 있습니다. 공개 IP 기반 토폴로지에서는 네트워킹 작업이 거의 필요하지 않으므로 빠른 시작이 가능합니다.

다음 두 섹션에서는 이러한 옵션을 자세히 설명합니다.

공유 RFC 1918 주소 공간

Dedicated Interconnect와 IPsec VPN 모두 Virtual Private Cloud(VPC)의 RFC 1918 IP 주소에 직접 액세스하여 Kafka 구성을 단순화할 수 있습니다. VPN 기반 토폴로지를 사용하고 있다면 처리량이 높은 VPN 설정을 고려할 수 있습니다.

기본적으로 Dataflow는 기본 VPC 네트워크에서 인스턴스를 실행합니다. Google Cloud의 서브네트워크를 해당 Kafka 클러스터에 연결하는 경로가 Cloud Router에 명시적으로 정의된 비공개 네트워크 토폴로지에서는 Dataflow 인스턴스를 어디에 둘지를 더 자세히 제어해야 합니다. Dataflow를 사용하여 networksubnetwork 실행 매개변수를 구성할 수 있습니다.

Dataflow가 수평 확장을 시도함에 따라 인스턴스를 실행하기에 충분한 IP 주소가 해당 서브네트워크에 있는지 확인합니다. 또한 Dataflow 인스턴스를 실행하기 위한 별도의 네트워크를 만들 때 프로젝트의 모든 가상 머신 간에 TCP 트래픽을 사용 설정하는 방화벽 규칙이 있는지 확인합니다. 기본 네트워크에는 이 방화벽 규칙이 이미 구성되어 있습니다.

공개 IP 주소 공간

이 아키텍처는 전송 계층 보안(TLS)을 사용하여 외부 클라이언트와 Kafka 간의 트래픽을 보호하고 브로커 간 통신에 암호화되지 않은 트래픽을 사용합니다. Kafka 리스너가 내부 및 외부 통신에 모두 사용되는 네트워크 인터페이스에 바인딩되는 경우에는 리스너 구성이 간편합니다. 그러나 많은 경우에 클러스터에 속한 Kafka 브로커의 외부 공지 주소는 Kafka가 사용하는 내부 네트워크 인터페이스와 다릅니다. 이러한 경우 advertised.listeners 속성을 사용할 수 있습니다.

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

외부 클라이언트는 'SSL' 채널을 통해 포트 9093으로 연결하고, 내부 클라이언트는 일반 텍스트 채널을 통해 포트 9092로 연결합니다. advertised.listeners에 주소를 지정할 때는 외부 및 내부 트래픽에서 동일한 인스턴스로 확인되는 DNS 이름(이 샘플에서는 kafkabroker-n.mydomain.com)을 사용합니다. 공개 IP 주소는 내부 트래픽에서 확인에 실패할 수 있으므로 사용하지 못할 가능성이 높습니다.

Kafka 조정

Kafka 클러스터 및 Kafka 클라이언트 설정은 성능에 큰 영향을 미칠 수 있습니다. 특히 다음 설정이 너무 낮을 수 있습니다. 이 섹션에서는 몇 가지 시작점을 제안하지만 특정 워크로드에 맞게 이러한 값을 실험해 보시기 바랍니다.

  • unboundedReaderMaxElements. 기본값은 10,000입니다. 100,000과 같이 더 높은 값을 사용하면 번들의 크기가 커질 수 있으며, 이는 파이프라인에 집계가 포함된 경우 성능을 크게 개선할 수 있습니다. 하지만 값이 클수록 지연 시간도 늘어날 수 있습니다. 값을 설정하려면 setUnboundedReaderMaxElements를 사용합니다. 이 설정은 Runner v2에는 적용되지 않습니다.

  • unboundedReaderMaxReadTimeMs. 기본값은 10,000msec입니다. 20,000msec와 같이 더 큰 값을 사용하면 번들 크기가 늘어날 수 있고, 5,000msec와 같이 더 작은 값을 사용하면 지연 시간 또는 백로그가 줄어들 수 있습니다. 값을 설정하려면 setUnboundedReaderMaxReadTimeMs를 사용합니다. 이 설정은 Runner v2에는 적용되지 않습니다.

  • max.poll.records. 기본값은 500입니다. 값이 클수록 특히 Runner v2를 사용할 때 더 많은 수신 레코드를 함께 검색하여 성능이 향상될 수 있습니다. 값을 설정하려면 withConsumerConfigUpdates를 호출합니다.

  • fetch.max.bytes. 기본값은 1MB입니다. 값이 클수록 특히 Runner v2를 사용할 때 요청 수가 줄어들어 처리량이 향상될 수 있습니다. 하지만 다운스트림 처리가 주요 병목 현상일 가능성이 더 높지만 이 값을 너무 높게 설정하면 지연 시간이 늘어날 수 있습니다. 권장 시작 값은 100MB입니다. 값을 설정하려면 withConsumerConfigUpdates를 호출합니다.

  • max.partition.fetch.bytes. 기본값은 1MB입니다. 이 매개변수는 서버가 반환하는 파티션당 최대 데이터 양을 설정합니다. 값을 늘리면 특히 Runner v2를 사용할 때 요청 수를 줄여 처리량을 개선할 수 있습니다. 하지만 다운스트림 처리가 주요 병목 현상일 가능성이 더 높지만 이 값을 너무 높게 설정하면 지연 시간이 늘어날 수 있습니다. 권장 시작 값은 100MB입니다. 값을 설정하려면 withConsumerConfigUpdates를 호출합니다.

  • consumerPollingTimeout. 기본값은 2초입니다. 레코드를 읽기 전에 소비자 클라이언트가 시간 초과되면 더 높은 값을 설정해 보세요. 이 설정은 교차 리전 읽기 또는 느린 네트워크에서 읽기를 실행할 때 가장 관련성이 높습니다. 값을 설정하려면 withConsumerPollingTimeout을 호출합니다.

receive.buffer.bytes가 메시지 크기를 처리하기에 충분히 큰지 확인합니다. 값이 너무 작으면 로그에 소비자가 지속적으로 다시 만들어지고 특정 오프셋을 찾고 있다고 표시될 수 있습니다.

예시

다음 코드 예시는 Kafka에서 읽는 Dataflow 파이프라인을 만드는 방법을 보여줍니다.

단일 주제에서 읽기

이 예시에서는 Kafka 주제에서 읽고 메시지 페이로드를 텍스트 파일에 씁니다.

Java

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

여러 주제에서 읽기

이 예시에서는 여러 Kafka 주제에서 읽고 각 주제에 별도의 파이프라인 로직을 적용합니다.

고급 사용 사례의 경우 읽을 주제 목록을 업데이트할 수 있도록 KafkaSourceDescriptor 객체 집합을 동적으로 전달합니다. 이 접근 방식에는 Runner v2를 사용하는 Java가 필요합니다.

Python

Dataflow에 인증하려면 애플리케이션 기본 사용자 인증 정보를 설정합니다. 자세한 내용은 로컬 개발 환경의 인증 설정을 참조하세요.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))