Read from Apache Kafka to Dataflow
This sample shows how to create a Dataflow pipeline that reads from Apache Kafka.
Explore further
For detailed documentation that includes this code sample, see the following:

- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)
Code sample
[[["Easy to understand","easyToUnderstand","thumb-up"],["Solved my problem","solvedMyProblem","thumb-up"],["Other","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Other","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to create a Dataflow pipeline that reads data from Apache Kafka, utilizing both Java and Python code examples.\u003c/p\u003e\n"],["\u003cp\u003eThe Java code example uses the Managed I/O transform to read messages from Kafka, extract the payload, convert it to a string, and write it to a text file.\u003c/p\u003e\n"],["\u003cp\u003eThe Python example reads messages from a specified Kafka topic, extracts the message payload, subdivides the output into fixed 5-second windows, and then writes the data to a text file.\u003c/p\u003e\n"],["\u003cp\u003eBoth examples highlight the importance of setting up Application Default Credentials (ADC) for authentication to Dataflow, with links provided to documentation for this process.\u003c/p\u003e\n"],["\u003cp\u003eYou can find additional code samples for Dataflow and other Google Cloud products in the Google Cloud sample browser.\u003c/p\u003e\n"]]],[],null,["Shows how to create a Dataflow pipeline that reads from Apache Kafka\n\nExplore further\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)\n\nCode sample \n\nJava\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.common.collect.ImmutableMap;\n import java.io.UnsupportedEncodingException;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.io.TextIO;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TypeDescriptors;\n\n public class KafkaRead {\n\n public static Pipeline createPipeline(Options options) {\n\n // Create configuration parameters for the Managed I/O transform.\n ImmutableMap\u003cString, Object\u003e config = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"bootstrap_servers\", options.getBootstrapServer())\n .put(\"topic\", options.getTopic())\n .put(\"format\", \"RAW\")\n .put(\"max_read_time_seconds\", 15)\n .put(\"auto_offset_reset_config\", \"earliest\")\n .build();\n\n // Build the pipeline.\n var pipeline = Pipeline.create(options);\n pipeline\n // Read messages from Kafka.\n .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()\n // Get the payload of each message and convert to a string.\n .apply(MapElements\n .into(TypeDescriptors.strings())\n .via((row -\u003e {\n var bytes = row.getBytes(\"payload\");\n try {\n return new String(bytes, \"UTF-8\");\n } catch (UnsupportedEncodingException e) {\n throw new RuntimeException(e);\n }\n })))\n // Write the payload to a text file.\n .apply(TextIO\n .write()\n .to(options.getOutputPath())\n .withSuffix(\".txt\")\n .withNumShards(1));\n return 
Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

```python
import argparse

import apache_beam as beam
from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #   --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                    "bootstrap_servers": options.bootstrap_server,
                    "topic": options.topic,
                    "data_format": "RAW",
                    "auto_offset_reset_config": "earliest",
                    # The max_read_time_seconds parameter is intended for testing.
                    # Avoid using this parameter in production.
                    "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )
```

What's next

To search and filter code samples for other Google Cloud products, see the
[Google Cloud sample browser](/docs/samples?product=dataflow).
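As a closing note on the Python sample above: it defines `read_from_kafka()` but does not show how the function is invoked. A minimal, assumed entry point is sketched below; the file name `read_kafka.py` and the flag values are placeholders taken from the comment in the sample, not part of the published snippet.

```python
# Hypothetical entry point; not part of the published sample.
# Illustrative launch command (values are placeholders, as in the sample's comment):
#   python read_kafka.py --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER \
#       --output=$CLOUD_STORAGE_BUCKET --streaming
if __name__ == "__main__":
    read_from_kafka()
```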