从 Apache Kafka 读取到 Dataflow

本文档介绍如何从 Apache Kafka 读取数据到 Dataflow。

Apache Beam Kafka I/O 连接器 (KafkaIO) 原生适用于 Java,并且还适用于使用 Apache Beam 多语言流水线框架PythonGo

对于 Java 流水线,请考虑使用受管 I/O 连接器从 Kafka 读取数据。

最大并行数量

并行处理受两个因素的限制:工作器数量上限 (max_num_workers) 和 Kafka 分区数量。Dataflow 默认的并行度扇出为 4 x max_num_workers。不过,扇出受分区数量的限制。例如,如果有 100 个可用的 vCPU,但流水线仅从 10 个 Kafka 分区读取数据,则并行度上限为 10。

为尽可能提高并行度,建议至少有 4 个 max_num_workers Kafka 分区。如果您的作业使用 Runner v2,请考虑将并行处理数设置得更高。一个不错的起点是,让分区数等于工作器 vCPU 数的两倍。

如果您无法增加分区数,请考虑在 Kafka 读取步骤后插入 ReshuffleRedistribute 步骤。此步骤可让 Dataflow 更高效地重新分发和并行化数据,但会增加一些额外的开销来执行 shuffle 步骤。如需了解详情,请参阅影响并行性的因素

尽量确保各个分区之间的负载相对均衡,不存在偏差。如果负载不均衡,可能会导致工作器利用率不佳。从负载较轻的分区读取数据的工作器可能会相对空闲,而从负载较重分区读取数据的工作器可能会落后。Dataflow 会提供每个分区的积压指标。

如果负载不均衡,动态工作平衡可以帮助分发工作。例如,Dataflow 可能会分配一个工作器来从多个小数据量分区读取数据,并分配另一个工作器来从一个大数据量分区读取数据。但是,两个工作器无法从同一分区读取数据,因此负载较重的分区仍可能会导致流水线滞后。

最佳做法

本部分包含有关从 Kafka 读取到 Dataflow 的建议。

低需求主题

一种常见场景是同时从许多低流量主题读取数据,例如,每个客户一个主题。为每个主题创建单独的 Dataflow 作业成本效益不高,因为每个作业至少需要一个完整的工作器。请改为考虑以下方案:

  • 合并主题。在将主题提取到 Dataflow 之前合并主题。与提取较少的热门主题相比,提取较多的冷门主题效率要低得多。每个高容量主题都可以由一个充分利用其工作器的 Dataflow 作业来处理。

  • 读取多个主题。如果您无法在将主题提取到 Dataflow 之前合并这些主题,不妨考虑创建一个从多个主题读取数据的流水线。此方法可让 Dataflow 将多个主题分配给同一个工作器。您可以通过以下两种方式实现此方法:

    • 单次读取步骤。创建 KafkaIO 连接器的单个实例,并将其配置为读取多个主题。然后按主题名称进行过滤,以便针对每个主题应用不同的逻辑。如需查看示例代码,请参阅从多个主题读取数据。如果您的所有主题都位于同一集群中,不妨考虑使用此选项。一个缺点是,单个接收器或转换出现问题可能会导致所有主题积累积压。

      对于更高级的用例,请传入一组 KafkaSourceDescriptor 对象,以指定要从中读取的主题。使用 KafkaSourceDescriptor 可让您在需要时稍后更新主题列表。此功能需要使用 Java with Runner v2。

    • 多步读取。如需从位于不同集群中的主题读取数据,您的流水线可以包含多个 KafkaIO 实例。在作业运行期间,您可以使用转换映射更新各个来源。仅在使用 Runner v2 时支持设置新主题或集群。可观测性是这种方法面临的潜在挑战,因为您需要监控每个读取转换,而不是依赖于流水线级指标。

提交回 Kafka

默认情况下,KafkaIO 连接器不会使用 Kafka 偏移量来跟踪进度,也不会提交回 Kafka。如果您调用 commitOffsetsInFinalize,则连接器会在 Dataflow 中提交记录后尽力提交回 Kafka。Dataflow 中已提交的记录可能未完全处理,因此,如果您取消流水线,则可能会提交偏移量,而记录未完全处理。

