Menulis dari Dataflow ke Pub/Sub

Dokumen ini menjelaskan cara menulis data teks dari Dataflow ke Pub/Sub menggunakan konektor I/O PubSubIO Apache Beam.

Ringkasan

Untuk menulis data ke Pub/Sub, gunakan konektor PubSubIO. Elemen input dapat berupa pesan Pub/Sub atau hanya data pesan. Jika elemen input adalah pesan Pub/Sub, Anda dapat menetapkan atribut atau kunci pengurutan pada setiap pesan secara opsional.

Anda dapat menggunakan konektor PubSubIO versi Java, Python, atau Go, seperti berikut:

Untuk menulis ke satu topik, panggil metode PubsubIO.writeMessages. Metode ini mengambil kumpulan input objek PubsubMessage. Konektor juga menentukan metode praktis untuk menulis string, pesan Avro yang dienkode biner, atau pesan protobuf yang dienkode biner. Metode ini mengonversi kumpulan input menjadi pesan Pub/Sub.

Untuk menulis ke kumpulan topik dinamis berdasarkan data input, panggil writeMessagesDynamic. Tentukan topik tujuan untuk setiap pesan dengan memanggil PubsubMessage.withTopic pada pesan. Misalnya, Anda dapat merutekan pesan ke berbagai topik berdasarkan nilai kolom tertentu dalam data input.

Untuk informasi selengkapnya, lihat dokumentasi referensi PubsubIO.

Panggil metode pubsub.WriteToPubSub. Secara default, metode ini menggunakan koleksi input jenis bytes, yang mewakili payload pesan. Jika parameter with_attributes adalah True, metode akan mengambil kumpulan objek PubsubMessage.

Untuk informasi selengkapnya, lihat dokumentasi referensi modul pubsub.

Untuk menulis data ke Pub/Sub, panggil metode pubsubio.Write. Metode ini mengambil kumpulan input objek PubSubMessage atau slice byte yang berisi payload pesan.

Untuk informasi selengkapnya, lihat dokumentasi referensi paket pubsubio.

Untuk informasi selengkapnya tentang pesan Pub/Sub, lihat Format pesan dalam dokumentasi Pub/Sub.

Stempel waktu

Pub/Sub menetapkan stempel waktu pada setiap pesan. Stempel waktu ini mewakili waktu saat pesan dipublikasikan ke Pub/Sub. Dalam skenario streaming, Anda mungkin juga tertarik dengan stempel waktu peristiwa, yang adalah waktu saat data pesan dibuat. Anda dapat menggunakan stempel waktu elemen Apache Beam untuk merepresentasikan waktu peristiwa. Sumber yang membuat PCollection tanpa batas sering kali menetapkan stempel waktu yang sesuai dengan waktu peristiwa ke setiap elemen baru.

Untuk Java dan Python, konektor I/O Pub/Sub dapat menulis stempel waktu setiap elemen sebagai atribut pesan Pub/Sub. Pengguna pesan dapat menggunakan atribut ini untuk mendapatkan stempel waktu peristiwa.

Panggil PubsubIO.Write<T>.withTimestampAttribute dan tentukan nama atribut.

Tentukan parameter timestamp_attribute saat Anda memanggil WriteToPubSub.

Pengiriman pesan

Dataflow mendukung pemrosesan tepat satu kali pesan dalam pipeline. Namun, konektor I/O Pub/Sub tidak dapat menjamin pengiriman pesan tepat satu kali melalui Pub/Sub.

Untuk Java dan Python, Anda dapat mengonfigurasi konektor I/O Pub/Sub untuk menulis ID unik setiap elemen sebagai atribut pesan. Konsumen pesan kemudian dapat menggunakan atribut ini untuk menghapus duplikat pesan.

Panggil PubsubIO.Write<T>.withIdAttribute dan tentukan nama atribut.

Tentukan parameter id_label saat Anda memanggil WriteToPubSub.

Output langsung

Jika Anda mengaktifkan mode streaming setidaknya satu kali di pipeline, konektor I/O akan menggunakan output langsung. Dalam mode ini, konektor tidak melakukan pemeriksaan pesan, yang memungkinkan penulisan lebih cepat. Namun, percobaan ulang dalam mode ini dapat menyebabkan pesan duplikat dengan ID pesan yang berbeda, yang mungkin mempersulit konsumen pesan untuk menghapus duplikat pesan.

