Read from Apache Kafka to Dataflow
Shows how to create a Dataflow pipeline that reads from Apache Kafka.
Explore further
For detailed documentation that includes this code sample, see the following:

- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)
Code sample
[[["Fácil de entender","easyToUnderstand","thumb-up"],["Meu problema foi resolvido","solvedMyProblem","thumb-up"],["Outro","otherUp","thumb-up"]],[["Difícil de entender","hardToUnderstand","thumb-down"],["Informações incorretas ou exemplo de código","incorrectInformationOrSampleCode","thumb-down"],["Não contém as informações/amostras de que eu preciso","missingTheInformationSamplesINeed","thumb-down"],["Problema na tradução","translationIssue","thumb-down"],["Outro","otherDown","thumb-down"]],[],[[["\u003cp\u003eThis content demonstrates how to create a Dataflow pipeline that reads data from Apache Kafka, utilizing both Java and Python code examples.\u003c/p\u003e\n"],["\u003cp\u003eThe Java code example uses the Managed I/O transform to read messages from Kafka, extract the payload, convert it to a string, and write it to a text file.\u003c/p\u003e\n"],["\u003cp\u003eThe Python example reads messages from a specified Kafka topic, extracts the message payload, subdivides the output into fixed 5-second windows, and then writes the data to a text file.\u003c/p\u003e\n"],["\u003cp\u003eBoth examples highlight the importance of setting up Application Default Credentials (ADC) for authentication to Dataflow, with links provided to documentation for this process.\u003c/p\u003e\n"],["\u003cp\u003eYou can find additional code samples for Dataflow and other Google Cloud products in the Google Cloud sample browser.\u003c/p\u003e\n"]]],[],null,["Shows how to create a Dataflow pipeline that reads from Apache Kafka\n\nExplore further\n\n\nFor detailed documentation that includes this code sample, see the following:\n\n- [Read from Apache Kafka to Dataflow](/dataflow/docs/guides/read-from-kafka)\n\nCode sample \n\nJava\n\n\nTo authenticate to Dataflow, set up Application Default Credentials.\nFor more information, see\n\n[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).\n\n import com.google.common.collect.ImmutableMap;\n import java.io.UnsupportedEncodingException;\n import org.apache.beam.sdk.Pipeline;\n import org.apache.beam.sdk.PipelineResult;\n import org.apache.beam.sdk.io.TextIO;\n import org.apache.beam.sdk.managed.Managed;\n import org.apache.beam.sdk.options.Description;\n import org.apache.beam.sdk.options.PipelineOptionsFactory;\n import org.apache.beam.sdk.options.StreamingOptions;\n import org.apache.beam.sdk.transforms.MapElements;\n import org.apache.beam.sdk.values.TypeDescriptors;\n\n public class KafkaRead {\n\n public static Pipeline createPipeline(Options options) {\n\n // Create configuration parameters for the Managed I/O transform.\n ImmutableMap\u003cString, Object\u003e config = ImmutableMap.\u003cString, Object\u003ebuilder()\n .put(\"bootstrap_servers\", options.getBootstrapServer())\n .put(\"topic\", options.getTopic())\n .put(\"format\", \"RAW\")\n .put(\"max_read_time_seconds\", 15)\n .put(\"auto_offset_reset_config\", \"earliest\")\n .build();\n\n // Build the pipeline.\n var pipeline = Pipeline.create(options);\n pipeline\n // Read messages from Kafka.\n .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()\n // Get the payload of each message and convert to a string.\n .apply(MapElements\n .into(TypeDescriptors.strings())\n .via((row -\u003e {\n var bytes = row.getBytes(\"payload\");\n try {\n return new String(bytes, \"UTF-8\");\n } catch (UnsupportedEncodingException e) {\n throw new RuntimeException(e);\n }\n })))\n // Write the payload to a text file.\n .apply(TextIO\n 
Python

To authenticate to Dataflow, set up Application Default Credentials. For more information, see
[Set up authentication for a local development environment](/docs/authentication/set-up-adc-local-dev-environment).

    import argparse

    import apache_beam as beam

    from apache_beam import window
    from apache_beam.io.textio import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions


    def read_from_kafka() -> None:
        # Parse the pipeline options passed into the application. Example:
        #   --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
        #   --output=$CLOUD_STORAGE_BUCKET --streaming
        # For more information, see
        # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
        class MyOptions(PipelineOptions):
            @staticmethod
            def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
                parser.add_argument("--topic")
                parser.add_argument("--bootstrap_server")
                parser.add_argument("--output")

        options = MyOptions()
        with beam.Pipeline(options=options) as pipeline:
            (
                pipeline
                # Read messages from an Apache Kafka topic.
                | beam.managed.Read(
                    beam.managed.KAFKA,
                    config={
                        "bootstrap_servers": options.bootstrap_server,
                        "topic": options.topic,
                        "data_format": "RAW",
                        "auto_offset_reset_config": "earliest",
                        # The max_read_time_seconds parameter is intended for testing.
                        # Avoid using this parameter in production.
                        "max_read_time_seconds": 5
                    }
                )
                # Subdivide the output into fixed 5-second windows.
                | beam.WindowInto(window.FixedWindows(5))
                | WriteToText(
                    file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
                )
            )

What's next

To search and filter code samples for other Google Cloud products, see the
[Google Cloud sample browser](/docs/samples?product=dataflow).
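The Python sample above defines read_from_kafka but does not show it being invoked. A minimal launcher sketch follows, assuming the sample is saved in a file such as read_kafka.py; the file name and all flag values are hypothetical placeholders, and the Dataflow-specific flags (--runner, --project, --region, --temp_location, --streaming) are standard Beam pipeline options rather than part of the sample.

    # Hypothetical launcher for the Python sample above (an assumption, not part of
    # the published sample). Example invocation with placeholder values:
    #
    #   python read_kafka.py \
    #       --runner=DataflowRunner \
    #       --project=PROJECT_ID \
    #       --region=REGION \
    #       --temp_location=gs://BUCKET_NAME/temp/ \
    #       --streaming \
    #       --topic=KAFKA_TOPIC \
    #       --bootstrap_server=BROKER_HOST:9092 \
    #       --output=gs://BUCKET_NAME/output/
    if __name__ == "__main__":
        read_from_kafka()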