Halaman ini menguraikan praktik terbaik yang dapat digunakan ketika mengembangkan pipeline Dataflow Anda. Menggunakan praktik terbaik ini akan memberikan manfaat berikut:
- Meningkatkan kemampuan observasi dan performa pipeline
- Peningkatan produktivitas developer
- Meningkatkan kemampuan pengujian pipeline
Contoh kode Apache Beam di halaman ini menggunakan Java, tetapi konten berlaku untuk Apache Beam Java, Python, dan Go SDK.
Pertanyaan yang perlu dipertimbangkan
Ketika mendesain pipeline Anda, pertimbangkan pertanyaan-pertanyaan berikut:
- Di mana data input pipeline Anda disimpan? Berapa banyak set data input yang Anda miliki?
- Seperti apa data Anda?
- Apa yang ingin Anda lakukan dengan data Anda?
- Ke mana seharusnya data output pipeline Anda disimpan?
- Apakah tugas Dataflow Anda menggunakan Assured Workloads?
Menggunakan template
Untuk mempercepat pengembangan pipeline, bukan membangun pipeline dengan menulis kode Apache Beam, gunakan Template Dataflow jika memungkinkan. Template memiliki manfaat berikut:
- Template dapat digunakan kembali.
- Dengan template, Anda dapat menyesuaikan setiap tugas dengan mengubah parameter pipeline tertentu.
- Siapa pun yang Anda beri izin dapat menggunakan template untuk men-deploy pipeline. Misalnya, developer dapat membuat tugas dari template, dan data scientist di organisasi dapat menerapkan {i>template <i}tersebut di lain waktu.
Anda dapat menggunakan template yang disediakan Google. atau Anda dapat membuat template sendiri. Beberapa template yang disediakan Google memungkinkan Anda menambahkan logika kustom sebagai langkah pipeline. Misalnya, Template Pub/Sub ke BigQuery menyediakan parameter untuk menjalankan {i> User-defined <i}JavaScript (UDF) yang yang disimpan di Cloud Storage.
Karena template yang disediakan Google bersifat open source di bawah Lisensi Apache 2.0, Anda dapat menggunakannya sebagai dasar untuk pipeline. Template ini juga berguna sebagai contoh kode. Lihat template kode pada Repositori GitHub.
Assured Workloads
Assured Workloads membantu menerapkan persyaratan keamanan dan kepatuhan untuk pelanggan Google Cloud. Misalnya, Wilayah dan Dukungan Uni Eropa dengan Kontrol Kedaulatan membantu menegakkan jaminan residensi data dan jaminan kedaulatan data untuk berbasis di Uni Eropa pelanggan. Untuk menyediakan fitur-fitur ini, beberapa fitur Dataflow adalah dibatasi atau dibatasi. Jika Anda menggunakan Assured Workloads dengan Dataflow, semua resource yang diakses pipeline Anda harus ditempatkan di Project atau folder Assured Workloads. Referensi ini mencakup:
- Bucket Cloud Storage
- Set data BigQuery
- Topik dan langganan Pub/Sub
- Set data Firestore
- konektor I/O
Di Dataflow, untuk tugas streaming yang dibuat setelah 7 Maret 2024, semua data pengguna dienkripsi dengan CMEK.
Untuk tugas streaming yang dibuat sebelum 7 Maret 2024, kunci data digunakan dalam tugas berbasis kunci operasi, seperti windowing, pengelompokan, dan merge, tidak dilindungi oleh CMEK enkripsi. Untuk mengaktifkan enkripsi ini pada tugas Anda, menguras atau membatalkan tugas, lalu atau memulai ulang. Untuk informasi selengkapnya, lihat Enkripsi artefak status pipeline.
Berbagi data di berbagai pipeline
Tidak ada komunikasi lintas pipeline khusus Dataflow untuk berbagi data atau memproses konteks antar-pipeline. Anda dapat menggunakan penyimpanan yang tahan lama seperti Cloud Storage atau cache dalam memori seperti App Engine berbagi data antar-instance pipeline.
Jadwalkan tugas
Anda dapat mengotomatiskan eksekusi pipeline dengan cara berikut:
- Menggunakan Cloud Scheduler.
- Gunakan Operator Dataflow Apache Airflow, salah satu dari beberapa Operator Google Cloud dalam alur kerja Cloud Composer.
- Jalankan proses tugas kustom (cron job) di Compute Engine.
Praktik terbaik untuk menulis kode pipeline
Bagian berikut memberikan praktik terbaik untuk digunakan saat Anda membuat pipeline dengan menulis kode Apache Beam.
Membuat struktur kode Apache Beam
Untuk membuat pipeline, biasanya Anda menggunakan
ParDo
transformasi Apache Beam dalam pemrosesan paralel.
Saat menerapkan transformasi ParDo
, Anda memberikan kode dalam bentuk
Objek DoFn
. DoFn
adalah class Apache Beam SDK yang menentukan
fungsi pemrosesan.
Anda dapat menganggap kode DoFn
sebagai entitas kecil yang independen: ada
kemungkinan ada banyak {i>instance<i} yang berjalan
pada komputer yang berbeda, masing-masing tanpa
mengetahui orang lain. Oleh karena itu, sebaiknya buat fungsi murni, yang
ideal untuk sifat paralel dan terdistribusi elemen DoFn
.
Fungsi murni memiliki karakteristik berikut:
- Fungsi murni tidak bergantung pada status tersembunyi atau eksternal.
- Tidak ada efek samping yang dapat diamati.
- Metode ini bersifat determenistik.
Model fungsi murni tidak sepenuhnya kaku. Ketika kode Anda tidak bergantung pada
hal-hal yang tidak dijamin oleh layanan Dataflow, status
informasi atau data inisialisasi eksternal dapat valid untuk DoFn
dan data lainnya
objek fungsi.
Saat menyusun struktur ParDo
mengubah dan membuat elemen DoFn
,
pertimbangkan pedoman berikut:
- Saat menggunakan pemrosesan tepat satu kali,
layanan Dataflow menjamin bahwa setiap elemen dalam
input
PCollection
diproses oleh instanceDoFn
tepat satu kali. - Layanan Dataflow tidak menjamin berapa kali
DoFn
dipanggil. - Layanan Dataflow tidak menjamin secara tepat cara elemen terdistribusi dikelompokkan. Hal ini tidak menjamin data yang mana, jika elemen apa pun, yang diproses bersama-sama.
- Layanan Dataflow tidak menjamin jumlah
DoFn
instance dibuat di sepanjang pipeline. - Layanan Dataflow bersifat fault-tolerant dan mungkin mencoba lagi menulis kode beberapa kali jika pekerja mengalami masalah.
- Layanan Dataflow dapat membuat salinan cadangan kode Anda. Masalah mungkin terjadi pada efek samping manual, seperti jika kode Anda bergantung aktif atau membuat file sementara dengan nama yang tidak unik.
- Layanan Dataflow menserialisasi pemrosesan elemen per
DoFn
di instance Compute Engine. Kode Anda tidak harus benar-benar aman untuk thread, tetapi status yang dibagikan di antara beberapa instanceDoFn
harus aman untuk thread.
Membuat library transformasi yang dapat digunakan kembali
Model pemrograman Apache Beam memungkinkan Anda menggunakan kembali transformasi. Dengan membuat {i>shared library<i} yang berisikan berbagai transformasi, Anda dapat meningkatkan penggunaan kembali, pengujian, dan kepemilikan kode oleh tim yang berbeda.
Pertimbangkan dua contoh kode Java berikut, yang keduanya membaca peristiwa pembayaran. Dengan asumsi bahwa kedua pipeline melakukan pemrosesan yang sama, mereka dapat menggunakan metode bertransformasi melalui pustaka bersama untuk langkah pemrosesan yang tersisa.
Contoh pertama adalah dari sumber Pub/Sub tak terbatas:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options)
// Initial read transform
PCollection<PaymentEvent> payments =
p.apply("Read from topic",
PubSubIO.readStrings().withTimestampAttribute(...).fromTopic(...))
.apply("Parse strings into payment events",
ParDo.of(new ParsePaymentEventFn()));
Contoh kedua adalah dari sumber database relasional terikat:
PipelineOptions options = PipelineOptionsFactory.create();
Pipeline p = Pipeline.create(options);
PCollection<PaymentEvent> payments =
p.apply(
"Read from database table",
JdbcIO.<PaymentEvent>read()
.withDataSourceConfiguration(...)
.withQuery(...)
.withRowMapper(new RowMapper<PaymentEvent>() {
...
}));
Cara Anda mengimplementasikan praktik terbaik penggunaan kembali kode bervariasi menurut bahasa pemrograman dan alat build. Misalnya, jika Anda menggunakan Maven, Anda dapat memisahkan kode transformasi ke modul sendiri. Anda kemudian dapat menyertakan sebagai submodul dalam modul project multi-modul untuk berbagai pipeline, seperti yang ditunjukkan pada contoh kode berikut:
// Reuse transforms across both pipelines
payments
.apply("ValidatePayments", new PaymentTransforms.ValidatePayments(...))
.apply("ProcessPayments", new PaymentTransforms.ProcessPayments(...))
...
Untuk informasi selengkapnya, lihat halaman dokumentasi Apache Beam berikut:
- Persyaratan untuk menulis kode pengguna untuk transformasi Apache Beam
- Panduan gaya
PTransform
: panduan gaya untuk penulis koleksiPTransform
baru yang dapat digunakan kembali
Menggunakan antrean yang dihentikan pengirimannya untuk penanganan error
Terkadang pipeline Anda tidak dapat memproses elemen. Masalah data adalah penyebab umumnya. Misalnya, elemen yang berisi JSON dengan format yang tidak benar dapat menyebabkan kegagalan penguraian.
Meskipun Anda dapat menangkap pengecualian dalam
DoFn.ProcessElement
log error, mencatat error, dan menghapus elemen, pendekatan ini kehilangan data
dan mencegah data diperiksa kemudian
untuk penanganan atau pemecahan masalah manual.
Sebagai gantinya, gunakan pola yang disebut antrean huruf mati (antrean pesan yang belum diproses).
Menangkap pengecualian dalam metode dan log DoFn.ProcessElement
yang sama. Alih-alih melepaskan
elemen yang gagal,
menggunakan output cabang untuk menulis elemen yang gagal ke dalam PCollection
terpisah
. Elemen-elemen ini kemudian ditulis ke sink data untuk diperiksa pada lain waktu
dan penanganan dengan transformasi terpisah.
Contoh kode Java berikut menunjukkan cara mengimplementasikan antrean yang dihentikan pengirimannya pola.
TupleTag<Output> successTag = new TupleTag<>() {};
TupleTag<Input> deadLetterTag = new TupleTag<>() {};
PCollection<Input> input = /* ... */;
PCollectionTuple outputTuple =
input.apply(ParDo.of(new DoFn<Input, Output>() {
@Override
void processElement(ProcessContext c) {
try {
c.output(process(c.element()));
} catch (Exception e) {
LOG.severe("Failed to process input {} -- adding to dead-letter file",
c.element(), e);
c.sideOutput(deadLetterTag, c.element());
}
}).withOutputTags(successTag, TupleTagList.of(deadLetterTag)));
// Write the dead-letter inputs to a BigQuery table for later analysis
outputTuple.get(deadLetterTag)
.apply(BigQueryIO.write(...));
// Retrieve the successful elements...
PCollection<Output> success = outputTuple.get(successTag);
// and continue processing ...
Menggunakan Cloud Monitoring menerapkan pemantauan dan peringatan yang berbeda kebijakan untuk antrean pengiriman pesan yang dihentikan pengirimannya di pipeline. Misalnya, Anda dapat memvisualisasikan jumlah dan ukuran elemen yang diproses oleh transformasi huruf mati Anda dan mengonfigurasi pemberitahuan untuk dipicu jika kondisi nilai minimum tertentu terpenuhi.
Menangani mutasi skema
Anda dapat menangani data yang memiliki skema yang tidak terduga namun valid dengan menggunakan karakter yang dihentikan pengirimannya
pola, yang menulis elemen yang gagal ke objek PCollection
terpisah.
Dalam beberapa kasus, Anda ingin
menangani elemen secara otomatis
yang mencerminkan skema bermutasi
sebagai elemen yang valid. Misalnya, jika sebuah elemen
skema mencerminkan mutasi seperti penambahan {i>field<i} baru, Anda dapat menyesuaikan
skema dari sink data untuk mengakomodasi mutasi.
Mutasi skema otomatis bergantung pada pendekatan percabangan-output yang digunakan oleh pola huruf mati. Namun, dalam kasus ini, metode ini memicu transformasi yang bermutasi skema tujuan ketika ada skema tambahan ditemui. Untuk contoh pendekatan ini, lihat Cara menangani mutasi skema JSON di pipeline streaming, dengan Square Enix di blog Google Cloud.
Menentukan cara menggabungkan {i>dataset<i}
Menggabungkan set data adalah kasus penggunaan umum untuk pipeline data. Anda dapat menggunakan
input samping atau transformasi CoGroupByKey
untuk melakukan penggabungan dalam pipeline.
Masing-masing memiliki
kelebihan dan kekurangan.
Input samping
menyediakan cara yang fleksibel untuk memecahkan masalah pemrosesan data umum, seperti
pengayaan dan pencarian berkunci. Tidak seperti objek PCollection
, input samping
dapat diubah dan dapat ditentukan saat runtime. Misalnya, nilai-nilai dalam
{i>side-input<i} mungkin dihitung oleh cabang lain
dalam pipeline Anda atau ditentukan oleh
memanggil layanan jarak jauh.
Dataflow mendukung input samping dengan mempertahankan data menjadi persisten penyimpanan, mirip dengan {i>shared disk<i}. Konfigurasi ini membuat input sisi lengkap tersedia untuk semua pekerja.
Namun, ukuran input samping bisa sangat besar dan mungkin tidak muat dengan memori pekerja. Bacaan dari input sisi yang besar dapat menyebabkan masalah performa jika pekerja perlu terus-menerus membaca dari penyimpanan persisten.
CoGroupByKey
transformasi adalah
transformasi Apache Beam inti
yang menggabungkan (meratakan) beberapa objek PCollection
dan mengelompokkan elemen yang
memiliki kunci yang sama. Tidak seperti {i>input<i} samping, yang membuat seluruh data {i>input<i} samping
tersedia untuk setiap pekerja, CoGroupByKey
akan melakukan operasi acak (pengelompokan)
untuk mendistribusikan data di seluruh pekerja. Oleh karena itu, CoGroupByKey
ideal saat
Objek PCollection
yang ingin Anda gabungkan sangat besar dan tidak muat dengan pekerja
memori.
Ikuti panduan ini untuk membantu memutuskan apakah akan menggunakan input tambahan atau
CoGroupByKey
:
- Gunakan input samping saat salah satu objek
PCollection
yang Anda gabungkan secara tidak proporsional lebih kecil dari yang lain, dan semakin kecil ObjekPCollection
cocok dengan memori worker. Menyimpan input samping ke dalam cache sepenuhnya ke dalam memori membuatnya cepat dan efisien untuk mengambil elemen. - Gunakan input samping jika Anda memiliki objek
PCollection
yang harus digabungkan beberapa kali di pipeline Anda. Daripada menggunakan banyakCoGroupByKey
bertransformasi, buat input sisi tunggal yang dapat digunakan kembali oleh beberapa transformasiParDo
. - Gunakan
CoGroupByKey
jika Anda perlu mengambil sebagian besar ObjekPCollection
yang melebihi memori pekerja secara signifikan.
Untuk informasi selengkapnya, lihat Memecahkan masalah error memori Dataflow habis.
Meminimalkan operasi per elemen yang mahal
Instance DoFn
memproses batch elemen yang disebut
paket,
yang merupakan unit atom dari kerja yang terdiri dari nol atau lebih
yang kurang penting. Masing-masing elemen kemudian
diproses oleh
DoFn.ProcessElement
, yang berjalan untuk setiap elemen. Karena DoFn.ProcessElement
metode dipanggil untuk setiap elemen, setiap elemen yang memakan waktu atau komputasi
operasi mahal yang dipanggil oleh metode tersebut
dijalankan untuk setiap elemen yang diproses oleh metode.
Jika Anda perlu melakukan operasi yang mahal
hanya sekali untuk sekumpulan elemen,
menyertakan operasi tersebut dalam metode DoFn.Setup
atau DoFn.StartBundle
alih-alih dalam elemen DoFn.ProcessElement
. Contohnya mencakup
operasi berikut:
Mengurai file konfigurasi yang mengontrol beberapa aspek
DoFn
perilaku instance secara otomatis. Hanya panggil tindakan ini satu kali, ketika InstanceDoFn
diinisialisasi, dengan menggunakanDoFn.Setup
.Membuat instance klien berumur pendek yang digunakan kembali di semua elemen lain dalam paket, misalnya jika semua elemen dalam paket dikirim melalui satu koneksi jaringan. Panggil tindakan ini satu kali per paket dengan menggunakan
DoFn.StartBundle
.
Membatasi ukuran batch dan panggilan serentak ke layanan eksternal
Saat memanggil layanan eksternal, Anda dapat mengurangi overhead per panggilan menggunakan
GroupIntoBatches
transformasi. Transformasi ini membuat batch elemen dengan ukuran tertentu.
Batching mengirimkan elemen ke layanan eksternal sebagai satu payload, bukan
satu per satu.
Dikombinasikan dengan batch, batasi jumlah maksimum paralel panggilan (serentak) ke layanan eksternal dengan memilih kunci yang sesuai untuk mempartisi data yang masuk. Jumlah partisi menentukan jumlah partisi maksimum paralelisasi. Misalnya, jika setiap elemen diberi kunci yang sama, maka transformasi downstream untuk memanggil layanan eksternal yang tidak berjalan di paralel.
Pertimbangkan salah satu pendekatan berikut dalam menghasilkan kunci untuk elemen:
- Pilih atribut set data yang akan digunakan sebagai kunci data, seperti ID pengguna.
- Buat kunci data untuk membagi elemen secara acak selama jumlah tetap
partisi, di mana jumlah kemungkinan nilai kunci menentukan jumlah
jumlah partisi. Anda perlu membuat partisi yang cukup untuk paralelisme.
Setiap partisi harus memiliki cukup elemen untuk
GroupIntoBatches
ubah agar berguna.
Contoh kode Java berikut menunjukkan cara membagi elemen secara acak ke lebih dari sepuluh elemen partisi:
// PII or classified data which needs redaction.
PCollection<String> sensitiveData = ...;
int numPartitions = 10; // Number of parallel batches to create.
PCollection<KV<Long, Iterable<String>>> batchedData =
sensitiveData
.apply("Assign data into partitions",
ParDo.of(new DoFn<String, KV<Long, String>>() {
Random random = new Random();
@ProcessElement
public void assignRandomPartition(ProcessContext context) {
context.output(
KV.of(randomPartitionNumber(), context.element()));
}
private static int randomPartitionNumber() {
return random.nextInt(numPartitions);
}
}))
.apply("Create batches of sensitive data",
GroupIntoBatches.<Long, String>ofSize(100L));
// Use batched sensitive data to fully utilize Redaction API,
// which has a rate limit but allows large payloads.
batchedData
.apply("Call Redaction API in batches", callRedactionApiOnBatch());
Mengidentifikasi masalah performa yang disebabkan oleh langkah-langkah gabungan
Dataflow membuat grafik langkah-langkah yang mewakili pipeline Anda berdasarkan transformasi dan data yang Anda gunakan untuk membangunnya. Grafik ini disebut tindakan grafik eksekusi pipeline.
Ketika Anda men-deploy pipeline, Dataflow mungkin akan memodifikasi
grafik eksekusi pipeline Anda
untuk meningkatkan performa. Misalnya, Dataflow
dapat menggabungkan beberapa operasi, sebuah proses yang dikenal sebagai
pengoptimalan fusi,
untuk menghindari dampak performa dan biaya dari penulisan setiap perantara
PCollection
di pipeline Anda.
Dalam beberapa kasus, Dataflow mungkin salah menentukan cara optimal untuk menggabungkan berbagai operasi di pipeline, yang dapat membatasi kemampuan pekerjaan Anda untuk memanfaatkan semua pekerja yang tersedia. Dalam kasus tersebut, Anda dapat mencegah operasi menyatu.
Pertimbangkan contoh kode Apache Beam berikut. J
GenerateSequence
transformasi membuat objek PCollection
kecil yang dibatasi, yang kemudian lebih jauh
diproses oleh dua transformasi ParDo
downstream.
Transformasi Find Primes Less-than-N
mungkin mahal secara komputasi dan lebih
mungkin berjalan lambat untuk jumlah yang besar. Sebaliknya,
Transformasi Increment Number
mungkin selesai dengan cepat.
import com.google.common.math.LongMath;
...
public class FusedStepsPipeline {
final class FindLowerPrimesFn extends DoFn<Long, String> {
@ProcessElement
public void processElement(ProcessContext c) {
Long n = c.element();
if (n > 1) {
for (long i = 2; i < n; i++) {
if (LongMath.isPrime(i)) {
c.output(Long.toString(i));
}
}
}
}
}
public static void main(String[] args) {
Pipeline p = Pipeline.create(options);
PCollection<Long> sequence = p.apply("Generate Sequence",
GenerateSequence
.from(0)
.to(1000000));
// Pipeline branch 1
sequence.apply("Find Primes Less-than-N",
ParDo.of(new FindLowerPrimesFn()));
// Pipeline branch 2
sequence.apply("Increment Number",
MapElements.via(new SimpleFunction<Long, Long>() {
public Long apply(Long n) {
return ++n;
}
}));
p.run().waitUntilFinish();
}
}
Diagram berikut menunjukkan representasi grafis pipeline dalam Antarmuka pemantauan Dataflow.
Tujuan
Antarmuka pemantauan Dataflow
menunjukkan bahwa laju pemrosesan lambat yang sama
terjadi untuk kedua transformasi, khususnya 13
elemen per detik. Anda mungkin mengharapkan transformasi Increment Number
diproses
elemen dengan cepat, tetapi tampaknya terikat pada tingkat yang sama
diproses sebagai Find Primes Less-than-N
.
Alasannya adalah Dataflow menggabungkan langkah-langkah menjadi satu
yang mencegahnya berjalan secara independen. Anda dapat menggunakan
gcloud dataflow jobs describe
untuk menemukan informasi selengkapnya:
gcloud dataflow jobs describe --full job-id --format json
Dalam output yang dihasilkan, langkah-langkah yang menyatu dijelaskan dalam
ExecutionStageSummary
objek di
ComponentTransform
array:
...
"executionPipelineStage": [
{
"componentSource": [
...
],
"componentTransform": [
{
"name": "s1",
"originalTransform": "Generate Sequence/Read(BoundedCountingSource)",
"userName": "Generate Sequence/Read(BoundedCountingSource)"
},
{
"name": "s2",
"originalTransform": "Find Primes Less-than-N",
"userName": "Find Primes Less-than-N"
},
{
"name": "s3",
"originalTransform": "Increment Number/Map",
"userName": "Increment Number/Map"
}
],
"id": "S01",
"kind": "PAR_DO_KIND",
"name": "F0"
}
...
Dalam skenario ini, karena transformasi Find Primes Less-than-N
adalah langkah lambat,
memecah fusi sebelum langkah tersebut
adalah strategi yang tepat. Satu metode untuk
langkah-langkah penggabungan adalah menyisipkan
GroupByKey
ubah dan pisahkan sebelum langkah, seperti yang ditunjukkan pada kode Java berikut
contoh.
sequence
.apply("Map Elements", MapElements.via(new SimpleFunction<Long, KV<Long, Void>>() {
public KV<Long, Void> apply(Long n) {
return KV.of(n, null);
}
}))
.apply("Group By Key", GroupByKey.<Long, Void>create())
.apply("Emit Keys", Keys.<Long>create())
.apply("Find Primes Less-than-N", ParDo.of(new FindLowerPrimesFn()));
Anda juga dapat menggabungkan langkah-langkah penggabungan ini menjadi transformasi komposit yang dapat digunakan kembali.
Setelah Anda membatalkan langkah-langkah tersebut, saat Anda menjalankan pipeline, Increment Number
diselesaikan dalam hitungan detik, dan waktu yang lebih lama
Transformasi Find Primes Less-than-N
berjalan dalam tahap terpisah.
Contoh ini menerapkan operasi grup dan pemisahan untuk membatalkan langkah.
Anda dapat menggunakan pendekatan lain untuk situasi lain. Dalam hal ini, penanganan
output duplikat tidak menjadi masalah, mengingat output berturut-turut
Transformasi GenerateSequence
.
KV
objek dengan kunci duplikat dihapus duplikatnya menjadi satu kunci dalam grup
Transformasi (GroupByKey
)
dan ungroup
(Keys
)
transformasi. Untuk mempertahankan duplikat setelah
mengelompokkan dan memisahkan operasi,
buat pasangan nilai kunci dengan menggunakan langkah-langkah berikut:
- Gunakan kunci acak dan input asli sebagai nilai.
- Kelompokkan menggunakan kunci acak.
- Berikan nilai untuk setiap kunci sebagai output.
Anda juga dapat menggunakan
Reshuffle
transform untuk mencegah fusi transformasi di sekitarnya. Namun, efek samping
Transformasi Reshuffle
tidak portabel di berbagai
Runner Apache Beam.
Untuk informasi selengkapnya tentang paralelisme dan pengoptimalan fusi, lihat Siklus proses pipeline.
Menggunakan metrik Apache Beam untuk mengumpulkan insight pipeline
Metrik Apache Beam adalah kelas utilitas yang menghasilkan metrik untuk yang melaporkan properti dari pipa yang berjalan. Jika Anda menggunakan Cloud Monitoring, Metrik Apache Beam tersedia sebagai metrik kustom Cloud Monitoring.
Contoh berikut menunjukkan Apache Beam
Counter
metrik
digunakan di subclass DoFn
.
Kode contoh menggunakan dua penghitung. Satu penghitung melacak kegagalan penguraian JSON
(malformedCounter
), dan penghitung lainnya melacak apakah pesan JSON
valid, tetapi berisi payload kosong (emptyCounter
). Dalam Cloud Monitoring,
nama metrik kustom adalah custom.googleapis.com/dataflow/malformedJson
dan
custom.googleapis.com/dataflow/emptyPayload
. Anda dapat menggunakan metrik kustom
untuk membuat visualisasi dan kebijakan pemberitahuan di Cloud Monitoring.
final TupleTag<String> errorTag = new TupleTag<String>(){};
final TupleTag<MockObject> successTag = new TupleTag<MockObject>(){};
final class ParseEventFn extends DoFn<String, MyObject> {
private final Counter malformedCounter = Metrics.counter(ParseEventFn.class, "malformedJson");
private final Counter emptyCounter = Metrics.counter(ParseEventFn.class, "emptyPayload");
private Gson gsonParser;
@Setup
public setup() {
gsonParser = new Gson();
}
@ProcessElement
public void processElement(ProcessContext c) {
try {
MyObject myObj = gsonParser.fromJson(c.element(), MyObject.class);
if (myObj.getPayload() != null) {
// Output the element if non-empty payload
c.output(successTag, myObj);
}
else {
// Increment empty payload counter
emptyCounter.inc();
}
}
catch (JsonParseException e) {
// Increment malformed JSON counter
malformedCounter.inc();
// Output the element to dead-letter queue
c.output(errorTag, c.element());
}
}
}
Pelajari lebih lanjut
Halaman berikut memberikan informasi lebih lanjut tentang cara menyusun pipeline, bagaimana memilih transformasi yang akan diterapkan pada data Anda, dan apa yang harus dipertimbangkan memilih metode input dan output pipeline Anda.
Untuk informasi selengkapnya tentang cara membuat kode pengguna, lihat persyaratan untuk fungsi yang disediakan pengguna.