Dokumen ini menjelaskan cara menulis data teks dari Dataflow ke
Pub/Sub menggunakan konektor I/O PubSubIO
Apache Beam.
Ringkasan
Untuk menulis data ke Pub/Sub, gunakan konektor PubSubIO
. Elemen input dapat berupa pesan Pub/Sub atau hanya data pesan.
Jika elemen input adalah pesan Pub/Sub, Anda dapat menetapkan atribut atau kunci pengurutan pada setiap pesan secara opsional.
Anda dapat menggunakan konektor PubSubIO
versi Java, Python, atau Go,
seperti berikut:
Untuk menulis ke satu topik, panggil metode PubsubIO.writeMessages
. Metode
ini mengambil kumpulan input objek PubsubMessage
. Konektor juga menentukan metode praktis untuk menulis string, pesan Avro yang dienkode biner, atau pesan protobuf yang dienkode biner. Metode ini mengonversi kumpulan input
menjadi pesan Pub/Sub.
Untuk menulis ke kumpulan topik dinamis berdasarkan data input, panggil
writeMessagesDynamic
. Tentukan
topik tujuan untuk setiap pesan dengan memanggil PubsubMessage.withTopic
pada
pesan. Misalnya, Anda dapat merutekan pesan ke berbagai topik berdasarkan nilai kolom tertentu dalam data input.
Untuk informasi selengkapnya, lihat dokumentasi referensi PubsubIO
.
Panggil metode pubsub.WriteToPubSub
.
Secara default, metode ini menggunakan koleksi input jenis bytes
,
yang mewakili payload pesan. Jika parameter with_attributes
adalah
True
, metode akan mengambil kumpulan objek PubsubMessage
.
Untuk informasi selengkapnya, lihat dokumentasi referensi
modul pubsub
.
Untuk menulis data ke Pub/Sub, panggil metode pubsubio.Write
. Metode ini mengambil
kumpulan input objek PubSubMessage
atau slice byte yang berisi
payload pesan.
Untuk informasi selengkapnya, lihat dokumentasi referensi
paket pubsubio
.
Untuk informasi selengkapnya tentang pesan Pub/Sub, lihat Format pesan dalam dokumentasi Pub/Sub.
Stempel waktu
Pub/Sub menetapkan stempel waktu pada setiap pesan. Stempel waktu ini mewakili waktu saat pesan dipublikasikan ke Pub/Sub. Dalam
skenario streaming, Anda mungkin juga tertarik dengan stempel waktu peristiwa, yang
adalah waktu saat data pesan dibuat. Anda dapat menggunakan stempel waktu elemen Apache Beam untuk merepresentasikan waktu peristiwa. Sumber yang membuat PCollection
tanpa batas sering kali
menetapkan stempel waktu yang sesuai dengan waktu peristiwa ke setiap elemen baru.
Untuk Java dan Python, konektor I/O Pub/Sub dapat menulis stempel waktu setiap elemen sebagai atribut pesan Pub/Sub. Pengguna pesan dapat menggunakan atribut ini untuk mendapatkan stempel waktu peristiwa.
Panggil PubsubIO.Write<T>.withTimestampAttribute
dan tentukan nama
atribut.
Tentukan parameter timestamp_attribute
saat Anda memanggil WriteToPubSub
.
Pengiriman pesan
Dataflow mendukung pemrosesan tepat satu kali pesan dalam pipeline. Namun, konektor I/O Pub/Sub tidak dapat menjamin pengiriman pesan tepat satu kali melalui Pub/Sub.
Untuk Java dan Python, Anda dapat mengonfigurasi konektor I/O Pub/Sub untuk menulis ID unik setiap elemen sebagai atribut pesan. Konsumen pesan kemudian dapat menggunakan atribut ini untuk menghapus duplikat pesan.
Panggil PubsubIO.Write<T>.withIdAttribute
dan tentukan nama
atribut.
Tentukan parameter id_label
saat Anda memanggil WriteToPubSub
.
Output langsung
Jika Anda mengaktifkan mode streaming setidaknya satu kali di pipeline, konektor I/O akan menggunakan output langsung. Dalam mode ini, konektor tidak melakukan pemeriksaan pesan, yang memungkinkan penulisan lebih cepat. Namun, percobaan ulang dalam mode ini dapat menyebabkan pesan duplikat dengan ID pesan yang berbeda, yang mungkin mempersulit konsumen pesan untuk menghapus duplikat pesan.
Untuk pipeline yang menggunakan mode exactly-once, Anda dapat mengaktifkan output langsung dengan
menetapkan opsi layanan
streaming_enable_pubsub_direct_output
. Output langsung
mengurangi latensi tulis dan menghasilkan pemrosesan yang lebih efisien. Pertimbangkan opsi
ini jika konsumen pesan Anda dapat menangani pesan duplikat dengan ID pesan
yang tidak unik.
Contoh
Contoh berikut membuat PCollection
pesan Pub/Sub
dan menulisnya ke topik Pub/Sub. Topik ditentukan sebagai
opsi pipeline. Setiap pesan berisi data payload dan kumpulan atribut.
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.
Untuk melakukan autentikasi ke Dataflow, siapkan Kredensial Default Aplikasi. Untuk mengetahui informasi selengkapnya, baca Menyiapkan autentikasi untuk lingkungan pengembangan lokal.