Leggere da Apache Kafka in Dataflow

Questo documento descrive come leggere i dati da Apache Kafka a Dataflow e include suggerimenti e best practice per il rendimento.

Per la maggior parte dei casi d'uso, valuta la possibilità di utilizzare il connettore I/O gestito per leggere da Kafka.

Se hai bisogno di una ottimizzazione delle prestazioni più avanzata, valuta la possibilità di utilizzare il connettore KafkaIO. Il connettore KafkaIO è disponibile per Java o utilizzando il framework per pipeline multilingue per Python e Go.

Parallelismo

Le sezioni seguenti descrivono come configurare il parallelismo durante la lettura da Kafka.

Panoramica

Il parallelismo è limitato da due fattori: il numero massimo di worker (max_num_workers) e il numero di partizioni Kafka. Dataflow utilizza per impostazione predefinita un fanout di parallelismo di 4 x max_num_workers. Tuttavia, il fanout è limitato dal numero di partizioni. Ad esempio, se sono disponibili 100 vCPU, ma la pipeline legge solo da 10 partizioni Kafka, il parallelismo massimo è 10.

Per massimizzare il parallelismo, ti consigliamo di avere almeno 4 x max_num_workers partizioni Kafka. Se il job utilizza Runner v2, valuta la possibilità di impostare un parallelismo ancora più elevato. Un buon punto di partenza è avere partizioni pari al doppio del numero di vCPU worker.

Ridistribuisci

Se non riesci ad aumentare il numero di partizioni, puoi aumentare il parallelismo chiamando KafkaIO.Read.withRedistribute. Questo metodo aggiunge una trasformazione Redistribute alla pipeline, che fornisce un suggerimento a Dataflow per ridistribuire e parallelizzare i dati in modo più efficiente. Ti consigliamo vivamente di specificare il numero ottimale di shard chiamando KafkaIO.Read.withRedistributeNumKeys. L'utilizzo di KafkaIO.Read.withRedistribute da solo può generare numerose chiavi, causando problemi di prestazioni. Per saperne di più, consulta Identificare le fasi con parallelismo elevato. La ridistribuzione dei dati aggiunge un sovraccarico aggiuntivo per eseguire il passaggio di rimescolamento. Per ulteriori informazioni, vedi Evitare la fusione.

Per ridurre al minimo il costo del rimescolamento della ridistribuzione, chiama KafkaIO.Read.withOffsetDeduplication. Questa modalità riduce al minimo la quantità di dati che devono essere resi persistenti nell'ambito dello shuffle, fornendo comunque l'elaborazione esattamente una volta.

Se l'elaborazione "exactly-once" non è necessaria, puoi consentire i duplicati chiamando KafkaIO.Read.withAllowDuplicates.

La tabella seguente riassume le opzioni di ridistribuzione:

Opzione Modalità di elaborazione Apache Beam Configurazione
Ridistribuisci input Exactly-once v2.60+ KafkaIO.Read.withRedistribute()
Consenti duplicati Almeno una volta v2.60+ KafkaIO.Read.withRedistribute().withAllowDuplicates()
Deduplicazione offset Exactly-once v2.65+ KafkaIO.Read.withRedistribute().withOffsetDeduplication()

Distorsione del carico

Cerca di assicurarti che il carico tra le partizioni sia relativamente uniforme e non distorto. Se il carico è sbilanciato, può comportare un utilizzo inefficiente dei lavoratori. I worker che leggono dalle partizioni con un carico più leggero potrebbero essere relativamente inattivi, mentre i worker che leggono dalle partizioni con un carico elevato potrebbero rimanere indietro. Dataflow fornisce metriche per il backlog per partizione.

Se il carico è sbilanciato, il bilanciamento dinamico del lavoro può contribuire a distribuire il lavoro. Ad esempio, Dataflow potrebbe allocare un worker per leggere da più partizioni a basso volume e un altro worker per leggere da una singola partizione ad alto volume. Tuttavia, due worker non possono leggere dalla stessa partizione, quindi una partizione molto caricata può comunque causare il ritardo della pipeline.

Best practice

Questa sezione contiene consigli per la lettura da Kafka in Dataflow.

Argomenti con volume basso

