Von Dataflow in Pub/Sub schreiben

In diesem Dokument wird beschrieben, wie Sie mit dem E/A-Connector PubSubIO von Apache Beam Textdaten aus Dataflow in Pub/Sub schreiben.

Überblick

Verwenden Sie den PubSubIO-Connector, um Daten in Pub/Sub zu schreiben. Die Eingabeelemente können entweder Pub/Sub-Nachrichten oder einfach die Nachrichtendaten sein. Wenn die Eingabeelemente Pub/Sub-Nachrichten sind, können Sie optional für jede Nachricht Attribute oder einen Reihenfolgeschlüssel festlegen.

Sie können entweder die Java-, Python- oder Go-Version des PubSubIO-Connectors verwenden:

Java

Rufen Sie die Methode PubsubIO.writeMessages auf, um in ein einzelnes Thema zu schreiben. Bei dieser Methode wird eine Eingabesammlung von PubsubMessage-Objekten verwendet. Der Connector definiert auch praktische Methoden für das Schreiben von Strings, binär codierten Avro-Nachrichten oder binär codierten protobuf-Nachrichten. Diese Methoden konvertieren die Eingabesammlung in Pub/Sub-Nachrichten.

Rufen Sie writeMessagesDynamic auf, um in einen dynamischen Satz von Themen basierend auf den Eingabedaten zu schreiben. Geben Sie das Zielthema für jede Nachricht an, indem Sie für die Nachricht PubsubMessage.withTopic aufrufen. Beispielsweise können Sie Nachrichten anhand des Werts eines bestimmten Felds in Ihren Eingabedaten an verschiedene Themen weiterleiten.

Weitere Informationen finden Sie in der PubsubIO-Referenzdokumentation.

Python

Rufen Sie die Methode pubsub.WriteToPubSub auf. Standardmäßig verwendet diese Methode eine Eingabesammlung vom Typ bytes, die die Nachrichtennutzlast darstellt. Wenn der Parameter with_attributes True lautet, verwendet die Methode eine Sammlung von PubsubMessage-Objekten.

Weitere Informationen finden Sie in der Referenzdokumentation zum pubsub-Modul.

Go

Rufen Sie die Methode pubsubio.Write auf, um Daten in Pub/Sub zu schreiben. Bei dieser Methode wird eine Eingabesammlung von entweder PubSubMessage-Objekten oder Bytesegmenten verwendet, die die Nachrichtennutzlasten enthalten.

Weitere Informationen finden Sie in der Referenzdokumentation zum pubsubio-Paket.

Weitere Informationen zu Pub/Sub-Nachrichten finden Sie in der Pub/Sub-Dokumentation unter Nachrichtenformat.

Zeitstempel

Pub/Sub legt für jede Nachricht einen Zeitstempel fest. Dieser Zeitstempel stellt den Zeitpunkt dar, zu dem die Nachricht in Pub/Sub veröffentlicht wird. In einem Streaming-Szenario ist vielleicht auch der Ereigniszeitstempel relevant, also der Zeitpunkt, zu dem die Nachrichtendaten generiert wurden. Sie können den Elementzeitstempel von Apache Beam verwenden, um die Ereigniszeit darzustellen. Quellen, die eine unbegrenzte PCollection erstellen, weisen neuen Elementen häufig einen Zeitstempel zu, der der Ereigniszeit entspricht.

Bei Java und Python kann der Pub/Sub-E/A-Connector den Zeitstempel jedes Elements als Pub/Sub-Nachrichtenattribut schreiben. Nachrichtennutzer können mit diesem Attribut den Ereigniszeitstempel abrufen.

Java

Rufen Sie PubsubIO.Write<T>.withTimestampAttribute auf und geben Sie den Namen des Attributs an.

Python

Geben Sie beim Aufrufen von WriteToPubSub den Parameter timestamp_attribute an.

E-Mail-Zustellung

Dataflow unterstützt die genau einmalige Verarbeitung von Nachrichten innerhalb einer Pipeline. Der Pub/Sub-E/A-Connector kann jedoch nicht die genau einmalige Übermittlung von Nachrichten über Pub/Sub garantieren.

Für Java und Python können Sie den Pub/Sub-E/A-Connector so konfigurieren, dass die eindeutige ID jedes Elements als Nachrichtenattribut geschrieben wird. Nachrichtennutzer können dieses Attribut dann verwenden, um Nachrichten zu deduplizieren.

Java

Rufen Sie PubsubIO.Write<T>.withIdAttribute auf und geben Sie den Namen des Attributs an.

Python

Geben Sie beim Aufrufen von WriteToPubSub den Parameter id_label an.

Beispiele

Im folgenden Beispiel wird eine PCollection mit Pub/Sub-Nachrichten erstellt und in ein Pub/Sub-Thema geschrieben. Das Thema wird als Pipelineoption angegeben. Jede Nachricht enthält Nutzlastdaten und eine Reihe von Attributen.

Java

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   ----runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Richten Sie die Standardanmeldedaten für Anwendungen ein, um sich bei Dataflow zu authentifizieren. Weitere Informationen finden Sie unter Authentifizierung für eine lokale Entwicklungsumgebung einrichten.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self

def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)

def write_to_pubsub(argv: List[str] = None) -> None:

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(
                  topic=options.topic,
                  with_attributes=True)
        )

    print('Pipeline ran successfully.')