Gravar do Dataflow para o Pub/Sub

Neste documento, descrevemos como gravar dados de texto do Dataflow para o Pub/Sub usando o conector de E/S do Apache Beam PubSubIO.

Visão geral

Para gravar dados no Pub/Sub, use o conector PubSubIO. Os elementos de entrada podem ser mensagens do Pub/Sub ou apenas os dados da mensagem. Se os elementos de entrada forem mensagens do Pub/Sub, será possível definir atributos ou uma chave de ordem em cada mensagem.

É possível usar a versão em Java, Python ou Go do conector PubSubIO da seguinte maneira:

Java

Para gravar em um único tópico, chame o método PubsubIO.writeMessages. Esse método usa uma coleção de entrada de objetos PubsubMessage. O conector também define métodos de conveniência para gravar strings, mensagens Avro codificadas em binário ou mensagens protobuf codificadas em binários. Esses métodos convertem a coleção de entrada em mensagens do Pub/Sub.

Para gravar em um conjunto dinâmico de temas com base nos dados de entrada, chame writeMessagesDynamic. Para especificar o tópico de destino de cada mensagem, chame PubsubMessage.withTopic nela. Por exemplo, você pode encaminhar mensagens para tópicos diferentes com base no valor de um determinado campo nos seus dados de entrada.

Para mais informações, consulte a documentação de referência PubsubIO.

Python

Chame o método pubsub.WriteToPubSub. Por padrão, esse método usa uma coleção de entrada do tipo bytes, que representa o payload da mensagem. Se o parâmetro with_attributes for True, o método usará uma coleção de objetos PubsubMessage.

Para saber mais, consulte a documentação de referência do módulo pubsub.

Go

Para gravar dados no Pub/Sub, chame o método pubsubio.Write. Esse método usa uma coleção de entrada de objetos PubSubMessage ou frações de bytes que contêm os payloads da mensagem.

Para mais informações, consulte a documentação de referência do pacote pubsubio.

Para mais informações sobre mensagens do Pub/Sub, consulte Formato da mensagem na documentação do Pub/Sub.

Timestamps

O Pub/Sub define um carimbo de data/hora em cada mensagem. Esse carimbo de data/hora representa a hora em que a mensagem é publicada no Pub/Sub. Em um cenário de streaming, o carimbo de data/hora do evento também pode ser relevante, ou seja, a hora em que os dados da mensagem foram gerados. É possível usar o carimbo de data/hora do elemento do Apache Beam para representar o horário do evento. As origens que criam uma PCollection ilimitada geralmente atribuem a cada novo elemento um carimbo de data/hora que corresponde ao horário do evento.

Em Java e Python, o conector de E/S do Pub/Sub pode gravar o carimbo de data/hora de cada elemento como um atributo de mensagem do Pub/Sub. Os consumidores de mensagens podem usar esse atributo para receber o carimbo de data/hora do evento.

Java

Chame PubsubIO.Write<T>.withTimestampAttribute e especifique o nome do atributo.

Python

Especifique o parâmetro timestamp_attribute ao chamar WriteToPubSub.

Entrega de mensagem

O Dataflow é compatível com processamento único de mensagens em um pipeline. No entanto, o conector de E/S do Pub/Sub não garante a entrega única de mensagens pelo Pub/Sub.

Em Java e Python, é possível configurar o conector de E/S do Pub/Sub para gravar o ID exclusivo de cada elemento como um atributo de mensagem. Os consumidores de mensagens podem usar esse atributo para eliminar a duplicação de mensagens.

Java

Chame PubsubIO.Write<T>.withIdAttribute e especifique o nome do atributo.

Python

Especifique o parâmetro id_label ao chamar WriteToPubSub.

Examples

O exemplo a seguir cria um PCollection de mensagens do Pub/Sub e as grava em um tópico do Pub/Sub. O tópico é especificado como uma opção de pipeline. Cada mensagem contém dados de payload e um conjunto de atributos.

Java

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   ----runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Python

Para autenticar no Dataflow, configure o Application Default Credentials. Para mais informações, consulte Configurar a autenticação para um ambiente de desenvolvimento local.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self

def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {
        'buyer': item['name'],
        'timestamp': str(item['ts'])
    }
    data = bytes(item['product'], 'utf-8')

    return PubsubMessage(data=data, attributes=attributes)

def write_to_pubsub(argv: List[str] = None) -> None:

    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {'name': 'Robert', 'product': 'TV', 'ts': 1613141590000},
        {'name': 'Maria', 'product': 'Phone', 'ts': 1612718280000},
        {'name': 'Juan', 'product': 'Laptop', 'ts': 1611618000000},
        {'name': 'Rebeca', 'product': 'Video game', 'ts': 1610000000000}
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(
                  topic=options.topic,
                  with_attributes=True)
        )

    print('Pipeline ran successfully.')