Uno scenario comune è leggere da molti argomenti a basso volume contemporaneamente, ad esempio un argomento per cliente. La creazione di job Dataflow separati per ogni argomento è inefficiente in termini di costi, perché ogni job richiede almeno un worker completo. Valuta invece le seguenti opzioni:

  • Unire gli argomenti. Combina gli argomenti prima che vengano inseriti in Dataflow. L'importazione di pochi argomenti con un volume elevato è molto più efficiente rispetto all'importazione di molti argomenti con un volume basso. Ogni argomento ad alto volume può essere gestito da un singolo job Dataflow che utilizza completamente i suoi worker.

  • Leggere più argomenti. Se non riesci a combinare gli argomenti prima di importarli in Dataflow, valuta la possibilità di creare una pipeline che legga da più argomenti. Questo approccio consente a Dataflow di assegnare più argomenti allo stesso worker. Esistono due modi per implementare questo approccio:

    • Singolo passaggio di lettura. Crea una singola istanza del connettore KafkaIO e configurala per leggere più argomenti. Quindi, filtra per nome dell'argomento per applicare una logica diversa per ogni argomento. Per un esempio di codice, vedi Lettura da più argomenti. Prendi in considerazione questa opzione se tutti gli argomenti si trovano nello stesso cluster. Uno svantaggio è che i problemi con un singolo sink o una singola trasformazione potrebbero causare l'accumulo di backlog in tutti gli argomenti.

      Per casi d'uso più avanzati, passa un insieme di oggetti KafkaSourceDescriptor che specificano gli argomenti da leggere. L'utilizzo di KafkaSourceDescriptor ti consente di aggiornare l'elenco degli argomenti in un secondo momento, se necessario. Questa funzionalità richiede Java con Runner v2.

    • Più passaggi di lettura. Per leggere argomenti che si trovano in cluster diversi, la pipeline può includere diverse istanze di KafkaIO. Mentre il job è in esecuzione, puoi aggiornare le singole origini utilizzando le mappature di trasformazione. L'impostazione di un nuovo argomento o cluster è supportata solo quando utilizzi Runner v2. L'osservabilità è una potenziale sfida con questo approccio, perché devi monitorare ogni singola trasformazione di lettura anziché fare affidamento sulle metriche a livello di pipeline.

Eseguire il commit di nuovo in Kafka

Per impostazione predefinita, il connettore KafkaIO non utilizza gli offset Kafka per monitorare l'avanzamento e non esegue il commit di nuovo in Kafka. Se chiami commitOffsetsInFinalize, il connettore fa del suo meglio per eseguire il commit di nuovo in Kafka dopo che i record sono stati sottoposti a commit in Dataflow. I record di cui è stato eseguito il commit in Dataflow potrebbero non essere elaborati completamente, quindi se annulli la pipeline, potrebbe essere eseguito il commit di un offset senza che i record vengano mai elaborati completamente.

Poiché l'impostazione enable.auto.commit=True esegue il commit degli offset non appena vengono letti da Kafka senza alcuna elaborazione da parte di Dataflow, l'utilizzo di questa opzione non è consigliato. Il consiglio è di impostare sia enable.auto.commit=False che commitOffsetsInFinalize=True. Se imposti enable.auto.commit su True, i dati possono essere persi se la pipeline viene interrotta durante l'elaborazione. I record già inviati su Kafka potrebbero essere eliminati.

Filigrane

Per impostazione predefinita, il connettore KafkaIO utilizza l'ora di elaborazione corrente per assegnare la filigrana di output e l'ora dell'evento. Per modificare questo comportamento, chiama withTimestampPolicyFactory e assegna un TimestampPolicy. Beam fornisce implementazioni di TimestampPolicy che calcolano il watermark in base all'ora di accodamento del log di Kafka o all'ora di creazione del messaggio.

Considerazioni per i runner

Il connettore KafkaIO ha due implementazioni sottostanti per le letture Kafka: la ReadFromKafkaViaUnbounded precedente e la ReadFromKafkaViaSDF più recente. Dataflow sceglie automaticamente l'implementazione migliore per il tuo job in base al linguaggio SDK e ai requisiti del job. Evita di richiedere esplicitamente un runner o un'implementazione di Kafka, a meno che tu non abbia bisogno di funzionalità specifiche disponibili solo in quell'implementazione. Per ulteriori informazioni sulla scelta di un runner, consulta Utilizzare Dataflow Runner v2.

Se la tua pipeline utilizza withTopic o withTopics, l'implementazione precedente esegue query su Kafka al momento della creazione della pipeline per le partizioni disponibili. La macchina che crea la pipeline deve disporre dell'autorizzazione per connettersi a Kafka. Se ricevi un errore di autorizzazione, verifica di disporre delle autorizzazioni per connetterti a Kafka localmente. Puoi evitare questo problema utilizzando withTopicPartitions, che non si connette a Kafka durante la creazione della pipeline.

Distribuzione in produzione