由于设置 enable.auto.commit=True 会在从 Kafka 读取偏移量后立即提交偏移量,而无需 Dataflow 进行任何处理,因此我们不建议使用此选项。建议同时设置 enable.auto.commit=FalsecommitOffsetsInFinalize=True。如果您将 enable.auto.commit 设置为 True,而流水线又在处理期间中断,则数据可能会丢失。已在 Kafka 中提交的记录可能会被丢弃。

水印

默认情况下,KafkaIO 连接器会使用当前的处理时间来分配输出水印和事件时间。如需更改此行为,请调用 withTimestampPolicyFactory 并分配 TimestampPolicy。Beam 提供了 TimestampPolicy 的实现,可根据 Kafka 的日志附加时间或消息创建时间计算水印。

运行程序考虑事项

KafkaIO 连接器针对 Kafka 读取有两个底层实现:较旧的 ReadFromKafkaViaUnbounded 和较新的 ReadFromKafkaViaSDF。Dataflow 会根据您的 SDK 语言和作业要求,自动为您的作业选择最佳实现。除非您需要仅在该实现中提供的特定功能,否则请避免明确请求运行程序或 Kafka 实现。如需详细了解如何选择运行程序,请参阅使用 Dataflow Runner v2

如果您的流水线使用 withTopicwithTopics,则较旧的实现会在流水线构建时查询 Kafka 以获取可用分区。创建流水线的计算机必须具有连接到 Kafka 的权限。如果您收到权限错误,请验证您是否拥有在本地连接到 Kafka 的权限。您可以使用 withTopicPartitions 来避免此问题,因为它不会在流水线构建时连接到 Kafka。

部署到生产环境

在生产环境中部署解决方案时,建议使用 Flex 模板。通过使用 Flex 模板,流水线会从一致的环境中启动,这有助于缓解本地配置问题。

KafkaIO 记录日志可能会非常冗长。请考虑在生产环境中降低日志记录级别,如下所示:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

如需了解详情,请参阅设置流水线工作器日志级别

配置网络

默认情况下,Dataflow 会在您的默认 Virtual Private Cloud (VPC) 网络中启动实例。根据您的 Kafka 配置,您可能需要为 Dataflow 配置不同的网络和子网。如需了解详情,请参阅指定网络和子网。 配置网络时,请创建防火墙规则,允许 Dataflow 工作器机器访问 Kafka 代理。

如果您在使用 VPC Service Controls,请将 Kafka 集群放在 VPC Service Controls 边界内,或者将边界扩展到授权 VPN 或 Cloud Interconnect 范围

如果您的 Kafka 集群部署在 Google Cloud 之外,您必须在 Dataflow 和 Kafka 集群之间创建网络连接。有几种网络选项可供您选择,它们各有利弊:

专用互连是实现可预测性能和可靠性的最佳选择,但由于第三方必须预配新线路,因此设置专用互连可能需要更长时间。使用基于公共 IP 的拓扑,您可以快速开始,因为只需完成很少的网络操作。

接下来的两个部分更详细地介绍了这些选项。

共享的 RFC 1918 地址空间

使用专用互连和 IPsec VPN,均可以直接访问虚拟私有云 (VPC) 中的 RFC 1918 IP 地址,从而简化 Kafka 配置。如果您使用基于 VPN 的拓扑,请考虑设置高吞吐量 VPN

默认情况下,Dataflow 会在您的默认 VPC 网络中启动实例。在专用网络拓扑中(其中的路由已在 Cloud Router 中明确定义并将 Google Cloud 中的子网连接到该 Kafka 集群),您需要更好地控制 Dataflow 实例的放置位置。您可以使用 Dataflow 配置 networksubnetwork 执行参数

确保相应子网具有足够的 IP 地址,以便 Dataflow 在尝试横向扩容时可在这些地址上启动实例。此外,在创建用于启动 Dataflow 实例的单独网络时,请确保您具有可在项目中的所有虚拟机之间启用 TCP 流量的防火墙规则。默认网络已配置此防火墙规则。

