Scrivere da Dataflow a Pub/Sub

Questo documento descrive come scrivere dati di testo da Dataflow in Pub/Sub utilizzando il PubSubIO connettore I/O di Apache Beam.

Panoramica

Per scrivere dati in Pub/Sub, utilizza il connettore PubSubIO. Gli elementi di input possono essere messaggi Pub/Sub o solo i dati del messaggio. Se gli elementi di input sono messaggi Pub/Sub, facoltativamente puoi impostare attributi o una chiave di ordinamento su ogni messaggio.

Puoi utilizzare la versione Java, Python o Go del connettore PubSubIO, come segue:

Java

Per scrivere in un singolo argomento, chiama il metodo PubsubIO.writeMessages. Questo metodo accetta una raccolta di input di oggetti PubsubMessage. Il connettore definisce anche metodi di utilità per la scrittura di stringhe, messaggi Avro con codifica binaria o messaggi protobuf con codifica binaria. Questi metodi convertono la raccolta di input in messaggi Pub/Sub.

Per scrivere in un insieme dinamico di argomenti in base ai dati di input, chiama writeMessagesDynamic. Specifica l'argomento di destinazione per ogni messaggio chiamando PubsubMessage.withTopic sul messaggio. Ad esempio, puoi instradare i messaggi a diversi argomenti in base al valore di un determinato campo nei dati di input.

Per ulteriori informazioni, consulta la documentazione di riferimento di PubsubIO.

Python

Chiama il metodo pubsub.WriteToPubSub. Per impostazione predefinita, questo metodo accetta una raccolta di input di tipo bytes, che rappresenta il payload del messaggio. Se il parametro with_attributes è True, il metodo prende una raccolta di oggetti PubsubMessage.

Per ulteriori informazioni, consulta la documentazione di riferimento del modulo pubsub.

Vai

Per scrivere dati in Pub/Sub, chiama il metodo pubsubio.Write. Questo metodo prende una raccolta di input di oggetti PubSubMessage o di slice di byte contenenti i payload dei messaggi.

Per saperne di più, consulta la documentazione di riferimento del pacchetto pubsubio.

Per ulteriori informazioni sui messaggi Pub/Sub, consulta Formato dei messaggi nella documentazione di Pub/Sub.

Timestamp

Pub/Sub imposta un timestamp su ogni messaggio. Questo timestamp rappresenta il momento in cui il messaggio viene pubblicato in Pub/Sub. In uno scenario di streaming, potresti anche tenere conto del timestamp evento, ovvero dell'ora in cui sono stati generati i dati del messaggio. Puoi utilizzare il timestamp dell'elemento di Apache Beam per rappresentare la data e l'ora dell'evento. Le origini che creano un PCollection illimitato spesso astraggono a ogni nuovo elemento un timestamp che corrisponde all'ora dell'evento.

Per Java e Python, il connettore I/O Pub/Sub può scrivere il timestamp di ogni elemento come attributo del messaggio Pub/Sub. I consumatori di messaggi possono utilizzare questo attributo per ottenere il timestamp dell'evento.

Java

Chiama PubsubIO.Write<T>.withTimestampAttribute e specifica il nome dell'attributo.

Python

Specifica il parametro timestamp_attribute quando chiami WriteToPubSub.

Consegna messaggi

Dataflow supporta l'elaborazione exactly-once dei messaggi all'interno di una pipeline. Tuttavia, il connettore I/O Pub/Sub non può garantire la consegna "exactly-once" dei messaggi tramite Pub/Sub.

Per Java e Python, puoi configurare il connettore Pub/Sub I/O per scrivere l'ID univoco di ogni elemento come attributo del messaggio. I consumatori di messaggi possono quindi utilizzare questo attributo per deduplicare i messaggi.

Java

Chiama PubsubIO.Write<T>.withIdAttribute e specifica il nome dell'attributo.

Python

Specifica il parametro id_label quando chiami WriteToPubSub.

Output diretto

Se attivi la modalità di streaming almeno una volta nella pipeline, il connettore I/O utilizza l'output diretto. In questa modalità, il connettore non esegue il checkpoint dei messaggi, il che consente di scrivere più velocemente. Tuttavia, i tentativi in questa modalità potrebbero causare messaggi duplicati con ID messaggio diversi, rendendo potenzialmente più difficile per i consumatori dei messaggi deduplicare i messaggi.

Per le pipeline che utilizzano la modalità esattamente una volta, puoi attivare l'output diretto impostando l'streaming_enable_pubsub_direct_output opzione di servizio. L'output diretto riduce la latenza di scrittura e consente un'elaborazione più efficiente. Valuta questa opzione se i tuoi utenti possono gestire messaggi duplicati con ID messaggio non unici.

Esempi

L'esempio seguente crea un PCollection di messaggi Pub/Sub e li scrive in un argomento Pub/Sub. L'argomento è specificato come opzione della pipeline. Ogni messaggio contiene i dati del payload e un insieme di attributi.

Java

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Per autenticarti a Dataflow, configura le credenziali predefinite dell'applicazione. Per ulteriori informazioni, consulta Configurare l'autenticazione per un ambiente di sviluppo locale.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")