Quando esegui il deployment della soluzione in produzione, ti consigliamo di utilizzare i modelli flessibili. Utilizzando un modello flessibile, la pipeline viene avviata da un ambiente coerente, il che può contribuire a mitigare i problemi di configurazione locali.

La registrazione da KafkaIO può essere piuttosto dettagliata. Valuta la possibilità di ridurre il livello di logging in produzione nel seguente modo:

sdkHarnessLogLevelOverrides='{"org.apache.kafka.clients.consumer.internals.SubscriptionState":"WARN"}'.

Per saperne di più, vedi Impostare i livelli di log dei worker della pipeline.

Configurazione del networking

Per impostazione predefinita, Dataflow avvia le istanze all'interno della rete Virtual Private Cloud (VPC) predefinita. A seconda della configurazione di Kafka, potresti dover configurare una rete e una subnet diverse per Dataflow. Per saperne di più, consulta Specifica una rete e una subnet. Quando configuri la rete, crea regole firewall che consentano alle macchine worker di Dataflow di raggiungere i broker Kafka.

Se utilizzi Controlli di servizio VPC, posiziona il cluster Kafka all'interno del perimetro di Controlli di servizio VPC oppure estendi i perimetri alla VPN o a Cloud Interconnect autorizzati.

Se il cluster Kafka è implementato al di fuori di Google Cloud, devi creare una connessione di rete tra Dataflow e il cluster Kafka. Esistono diverse opzioni di networking con compromessi diversi:

L'Dedicated Interconnect è l'opzione migliore per prestazioni e affidabilità prevedibili, ma la configurazione può richiedere più tempo perché le terze parti devono eseguire il provisioning dei nuovi circuiti. Con una topologia basata su IP pubblico, puoi iniziare rapidamente perché è necessario eseguire un lavoro di networking minimo.

Le due sezioni successive descrivono queste opzioni in modo più dettagliato.

Spazio di indirizzi RFC 1918 condiviso

Sia l'Dedicated Interconnect che la VPN IPsec ti consentono di accedere direttamente agli indirizzi IP RFC 1918 nel tuo Virtual Private Cloud (VPC), il che può semplificare la configurazione di Kafka. Se utilizzi una topologia basata su VPN, valuta la possibilità di configurare una VPN a velocità effettiva elevata.

Per impostazione predefinita, Dataflow avvia le istanze sulla tua rete VPC predefinita. In una topologia di rete privata con route definite in modo esplicito in Cloud Router che connettono le subnet in Google Cloud a quel cluster Kafka, hai bisogno di un maggiore controllo sulla posizione delle istanze Dataflow. Puoi utilizzare Dataflow per configurare i network e subnetwork parametri di esecuzione.

Assicurati che la subnet corrispondente disponga di indirizzi IP sufficienti per consentire a Dataflow di avviare le istanze quando tenta di fare lo scale out. Inoltre, quando crei una rete separata per avviare le istanze Dataflow, assicurati di avere una regola firewall che consenta il traffico TCP tra tutte le macchine virtuali del progetto. La rete predefinita ha già questa regola firewall configurata.

Spazio di indirizzi IP pubblici

Questa architettura utilizza Transport Layer Security (TLS) per proteggere il traffico tra i client esterni e Kafka e utilizza il traffico non criptato per la comunicazione tra i broker. Quando il listener Kafka si associa a un'interfaccia di rete utilizzata sia per la comunicazione interna che per quella esterna, la configurazione del listener è semplice. Tuttavia, in molti scenari, gli indirizzi pubblicizzati esternamente dei broker Kafka nel cluster differiscono dalle interfacce di rete interne utilizzate da Kafka. In questi scenari, puoi utilizzare la proprietà advertised.listeners:

# Configure protocol map
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:SSL
# Use plaintext for inter-broker communication inter.broker.listener.name=INTERNAL
# Specify that Kafka listeners should bind to all local interfaces listeners=INTERNAL://0.0.0.0:9092,EXTERNAL://0.0.0.0:9093
# Separately, specify externally visible address advertised.listeners=INTERNAL://kafkabroker-n.mydomain.com:9092,EXTERNAL://kafkabroker-n.mydomain.com:9093

I client esterni si connettono utilizzando la porta 9093 tramite un canale "SSL", mentre i client interni si connettono utilizzando la porta 9092 tramite un canale non criptato. Quando specifichi un indirizzo in advertised.listeners, utilizza nomi DNS (kafkabroker-n.mydomain.com, in questo esempio) che si risolvono nella stessa istanza sia per il traffico esterno che per quello interno. L'utilizzo di indirizzi IP pubblici potrebbe non funzionare perché la risoluzione degli indirizzi potrebbe non riuscire per il traffico interno.