公共 IP 地址空间

此架构使用传输层安全协议 (TLS) 来保护外部客户端与 Kafka 之间的流量,并使用未加密的流量进行代理间通信。当 Kafka 侦听器绑定到用于内部和外部通信的网络接口时,配置侦听器就非常简单了。但在许多情况下,集群中 Kafka broker 的外部通告地址将与 Kafka 使用的内部网络接口不同。在这种情况下,您可以使用 advertised.listeners 属性:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

外部客户端使用端口 9093 通过“SSL”通道连接,内部客户端使用端口 9092 通过明文通道连接。在 advertised.listeners 下指定地址时,请使用解析为外部和内部流量的相同实例的 DNS 名称(在此示例中为 kafkabroker-n.mydomain.com)。使用公共 IP 地址可能不起作用,因为该地址可能无法解析内部流量。

调整 Kafka

Kafka 集群和 Kafka 客户端设置可能会对性能产生很大影响。具体而言,以下设置可能过低。本部分提供了一些建议的起点,但您应针对自己的特定工作负载尝试使用这些值。

  • unboundedReaderMaxElements。默认值为 10,000。较高的值(例如 100,000)可以增加捆绑包的大小,如果您的流水线包含汇总,这可以显著提高性能。不过,值越高,延迟时间也可能会越长。如需设置值,请使用 setUnboundedReaderMaxElements。此设置不适用于 Runner v2。

  • unboundedReaderMaxReadTimeMs。默认为 10,000 毫秒。较高的值(例如 20,000 毫秒)可以增加捆绑包大小,而较低的值(例如 5000 毫秒)可以缩短延迟时间或减少积压。如需设置值,请使用 setUnboundedReaderMaxReadTimeMs。此设置不适用于 Runner v2。

  • max.poll.records。默认值为 500。值越高,系统就越有可能一次检索更多传入记录,从而提高性能,尤其是在使用 Runner v2 时。如需设置值,请调用 withConsumerConfigUpdates

  • fetch.max.bytes。默认值为 1MB。值越高,通过减少请求数来提高吞吐量的可能性就越大,尤其是在使用 Runner v2 时。不过,如果将其设置得过高,可能会增加延迟时间,尽管下游处理更有可能成为主要瓶颈。建议的起始值为 100MB。 如需设置值,请调用 withConsumerConfigUpdates

  • max.partition.fetch.bytes。默认值为 1MB。此参数用于设置服务器返回的每个分区的最大数据量。增加该值可以通过减少请求数量来提高吞吐量,尤其是在使用 Runner v2 时。不过,如果将其设置得过高,可能会增加延迟时间,尽管下游处理更有可能成为主要瓶颈。建议的起始值为 100MB。如需设置值,请调用 withConsumerConfigUpdates

  • consumerPollingTimeout。默认值为 2 秒。如果使用方客户端在读取任何记录之前超时,请尝试设置更高的值。在执行跨区域读取或在网络速度较慢的情况下进行读取时,此设置最为相关。如需设置值,请调用 withConsumerPollingTimeout

确保 receive.buffer.bytes 足够大,可以处理消息的大小。如果该值过小,日志可能会显示使用方不断重新创建并寻找特定偏移量。

示例

以下代码示例展示了如何创建从 Kafka 读取数据的 Dataflow 流水线。

从单个主题读取数据

此示例从 Kafka 主题中读取消息载荷并将其写入文本文件。

Java

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

从多个主题读取数据

此示例会从多个 Kafka 主题读取数据,并针对每个主题应用单独的流水线逻辑。

对于更高级的用例,请动态传入一组 KafkaSourceDescriptor 对象,以便您更新要从中读取的主题列表。此方法需要使用带有 Runner v2 的 Java。

Python

如需向 Dataflow 进行身份验证,请设置应用默认凭据。 如需了解详情,请参阅为本地开发环境设置身份验证

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))