Untuk pipeline yang menggunakan mode exactly-once, Anda dapat mengaktifkan output langsung dengan menetapkan opsi layanan streaming_enable_pubsub_direct_output. Output langsung mengurangi latensi tulis dan menghasilkan pemrosesan yang lebih efisien. Pertimbangkan opsi ini jika konsumen pesan Anda dapat menangani pesan duplikat dengan ID pesan yang tidak unik.

Contoh

Contoh berikut membuat PCollection pesan Pub/Sub dan menulisnya ke topik Pub/Sub. Topik ditentukan sebagai opsi pipeline. Setiap pesan berisi data payload dan kumpulan atribut.

JavaPython

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.DefaultCoder;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;



public class PubSubWriteWithAttributes {
  public interface Options extends PipelineOptions {
    @Description("The Pub/Sub topic to write to. Format: projects/<PROJECT>/topics/<TOPIC>")
    String getTopic();

    void setTopic(String value);
  }

  // A custom datatype for the source data.
  @DefaultCoder(AvroCoder.class)
  static class ExampleData {
    public String name;
    public String product;
    public Long timestamp; // Epoch time in milliseconds

    public ExampleData() {}

    public ExampleData(String name, String product, Long timestamp) {
      this.name = name;
      this.product = product;
      this.timestamp = timestamp;
    }
  }

  // Write messages to a Pub/Sub topic.
  public static void main(String[] args) {
    // Example source data.
    final List<ExampleData> messages = Arrays.asList(
        new ExampleData("Robert", "TV", 1613141590000L),
        new ExampleData("Maria", "Phone", 1612718280000L),
        new ExampleData("Juan", "Laptop", 1611618000000L),
        new ExampleData("Rebeca", "Videogame", 1610000000000L)
    );

    // Parse the pipeline options passed into the application. Example:
    //   --runner=DirectRunner --topic=projects/MY_PROJECT/topics/MY_TOPIC"
    // For more information, see https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    var options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    var pipeline = Pipeline.create(options);
    pipeline
        // Create some data to write to Pub/Sub.
        .apply(Create.of(messages))
        // Convert the data to Pub/Sub messages.
        .apply(MapElements
            .into(TypeDescriptor.of(PubsubMessage.class))
            .via((message -> {
              byte[] payload = message.product.getBytes(StandardCharsets.UTF_8);
              // Create attributes for each message.
              HashMap<String, String> attributes = new HashMap<String, String>();
              attributes.put("buyer", message.name);
              attributes.put("timestamp", Long.toString(message.timestamp));
              return new PubsubMessage(payload, attributes);
            })))
        // Write the messages to Pub/Sub.
        .apply(PubsubIO.writeMessages().to(options.getTopic()));
    pipeline.run().waitUntilFinish();
  }
}

Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.

import argparse
from typing import Any, Dict, List

import apache_beam as beam
from apache_beam.io import PubsubMessage
from apache_beam.io import WriteToPubSub
from apache_beam.options.pipeline_options import PipelineOptions

from typing_extensions import Self


def item_to_message(item: Dict[str, Any]) -> PubsubMessage:
    # Re-import needed types. When using the Dataflow runner, this
    # function executes on a worker, where the global namespace is not
    # available. For more information, see:
    # https://cloud.google.com/dataflow/docs/guides/common-errors#name-error
    from apache_beam.io import PubsubMessage

    attributes = {"buyer": item["name"], "timestamp": str(item["ts"])}
    data = bytes(item["product"], "utf-8")

    return PubsubMessage(data=data, attributes=attributes)


def write_to_pubsub(argv: List[str] = None) -> None:
    # Parse the pipeline options passed into the application. Example:
    #     --topic=$TOPIC_PATH --streaming
    # For more information, see
    # https://beam.apache.org/documentation/programming-guide/#configuring-pipeline-options
    class MyOptions(PipelineOptions):
        @classmethod
        # Define a custom pipeline option to specify the Pub/Sub topic.
        def _add_argparse_args(cls: Self, parser: argparse.ArgumentParser) -> None:
            parser.add_argument("--topic", required=True)

    example_data = [
        {"name": "Robert", "product": "TV", "ts": 1613141590000},
        {"name": "Maria", "product": "Phone", "ts": 1612718280000},
        {"name": "Juan", "product": "Laptop", "ts": 1611618000000},
        {"name": "Rebeca", "product": "Video game", "ts": 1610000000000},
    ]
    options = MyOptions()

    with beam.Pipeline(options=options) as pipeline:
        (
            pipeline
            | "Create elements" >> beam.Create(example_data)
            | "Convert to Pub/Sub messages" >> beam.Map(item_to_message)
            | WriteToPubSub(topic=options.topic, with_attributes=True)
        )

    print("Pipeline ran successfully.")