Ottimizzare Kafka

Le impostazioni del cluster Kafka e del client Kafka possono avere un grande impatto sulle prestazioni. In particolare, le seguenti impostazioni potrebbero essere troppo basse. Questa sezione fornisce alcuni punti di partenza suggeriti, ma devi sperimentare con questi valori per il tuo carico di lavoro specifico.

  • unboundedReaderMaxElements. Il valore predefinito è 10.000. Un valore più alto, ad esempio 100.000, può aumentare le dimensioni dei bundle, il che può migliorare notevolmente le prestazioni se la pipeline include aggregazioni. Tuttavia, un valore più alto potrebbe anche aumentare la latenza. Per impostare il valore, utilizza setUnboundedReaderMaxElements. Questa impostazione non si applica a Runner v2. Per Runner v2, utilizza l'opzione sdf_checkpoint_after_output_bytes del servizio Dataflow.

  • unboundedReaderMaxReadTimeMs. Il valore predefinito è 10.000 ms. Un valore più alto, ad esempio 20.000 ms, può aumentare le dimensioni del bundle, mentre un valore più basso, ad esempio 5000 ms, può ridurre la latenza o il backlog. Per impostare il valore, utilizza setUnboundedReaderMaxReadTimeMs. Questa impostazione non si applica a Runner v2. Per Runner v2, utilizza l'opzione sdf_checkpoint_after_duration del servizio Dataflow.

  • max.poll.records. Il valore predefinito è 500. Un valore più alto potrebbe avere un rendimento migliore recuperando più record in entrata insieme, soprattutto quando utilizzi Runner v2. Per impostare il valore, chiama withConsumerConfigUpdates.

  • fetch.max.bytes. Il valore predefinito è 1 MB. Un valore più elevato potrebbe migliorare la velocità effettiva riducendo il numero di richieste, soprattutto quando utilizzi Runner v2. Tuttavia, se lo imposti su un valore troppo elevato, la latenza potrebbe aumentare, anche se l'elaborazione a valle è più probabile che sia il collo di bottiglia principale. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • max.partition.fetch.bytes. Il valore predefinito è 1 MB. Questo parametro imposta la quantità massima di dati per partizione restituita dal server. L'aumento del valore può migliorare la velocità effettiva riducendo il numero di richieste, soprattutto quando si utilizza Runner v2. Tuttavia, se lo imposti su un valore troppo alto, la latenza potrebbe aumentare, anche se l'elaborazione downstream è più probabile che sia il collo di bottiglia principale. Un valore iniziale consigliato è 100 MB. Per impostare il valore, chiama withConsumerConfigUpdates.

  • consumerPollingTimeout. Il valore predefinito è 2 secondi. Se il client consumer scade prima di poter leggere i record, prova a impostare un valore più alto. Questa impostazione è più spesso pertinente quando si eseguono letture tra regioni o letture con una rete lenta. Per impostare il valore, chiama withConsumerPollingTimeout.

Assicurati che receive.buffer.bytes sia abbastanza grande da gestire le dimensioni dei messaggi. Se il valore è troppo piccolo, i log potrebbero mostrare che i consumatori vengono continuamente ricreati e cercano un offset specifico.

Esempi

Gli esempi di codice riportati di seguito mostrano come creare pipeline Dataflow che leggono da Kafka. Quando utilizzi le Credenziali predefinite dell'applicazione#39;applicazione insieme al gestore di callback fornito da Google Cloud Managed Service per Apache Kafka, è richiesta la versione kafka-clients 3.7.0 o successive.

Leggere da un singolo argomento

Questo esempio utilizza il connettore I/O gestito. Mostra come leggere da un argomento Kafka e scrivere i payload dei messaggi in file di testo.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import com.google.common.collect.ImmutableMap;
import java.io.UnsupportedEncodingException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.managed.Managed;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;

public class KafkaRead {

  public static Pipeline createPipeline(Options options) {

    // Create configuration parameters for the Managed I/O transform.
    ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
        .put("bootstrap_servers", options.getBootstrapServer())
        .put("topic", options.getTopic())
        .put("format", "RAW")
        .put("max_read_time_seconds", 15)
        .put("auto_offset_reset_config", "earliest")
        .build();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    pipeline
        // Read messages from Kafka.
        .apply(Managed.read(Managed.KAFKA).withConfig(config)).getSinglePCollection()
        // Get the payload of each message and convert to a string.
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via((row -> {
              var bytes = row.getBytes("payload");
              try {
                return new String(bytes, "UTF-8");
              } catch (UnsupportedEncodingException e) {
                throw new RuntimeException(e);
              }
            })))
        // Write the payload to a text file.
        .apply(TextIO
            .write()
            .to(options.getOutputPath())
            .withSuffix(".txt")
            .withNumShards(1));
    return pipeline;
  }
}

