Halaman ini menguraikan praktik terbaik yang dapat digunakan saat mengembangkan pipeline Dataflow Anda. Menggunakan praktik terbaik ini memiliki manfaat sebagai berikut:
- Meningkatkan kemampuan observasi dan performa pipeline
- Peningkatan produktivitas developer
- Meningkatkan kemampuan pengujian pipeline
Contoh kode Apache Beam di halaman ini menggunakan Java, tetapi kontennya berlaku untuk Apache Beam Java, Python, dan Go SDK.
Pertanyaan yang perlu dipertimbangkan
Saat mendesain pipeline Anda, pertimbangkan pertanyaan-pertanyaan berikut:
- Di mana data input pipeline Anda disimpan? Berapa banyak set data {i>input <i} yang Anda miliki?
- Seperti apa data Anda?
- Apa yang ingin Anda lakukan dengan data Anda?
- Ke mana seharusnya data output pipeline Anda dipindahkan?
- Apakah tugas Dataflow Anda menggunakan Assured Workloads?
Menggunakan template
Untuk mempercepat pengembangan pipeline, gunakan template Dataflow jika memungkinkan, bukan dengan menulis kode Apache Beam. Template memiliki manfaat berikut:
- Template dapat digunakan kembali.
- Template memungkinkan Anda menyesuaikan setiap tugas dengan mengubah parameter pipeline tertentu.
- Siapa pun yang Anda beri izin dapat menggunakan template untuk men-deploy pipeline. Misalnya, developer dapat membuat tugas dari template, dan data scientist di organisasi dapat men-deploy template tersebut di lain waktu.
Anda dapat menggunakan template yang disediakan Google atau membuat template Anda sendiri. Beberapa template yang disediakan Google memungkinkan Anda menambahkan logika kustom sebagai langkah pipeline. Misalnya, template Pub/Sub to BigQuery menyediakan parameter untuk menjalankan fungsi yang ditentukan pengguna (UDF) JavaScript yang tersimpan di Cloud Storage.
Karena template yang disediakan Google bersifat open source berdasarkan Lisensi Apache 2.0, Anda dapat menggunakannya sebagai dasar untuk pipeline baru. {i>Template<i} juga berguna sebagai contoh kode. Lihat kode template di repositori GitHub.
Assured Workloads
Assured Workloads membantu menegakkan persyaratan keamanan dan kepatuhan bagi pelanggan Google Cloud. Misalnya, Region dan Dukungan Uni Eropa dengan Kontrol Kedaulatan membantu menerapkan jaminan residensi data dan kedaulatan data bagi pelanggan yang berbasis di Uni Eropa. Untuk menyediakan fitur ini, beberapa fitur Dataflow dibatasi atau dibatasi. Jika Anda menggunakan Assured Workloads dengan Dataflow, semua resource yang diakses pipeline Anda harus berada dalam project atau folder Assured Workloads organisasi Anda. Referensi ini mencakup:
- Bucket Cloud Storage
- Set data BigQuery
- Langganan dan topik Pub/Sub
- Set data Firestore
- Konektor I/O
Di Dataflow, untuk tugas streaming yang dibuat setelah 7 Maret 2024, semua data pengguna dienkripsi dengan CMEK.
Untuk tugas streaming yang dibuat sebelum 7 Maret 2024, kunci data yang digunakan dalam operasi berbasis kunci, seperti windowing, pengelompokan, dan penggabungan, tidak dilindungi oleh enkripsi CMEK. Untuk mengaktifkan enkripsi ini bagi tugas Anda, kuras atau batalkan tugas, lalu mulai ulang tugas. Untuk mengetahui informasi selengkapnya, lihat Enkripsi artefak status pipeline.
Membagikan data di seluruh pipeline
Tidak ada mekanisme komunikasi lintas pipeline khusus Dataflow untuk berbagi data atau memproses konteks antar-pipeline. Anda dapat menggunakan penyimpanan yang andal seperti Cloud Storage atau cache dalam memori seperti App Engine untuk berbagi data antar-instance pipeline.
Jadwalkan tugas
Anda dapat mengotomatiskan eksekusi pipeline dengan cara berikut:
- Gunakan Cloud Scheduler.
- Gunakan Operator Dataflow Apache Airflow, salah satu dari beberapa Operator Google Cloud dalam alur kerja Cloud Composer.
- Menjalankan proses kustom (cron) job di Compute Engine.
Praktik terbaik untuk menulis kode pipeline
Bagian berikut memberikan praktik terbaik yang dapat digunakan saat Anda membuat pipeline dengan menulis kode Apache Beam.
Menyusun kode Apache Beam
Untuk membuat pipeline, sebaiknya gunakan transformasi Apache Beam
pemrosesan paralel
ParDo
generik.
Saat menerapkan transformasi ParDo
, Anda akan memberikan kode dalam bentuk
objek DoFn
. DoFn
adalah class Apache Beam SDK yang menentukan fungsi
pemrosesan terdistribusi.
Anda dapat menganggap kode DoFn
sebagai entity kecil dan independen: kemungkinan ada
banyak instance yang berjalan di mesin yang berbeda, masing-masing tanpa
mengetahui yang lainnya. Dengan demikian, sebaiknya buat fungsi murni, yang
ideal untuk sifat elemen DoFn
yang paralel dan terdistribusi.
Fungsi murni memiliki karakteristik berikut:
- Fungsi murni tidak bergantung pada status tersembunyi atau eksternal.
- Tidak memiliki efek samping yang dapat diamati.
- Mereka bersifat determenistik.
Model fungsi murni tidak sepenuhnya kaku. Jika kode Anda tidak bergantung pada hal-hal yang tidak dijamin oleh layanan Dataflow, informasi status atau data inisialisasi eksternal dapat valid untuk DoFn
dan objek fungsi lainnya.
Saat menyusun transformasi ParDo
dan membuat elemen DoFn
,
pertimbangkan panduan berikut:
- Saat Anda menggunakan pemrosesan tepat satu kali, layanan Dataflow menjamin bahwa setiap elemen dalam
PCollection
input Anda diproses oleh instanceDoFn
tepat satu kali. - Layanan Dataflow tidak menjamin berapa kali
DoFn
dipanggil. - Layanan Dataflow tidak menjamin secara persis cara pengelompokan elemen yang didistribusikan. Hal tersebut tidak menjamin elemen mana, jika ada, yang diproses bersama.
- Layanan Dataflow tidak menjamin jumlah pasti instance
DoFn
yang dibuat selama pipeline. - Layanan Dataflow bersifat fault-tolerant dan mungkin mencoba lagi kode Anda beberapa kali jika pekerja mengalami masalah.
- Layanan Dataflow mungkin membuat salinan cadangan kode Anda. Masalah dapat terjadi dengan efek samping manual, seperti jika kode Anda bergantung pada atau membuat file sementara dengan nama yang tidak unik.
- Layanan Dataflow membuat serialisasi pemrosesan elemen per instance
DoFn
. Kode Anda tidak harus benar-benar aman untuk thread, tetapi setiap status yang dibagikan di antara beberapa instanceDoFn
harus aman untuk thread.
Membuat library transformasi yang dapat digunakan kembali
Model pemrograman Apache Beam memungkinkan Anda menggunakan kembali transformasi. Dengan membuat library bersama untuk transformasi umum, Anda dapat meningkatkan penggunaan kembali, kemampuan pengujian, dan kepemilikan kode oleh tim yang berbeda.
Pertimbangkan dua contoh kode Java berikut, yang membaca peristiwa pembayaran. Dengan asumsi bahwa kedua pipeline melakukan pemrosesan yang sama, keduanya dapat menggunakan transformasi yang sama melalui library bersama untuk langkah pemrosesan yang tersisa.
Contoh pertama adalah dari sumber Pub/Sub tak terbatas:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
Contoh kedua adalah dari sumber database relasional terbatas:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
Cara Anda menerapkan praktik terbaik penggunaan kembali kode bervariasi menurut bahasa pemrograman dan alat build. Misalnya, jika menggunakan Maven, Anda dapat memisahkan kode transformasi ke dalam modulnya sendiri. Kemudian, Anda dapat menyertakan modul sebagai submodul dalam project multi-modul yang lebih besar untuk pipeline yang berbeda, seperti ditunjukkan dalam contoh kode berikut:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Untuk informasi selengkapnya, lihat halaman dokumentasi Apache Beam berikut:
- Persyaratan guna menulis kode pengguna untuk transformasi Apache Beam
- Panduan gaya
PTransform
: panduan gaya bagi penulis koleksiPTransform
baru yang dapat digunakan kembali
Menggunakan antrean yang dihentikan pengirimannya untuk penanganan error
Terkadang, pipeline Anda tidak dapat memproses elemen. Masalah data merupakan penyebab umum. Misalnya, elemen yang berisi JSON yang diformat dengan buruk dapat menyebabkan kegagalan penguraian.
Meskipun Anda dapat menangkap pengecualian dalam metode
DoFn.ProcessElement
, mencatat error, dan menghapus elemen, pendekatan ini kehilangan data
dan mencegah data diperiksa nanti untuk penanganan atau pemecahan masalah manual.
Sebagai gantinya, gunakan pola yang disebut antrean yang dihentikan pengirimannya (antrean pesan yang tidak diproses).
Menangkap pengecualian dalam metode DoFn.ProcessElement
dan mencatat error. Daripada menghapus elemen yang gagal, gunakan output bercabang untuk menulis elemen yang gagal ke objek PCollection
yang terpisah. Elemen ini kemudian ditulis ke sink data untuk diperiksa
dan ditangani dengan transformasi terpisah.
Contoh kode Java berikut menunjukkan cara menerapkan pola antrean mati.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Gunakan Cloud Monitoring untuk menerapkan kebijakan pemantauan dan pemberitahuan yang berbeda untuk antrean yang dihentikan pengirimannya di pipeline Anda. Misalnya, Anda dapat memvisualisasikan jumlah dan ukuran elemen yang diproses oleh transformasi yang dihentikan pengirimannya dan mengonfigurasi pemberitahuan untuk dipicu jika kondisi nilai minimum tertentu terpenuhi.
Menangani mutasi skema
Anda dapat menangani data yang memiliki skema yang tidak terduga tetapi valid dengan menggunakan pola huruf mati, yang menulis elemen yang gagal ke objek PCollection
yang terpisah.
Dalam beberapa kasus, Anda ingin otomatis menangani elemen
yang mencerminkan skema yang berubah sebagai elemen yang valid. Misalnya, jika skema elemen mencerminkan mutasi seperti penambahan kolom baru, Anda dapat menyesuaikan skema sink data ini untuk mengakomodasi mutasi.
Mutasi skema otomatis bergantung pada pendekatan percabangan-output yang digunakan oleh pola yang dihentikan. Namun, dalam hal ini, pemicu akan memicu transformasi yang mengubah skema tujuan setiap kali skema tambahan ditemukan. Untuk contoh pendekatan ini, lihat Cara menangani perubahan skema JSON di pipeline streaming, dengan Square Enix di blog Google Cloud.
Menentukan cara menggabungkan {i>database<i}
Menggabungkan set data adalah kasus penggunaan umum untuk pipeline data. Anda dapat menggunakan
input samping atau transformasi CoGroupByKey
untuk melakukan penggabungan di pipeline.
Masing-masing memiliki kelebihan dan kekurangan.
Input samping
menyediakan cara fleksibel untuk menyelesaikan masalah pemrosesan data umum, seperti pengayaan
data dan pencarian dengan kunci. Tidak seperti objek PCollection
, input samping
dapat diubah dan dapat ditentukan saat runtime. Misalnya, nilai dalam
input samping mungkin dihitung oleh cabang lain dalam pipeline Anda atau ditentukan dengan
memanggil layanan jarak jauh.
Dataflow mendukung input samping dengan menyimpan data ke dalam penyimpanan persisten, mirip dengan disk bersama. Konfigurasi ini membuat input sisi lengkap tersedia untuk semua pekerja.
Namun, ukuran input samping bisa sangat besar dan mungkin tidak masuk ke dalam memori pekerja. Membaca dari input sisi besar dapat menyebabkan masalah performa jika pekerja harus terus-menerus membaca dari penyimpanan persisten.
Transformasi CoGroupByKey
adalah
transformasi Apache Beam inti
yang menggabungkan (meratakan) beberapa objek PCollection
dan mengelompokkan elemen yang
memiliki kunci yang sama. Tidak seperti input samping, yang membuat seluruh data input samping
tersedia untuk setiap pekerja, CoGroupByKey
melakukan operasi acak (pengelompokan)
untuk mendistribusikan data ke seluruh pekerja. Oleh karena itu, CoGroupByKey
ideal jika objek PCollection
yang ingin Anda gabungkan sangat besar dan tidak sesuai dengan memori pekerja.
Ikuti panduan ini untuk membantu memutuskan apakah akan menggunakan input samping atau
CoGroupByKey
:
- Gunakan input samping saat salah satu objek
PCollection
yang Anda gabungkan ukurannya jauh lebih kecil dari yang lain, dan objekPCollection
yang lebih kecil cocok dengan memori pekerja. Menyimpan input samping ke dalam cache sepenuhnya ke dalam memori membuat pengambilan elemen dapat dilakukan dengan cepat dan efisien. - Gunakan input samping jika Anda memiliki objek
PCollection
yang harus digabungkan beberapa kali di pipeline Anda. Daripada menggunakan beberapa transformasiCoGroupByKey
, buat satu input sisi yang dapat digunakan kembali oleh beberapa transformasiParDo
. - Gunakan
CoGroupByKey
jika Anda perlu mengambil sebagian besar objekPCollection
yang secara signifikan melebihi memori pekerja.
Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah Dataflow kehabisan memori.
Meminimalkan operasi per elemen yang mahal
Instance DoFn
memproses batch elemen yang disebut
paket,
yang merupakan unit atom pekerjaan yang terdiri dari nol atau beberapa
elemen. Setiap elemen kemudian diproses oleh metode
DoFn.ProcessElement
, yang berjalan untuk setiap elemen. Karena metode DoFn.ProcessElement
dipanggil untuk setiap elemen, setiap operasi yang memakan waktu atau
biaya komputasi yang mahal, yang dipanggil oleh metode tersebut,
akan dijalankan untuk setiap elemen yang diproses oleh metode tersebut.
Jika Anda hanya perlu menjalankan operasi yang mahal satu kali untuk sekumpulan elemen,
sertakan operasi tersebut dalam metode DoFn.Setup
atau DoFn.StartBundle
,
bukan dalam elemen DoFn.ProcessElement
. Contohnya mencakup
operasi berikut:
Mengurai file konfigurasi yang mengontrol beberapa aspek perilaku instance
DoFn
. Hanya panggil tindakan ini satu kali, saat instanceDoFn
diinisialisasi, dengan menggunakan metodeDoFn.Setup
.Membuat instance klien berumur pendek yang digunakan kembali di semua elemen dalam paket, seperti saat semua elemen dalam paket dikirim melalui satu koneksi jaringan. Panggil tindakan ini satu kali per paket menggunakan metode
DoFn.StartBundle
.
Batasi ukuran tumpukan dan panggilan serentak ke layanan eksternal
Saat memanggil layanan eksternal, Anda dapat mengurangi overhead per panggilan menggunakan transformasi
GroupIntoBatches
. Transformasi ini membuat batch elemen dengan ukuran tertentu.
Pengelompokan mengirim elemen ke layanan eksternal sebagai satu payload, bukan
satu per satu.
Dalam kombinasi dengan pengelompokan, batasi jumlah maksimum panggilan paralel (secara serentak) ke layanan eksternal dengan memilih kunci yang sesuai untuk mempartisi data yang masuk. Jumlah partisi menentukan paralelisasi maksimum. Misalnya, jika setiap elemen diberi kunci yang sama, transformasi downstream untuk memanggil layanan eksternal tidak akan berjalan secara paralel.
Pertimbangkan salah satu pendekatan berikut untuk menghasilkan kunci elemen:
- Pilih atribut set data untuk digunakan sebagai kunci data, seperti ID pengguna.
- Membuat kunci data untuk memisahkan elemen secara acak pada sejumlah
partisi tetap, dengan jumlah kemungkinan nilai kunci menentukan jumlah
partisi. Anda perlu membuat cukup partisi untuk paralelisme.
Setiap partisi harus memiliki elemen yang cukup agar transformasi
GroupIntoBatches
dapat berguna.
Contoh kode Java berikut menunjukkan cara membagi elemen secara acak dalam sepuluh partisi:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Mengidentifikasi masalah performa yang disebabkan oleh langkah-langkah gabungan
Dataflow membuat grafik langkah-langkah yang merepresentasikan pipeline Anda berdasarkan transformasi dan data yang digunakan untuk membuatnya. Grafik ini disebut grafik eksekusi pipeline.
Saat Anda men-deploy pipeline, Dataflow mungkin mengubah grafik eksekusi pipeline untuk meningkatkan performa. Misalnya, Dataflow mungkin menggabungkan beberapa operasi, sebuah proses yang dikenal sebagai pengoptimalan fusi, untuk menghindari dampak performa dan biaya saat menulis setiap objek PCollection
menengah di pipeline Anda.
Dalam beberapa kasus, Dataflow mungkin salah menentukan cara optimal untuk menggabungkan operasi dalam pipeline, yang dapat membatasi kemampuan tugas Anda untuk memanfaatkan semua pekerja yang tersedia. Dalam kasus tersebut, Anda dapat mencegah operasi digabungkan.
Perhatikan contoh kode Apache Beam berikut. Transformasi
GenerateSequence
membuat objek PCollection
kecil yang dibatasi, yang kemudian diproses
lebih lanjut oleh dua transformasi ParDo
downstream.
Transformasi Find Primes Less-than-N
mungkin mahal secara komputasi dan
mungkin berjalan lambat untuk jumlah besar. Sebaliknya, transformasi
Increment Number
mungkin selesai dengan cepat.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
Diagram berikut menunjukkan representasi grafis pipeline di antarmuka pemantauan Dataflow.
Antarmuka pemantauan Dataflow menunjukkan bahwa kecepatan pemrosesan lambat yang sama terjadi untuk kedua transformasi, khususnya 13 elemen per detik. Anda mungkin mengharapkan transformasi Increment Number
memproses
elemen dengan cepat, tetapi transformasi tersebut tampak terikat dengan kecepatan
pemrosesan yang sama dengan Find Primes Less-than-N
.
Alasannya adalah Dataflow menggabungkan langkah-langkah tersebut menjadi satu tahap, sehingga mencegahnya berjalan secara independen. Anda dapat menggunakan perintah
gcloud dataflow jobs describe
untuk menemukan informasi selengkapnya:
gcloud dataflow jobs describe --full job-id --format json
Dalam output yang dihasilkan, langkah-langkah gabungan tersebut dijelaskan dalam objek ExecutionStageSummary
di array ComponentTransform
:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
Dalam skenario ini, karena transformasi Find Primes Less-than-N
adalah langkah yang lambat,
sehingga membatalkan fusi sebelum langkah tersebut adalah strategi yang tepat. Salah satu metode untuk
memisahkan langkah-langkah adalah dengan menyisipkan transformasi
GroupByKey
dan membatalkan pengelompokan sebelum langkah, seperti yang ditunjukkan dalam contoh kode
Java berikut.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Anda juga dapat menggabungkan langkah-langkah yang tidak menyatu ini menjadi transformasi gabungan yang dapat digunakan kembali.
Setelah Anda membatalkan penggabungan langkah, saat Anda menjalankan pipeline, Increment Number
selesai dalam hitungan detik, dan transformasi Find Primes Less-than-N
yang berjalan lebih lama akan berjalan dalam tahap terpisah.
Contoh ini menerapkan operasi grup dan pemisahan untuk membatalkan penggabungan langkah.
Anda dapat menggunakan pendekatan lain untuk keadaan lain. Dalam hal ini, menangani
output duplikat bukanlah masalah, mengingat output berturut-turut dari
transformasi GenerateSequence
.
Objek KV
dengan kunci duplikat dihapus duplikatnya menjadi satu kunci dalam transformasi
grup (GroupByKey
)
dan transformasi pemisahan
(Keys
). Untuk mempertahankan duplikat setelah operasi pengelompokan dan pemisahan, buat key-value pair dengan menggunakan langkah-langkah berikut:
- Gunakan kunci acak dan input asli sebagai nilai.
- Kelompokkan menggunakan kunci acak.
- Memberikan nilai untuk setiap kunci sebagai output.
Anda juga dapat menggunakan transformasi
Reshuffle
untuk mencegah penggabungan transformasi yang mengelilinginya. Namun, efek samping transformasi
Reshuffle
tidak dapat diterapkan pada
runner Apache Beam yang berbeda.
Untuk informasi selengkapnya tentang paralelisme dan pengoptimalan fusi, lihat Siklus proses pipeline.
Menggunakan metrik Apache Beam untuk mengumpulkan insight pipeline
Metrik Apache Beam adalah class utilitas yang menghasilkan metrik untuk melaporkan properti pipeline yang berjalan. Saat Anda menggunakan Cloud Monitoring, metrik Apache Beam tersedia sebagai metrik kustom Cloud Monitoring.
Contoh berikut menunjukkan metrik Counter
Apache Beam yang digunakan di subclass DoFn
.
Kode contoh menggunakan dua penghitung. Satu penghitung melacak kegagalan penguraian JSON
(malformedCounter
), dan penghitung lainnya melacak apakah pesan JSON valid
tetapi berisi payload kosong (emptyCounter
). Di Cloud Monitoring,
nama metrik kustom adalah custom.googleapis.com/dataflow/malformedJson
dan
custom.googleapis.com/dataflow/emptyPayload
. Anda dapat menggunakan metrik kustom untuk membuat visualisasi dan kebijakan pemberitahuan di Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Pelajari lebih lanjut
Halaman berikut memberikan informasi lebih lanjut tentang cara menyusun pipeline, cara memilih transformasi yang akan diterapkan ke data, dan hal-hal yang harus dipertimbangkan saat memilih metode input dan output pipeline.
Untuk mengetahui informasi selengkapnya tentang cara membuat kode pengguna, lihat persyaratan untuk fungsi yang disediakan pengguna.