# Read from multiple Kafka topics to Dataflow

Shows how to create a Dataflow pipeline that reads from multiple Kafka topics and performs different business logic based on the topic name.

Explore further
---------------

For detailed documentation that includes this code sample, see the following:

- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)

Code sample
-----------
[[["易于理解","easyToUnderstand","thumb-up"],["解决了我的问题","solvedMyProblem","thumb-up"],["其他","otherUp","thumb-up"]],[["很难理解","hardToUnderstand","thumb-down"],["信息或示例代码不正确","incorrectInformationOrSampleCode","thumb-down"],["没有我需要的信息/示例","missingTheInformationSamplesINeed","thumb-down"],["翻译问题","translationIssue","thumb-down"],["其他","otherDown","thumb-down"]],[],[],[],null,["# Read from multiple Kafka topics to Dataflow\n\nShows how to create a Dataflow pipeline that reads from multiple Kafka topics and performs different business logic based on the topic name.\n\nExplore further\n---------------\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)\n\nCode sample\n-----------\n\n### Java\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import java.util.List;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.io.TextIO;\n import org.apache.beam.sdk.io.kafka.KafkaIO;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.transforms.Filter;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TypeDescriptors;\n import org.apache.kafka.common.serialization.LongDeserializer;\n import org.apache.kafka.common.serialization.StringDeserializer;\n import org.joda.time.Duration;\n import org.joda.time.Instant;\n\n public class KafkaReadTopics {\n\n public static Pipeline createPipeline(Options options) {\n String topic1 = options.getTopic1();\n String topic2 = options.getTopic2();\n\n // Build the pipeline.\n var pipeline = Pipeline.create(options);\n var allTopics = pipeline\n .apply(KafkaIO.\u003cLong, String\u003eread()\n .withTopics(List.of(topic1, topic2))\n .withBootstrapServers(options.getBootstrapServer())\n .withKeyDeserializer(LongDeserializer.class)\n .withValueDeserializer(StringDeserializer.class)\n .withMaxReadTime(Duration.standardSeconds(10))\n .withStartReadTime(Instant.EPOCH)\n );\n\n // Create separate pipeline branches for each topic.\n // The first branch filters on topic1.\n allTopics\n .apply(Filter.by(record -\u003e record.getTopic().equals(topic1)))\n .apply(MapElements\n .into(TypeDescriptors.strings())\n .via(record -\u003e record.getKV().getValue()))\n .apply(TextIO.write()\n .to(topic1)\n .withSuffix(\".txt\")\n .withNumShards(1)\n );\n\n // The second branch filters on topic2.\n allTopics\n .apply(Filter.by(record -\u003e record.getTopic().equals(topic2)))\n .apply(MapElements\n .into(TypeDescriptors.strings())\n .via(record -\u003e record.getKV().getValue()))\n .apply(TextIO.write()\n .to(topic2)\n .withSuffix(\".txt\")\n .withNumShards(1)\n );\n return pipeline;\n }\n }\n\n### Python\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import argparse\n\n import apache_beam as beam\n\n from apache_beam.io.kafka import ReadFromKafka\n from apache_beam.io.textio import WriteToText\n from apache_beam.options.pipeline_options import PipelineOptions\n\n\n def read_from_kafka() -\u003e None:\n # 

### Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

    import argparse

    import apache_beam as beam

    from apache_beam.io.kafka import ReadFromKafka
    from apache_beam.io.textio import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions


    def read_from_kafka() -> None:
        # Parse the pipeline options passed into the application. Example:
        #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
        # For more information, see
        # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
        class MyOptions(PipelineOptions):
            @staticmethod
            def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
                parser.add_argument('--bootstrap_server')
                parser.add_argument('--output')

        options = MyOptions()
        with beam.Pipeline(options=options) as pipeline:
            # Read from two Kafka topics.
            all_topics = pipeline | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=["topic1", "topic2"],
                with_metadata=True,
                max_num_records=10,
                start_read_time=0
            )

            # Filter messages from one topic into one branch of the pipeline.
            (all_topics
                | beam.Filter(lambda message: message.topic == 'topic1')
                | beam.Map(lambda message: message.value.decode('utf-8'))
                | "Write topic1" >> WriteToText(
                    file_path_prefix=options.output + '/topic1/output',
                    file_name_suffix='.txt',
                    num_shards=1))

            # Filter messages from the other topic.
            (all_topics
                | beam.Filter(lambda message: message.topic == 'topic2')
                | beam.Map(lambda message: message.value.decode('utf-8'))
                | "Write topic2" >> WriteToText(
                    file_path_prefix=options.output + '/topic2/output',
                    file_name_suffix='.txt',
                    num_shards=1))

What's next
-----------

To search and filter code samples for other Google Cloud products, see the
[Google Cloud sample browser](/docs/samples?product=dataflow).
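
The Python sample above defines `read_from_kafka()` but never calls it. The following entry point is a minimal sketch of how the function might be invoked; the module name and flag values in the comment are assumptions, not part of the original sample. To run on Dataflow instead of locally, you would also pass standard pipeline options such as `--runner=DataflowRunner`, `--project`, `--region`, and `--temp_location`.

    # Hypothetical entry point, added at the bottom of the same module as
    # read_from_kafka(). Example invocation (names and values are placeholders):
    #   python read_kafka_multi_topic.py \
    #       --bootstrap_server=broker:9092 \
    #       --output=gs://BUCKET_NAME/output \
    #       --streaming
    import logging

    if __name__ == "__main__":
        logging.getLogger().setLevel(logging.INFO)
        read_from_kafka()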