Leggere da Apache Kafka in Dataflow

Mostra come creare una pipeline Dataflow che legga da Apache Kafka

Per saperne di più

Per la documentazione dettagliata che include questo esempio di codice, consulta quanto segue:

Esempio di codice

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("data_format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | ReadFromKafka(
                consumer_config={"bootstrap.servers": options.bootstrap_server},
                topics=[options.topic],
                with_metadata=False,
                max_num_records=5,
                start_read_time=0,
            )
            # The previous step creates a key-value collection, keyed by message ID.
            # The values are the message payloads.
            | beam.Values()
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Passaggi successivi

Per cercare e filtrare i sample di codice per altri Google Cloud prodotti, consulta il Google Cloud browser di sample.