Read from Apache Kafka to Dataflow
Shows how to create a Dataflow pipeline that reads data from Apache Kafka.
Explore further
For detailed documentation that includes this code sample, see the following:

- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)
Code sample
Java

To authenticate to Dataflow, set up Application Default Credentials. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

    import com.google.common.collect.ImmutableMap;
    import java.io.UnsupportedEncodingException;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.PipelineResult;
    import org.apache.beam.sdk.io.TextIO;
    import org.apache.beam.sdk.managed.Managed;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.StreamingOptions;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.TypeDescriptors;

    public class KafkaRead {

      public static Pipeline createPipeline(Options options) {

        // Create configuration parameters for the Managed I/O transform.
        ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
            .put("bootstrap_servers", options.getBootstrapServer())
            .put("topic", options.getTopic())
            .put("data_format", "RAW")
            .put("max_read_time_seconds", 15)
            .put("auto_offset_reset_config", "earliest")
            .build();

        // Build the pipeline.
        var pipeline = Pipeline.create(options);
        pipeline
            // Read messages from Kafka.
            .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
            // Get the payload of each message and convert to a string.
            .apply(MapElements
                .into(TypeDescriptors.strings())
                .via((row -> {
                  var bytes = row.getBytes("payload");
                  try {
                    return new String(bytes, "UTF-8");
                  } catch (UnsupportedEncodingException e) {
                    throw new RuntimeException(e);
                  }
                })))
            // Write the payload to a text file.
            .apply(TextIO
                .write()
                .to(options.getOutputPath())
                .withSuffix(".txt")
                .withNumShards(1));
        return pipeline;
      }
    }
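The `createPipeline` method above takes an `Options` object whose definition is not included in this excerpt. The following is a minimal sketch of how the pipeline options interface and a `main` entry point might look inside `KafkaRead`: the getter names match the calls in the sample, but the descriptions, setters, and entry point are illustrative assumptions rather than the official sample.

    // Sketch only: the sample's Options interface is not shown in the excerpt above.
    // The getters match the calls in createPipeline; everything else is illustrative.
    public interface Options extends StreamingOptions {
      @Description("Kafka bootstrap server, for example: localhost:9092")
      String getBootstrapServer();
      void setBootstrapServer(String value);

      @Description("Kafka topic to read from")
      String getTopic();
      void setTopic(String value);

      @Description("Output file path prefix")
      String getOutputPath();
      void setOutputPath(String value);
    }

    public static void main(String[] args) {
      // Parse command-line flags such as --bootstrapServer, --topic, and --outputPath
      // into the Options interface defined above.
      Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
      // Build the pipeline, run it, and block until it finishes.
      PipelineResult result = createPipeline(options).run();
      result.waitUntilFinish();
    }

With members along these lines in place, the otherwise unused imports in the sample (Description, PipelineOptionsFactory, StreamingOptions, and PipelineResult) are accounted for.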
Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see [Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

    import argparse

    import apache_beam as beam

    from apache_beam import window
    from apache_beam.io.textio import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions


    def read_from_kafka() -> None:
        # Parse the pipeline options passed into the application. Example:
        #   --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
        #   --output=$CLOUD_STORAGE_BUCKET --streaming
        # For more information, see
        # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
        class MyOptions(PipelineOptions):
            @staticmethod
            def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
                parser.add_argument("--topic")
                parser.add_argument("--bootstrap_server")
                parser.add_argument("--output")

        options = MyOptions()
        with beam.Pipeline(options=options) as pipeline:
            (
                pipeline
                # Read messages from an Apache Kafka topic.
                | beam.managed.Read(
                    beam.managed.KAFKA,
                    config={
                        "bootstrap_servers": options.bootstrap_server,
                        "topic": options.topic,
                        "data_format": "RAW",
                        "auto_offset_reset_config": "earliest",
                        # The max_read_time_seconds parameter is intended for testing.
                        # Avoid using this parameter in production.
                        "max_read_time_seconds": 5
                    }
                )
                # Subdivide the output into fixed 5-second windows.
                | beam.WindowInto(window.FixedWindows(5))
                | WriteToText(
                    file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
                )
            )


    # Run the pipeline when the script is executed directly.
    if __name__ == "__main__":
        read_from_kafka()

What's next

To search and filter code samples for other Google Cloud products, see the [Google Cloud sample browser](/docs/samples?product=dataflow).