Écrire des données de Dataflow vers Pub/Sub

Ce document explique comment écrire des données textuelles depuis Dataflow vers Cloud Storage à l'aide du Connecteur d'E/S PubSubIO d'Apache Beam.

Présentation

Pour écrire des données dans Pub/Sub, utilisez le connecteur PubSubIO. Les éléments d'entrée peuvent être des messages Pub/Sub ou simplement les données du message. Si les éléments d'entrée sont des messages Pub/Sub, vous pouvez éventuellement définir des attributs ou une clé de tri pour chaque message.

Vous pouvez utiliser la version Java, Python ou Go du connecteur PubSubIO, comme suit :

Java

Pour écrire dans un seul sujet, appelez la méthode PubsubIO.writeMessages. Cette méthode utilise une collection d'entrée d'objets PubsubMessage. Le connecteur définit également des méthodes pratiques pour écrire des chaînes, des messages Avro encodés en binaire ou des messages protobuf encodés en binaire. Ces méthodes convertissent la collection d'entrée en messages Pub/Sub.

Pour écrire dans un ensemble dynamique de sujets basé sur les données d'entrée, appelez writeMessagesDynamic. Spécifiez le sujet de destination de chaque message en appelant PubsubMessage.withTopic sur le message. Par exemple, vous pouvez acheminer des messages vers différents sujets en fonction de la valeur d'un champ particulier dans vos données d'entrée.

Pour plus d'informations, consultez la documentation de référence de PubsubIO.

Python

Appelez la méthode pubsub.WriteToPubSub. Par défaut, cette méthode utilise une collection d'entrée de type bytes, qui représente la charge utile du message. Si le paramètre with_attributes est True, la méthode utilise une collection d'objets PubsubMessage.

Pour en savoir plus, consultez la documentation de référence du module pubsub.

Go

Pour écrire des données dans Pub/Sub, appelez la méthode pubsubio.Write. Cette méthode prend une collection d'entrée d'objets PubSubMessage ou de tranches d'octets contenant les charges utiles du message.

Pour plus d'informations, consultez la documentation de référence du package pubsubio.

Pour en savoir plus sur les messages Pub/Sub, consultez la section Format des messages dans la documentation Pub/Sub.

Horodatages

Pub/Sub définit un horodatage pour chaque message. Cet horodatage représente l'heure à laquelle le message est publié dans Pub/Sub. Dans un scénario de traitement par flux, vous pouvez également vous soucier de l'horodatage de l'événement, qui correspond à l'heure à laquelle les données du message ont été générées. Vous pouvez utiliser l'horodatage d'élément Apache Beam pour représenter l'heure de l'événement. Les sources qui créent une PCollection illimitée attribuent souvent à chaque nouvel élément un horodatage correspondant à l'heure de l'événement.

Pour Java et Python, le connecteur d'E/S Pub/Sub peut écrire l'horodatage de chaque élément en tant qu'attribut de message Pub/Sub. Les clients de messages peuvent utiliser cet attribut pour obtenir l'horodatage de l'événement.

Java

Appelez PubsubIO.Write<T>.withTimestampAttribute et spécifiez le nom de l'attribut.

Python

Spécifiez le paramètre timestamp_attribute lorsque vous appelez WriteToPubSub.

Distribution des messages

Dataflow accepte le traitement de type "exactement une fois" des messages au sein d'un pipeline. Toutefois, le connecteur d'E/S Pub/Sub ne peut pas garantir une distribution "exactement une fois" des messages via Pub/Sub.

Pour Java et Python, vous pouvez configurer le connecteur d'E/S Pub/Sub pour écrire l'ID unique de chaque élément en tant qu'attribut de message. Les clients de messages peuvent ensuite utiliser cet attribut pour dédupliquer les messages.

Java

Appelez PubsubIO.Write<T>.withIdAttribute et spécifiez le nom de l'attribut.

Python

Spécifiez le paramètre id_label lorsque vous appelez WriteToPubSub.

Examples

L'exemple suivant crée un PCollection de messages Pub/Sub et les écrit dans un sujet Pub/Sub. Le sujet est spécifié en tant qu'option de pipeline. Chaque message contient des données de charge utile et un ensemble d'attributs.

Java

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Pour vous authentifier auprès de Dataflow, configurez le service Identifiants par défaut de l'application. Pour en savoir plus, consultez Configurer l'authentification pour un environnement de développement local.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(
                  topic=options.topic,
                  with_attributes=True)
        )

    print('Pipeline ran successfully.')