Python

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam import window
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$KAFKA_TOPIC --bootstrap_server=$BOOTSTRAP_SERVER
    #     --output=$CLOUD_STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic")
            parser.add_argument("--bootstrap_server")
            parser.add_argument("--output")

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            # Read messages from an Apache Kafka topic.
            | beam.managed.Read(
                beam.managed.KAFKA,
                config={
                  "bootstrap_servers": options.bootstrap_server,
                  "topic": options.topic,
                  "data_format": "RAW",
                  "auto_offset_reset_config": "earliest",
                  # The max_read_time_seconds parameter is intended for testing.
                  # Avoid using this parameter in production.
                  "max_read_time_seconds": 5
                }
            )
            # Subdivide the output into fixed 5-second windows.
            | beam.WindowInto(window.FixedWindows(5))
            | WriteToText(
                file_path_prefix=options.output, file_name_suffix=".txt", num_shards=1
            )
        )

Leggere da più argomenti

Questo esempio utilizza il connettore KafkaIO. Mostra come leggere da più argomenti Kafka e applicare una logica della pipeline separata per ogni argomento.

Per casi d'uso più avanzati, trasmetti dinamicamente un insieme di oggetti KafkaSourceDescriptor, in modo da poter aggiornare l'elenco degli argomenti da leggere. Questo approccio richiede Java con Runner v2.

Java

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.transforms.Filter;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.kafka.common.serialization.LongDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.joda.time.Duration;
import org.joda.time.Instant;

public class KafkaReadTopics {

  public static Pipeline createPipeline(Options options) {
    String topic1 = options.getTopic1();
    String topic2 = options.getTopic2();

    // Build the pipeline.
    var pipeline = Pipeline.create(options);
    var allTopics = pipeline
        .apply(KafkaIO.<Long, String>read()
            .withTopics(List.of(topic1, topic2))
            .withBootstrapServers(options.getBootstrapServer())
            .withKeyDeserializer(LongDeserializer.class)
            .withValueDeserializer(StringDeserializer.class)
            .withMaxReadTime(Duration.standardSeconds(10))
            .withStartReadTime(Instant.EPOCH)
        );

    // Create separate pipeline branches for each topic.
    // The first branch filters on topic1.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic1)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic1)
            .withSuffix(".txt")
            .withNumShards(1)
        );

    // The second branch filters on topic2.
    allTopics
        .apply(Filter.by(record -> record.getTopic().equals(topic2)))
        .apply(MapElements
            .into(TypeDescriptors.strings())
            .via(record -> record.getKV().getValue()))
        .apply(TextIO.write()
            .to(topic2)
            .withSuffix(".txt")
            .withNumShards(1)
        );
    return pipeline;
  }
}

Python

Per eseguire l'autenticazione in Dataflow, configura le Credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configura l'autenticazione per un ambiente di sviluppo locale.

import argparse

import apache_beam as beam

from apache_beam.io.kafka import ReadFromKafka
from apache_beam.io.textio import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions


def read_from_kafka() -> None:
    # Parse the pipeline options passed into the application. Example:
    #   --bootstrap_server=$BOOTSTRAP_SERVER --output=$STORAGE_BUCKET --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @staticmethod
        def _add_argparse_args(parser: argparse.ArgumentParser) -> None:
            parser.add_argument('--bootstrap_server')
            parser.add_argument('--output')

    options = MyOptions()
    with beam.Pipeline(options=options) as pipeline:
        # Read from two Kafka topics.
        all_topics = pipeline | ReadFromKafka(consumer_config={
                "bootstrap.servers": options.bootstrap_server
            },
            topics=["topic1", "topic2"],
            with_metadata=True,
            max_num_records=10,
            start_read_time=0
        )

        # Filter messages from one topic into one branch of the pipeline.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic1')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic1" >> WriteToText(
                file_path_prefix=options.output + '/topic1/output',
                file_name_suffix='.txt',
                num_shards=1))

        # Filter messages from the other topic.
        (all_topics
            | beam.Filter(lambda message: message.topic == 'topic2')
            | beam.Map(lambda message: message.value.decode('utf-8'))
            | "Write topic2" >> WriteToText(
                file_path_prefix=options.output + '/topic2/output',
                file_name_suffix='.txt',
                num_shards=1))

Passaggi successivi