Saat Anda memilih tugas Dataflow tertentu, antarmuka pemantauan memberikan representasi grafis dari pipeline Anda: grafik tugas. Halaman grafik tugas di konsol juga menyediakan ringkasan tugas, log tugas, dan informasi tentang setiap langkah dalam pipeline.
Grafik tugas pipeline menampilkan setiap transformasi dalam pipeline sebagai kotak. Setiap kotak berisi nama transformasi dan informasi tentang status tugas, yang mencakup hal berikut:
- Running: langkah sedang berjalan.
- Queued: langkah dalam tugas FlexRS diantrekan.
- Sukses: langkah berhasil diselesaikan.
- Dihentikan: langkah dihentikan karena tugas dihentikan.
- Tidak diketahui: langkah gagal melaporkan status.
- Gagal: langkah gagal diselesaikan.
Grafik tugas dasar
Kode Pipeline:
Java// Read the lines of the input text. p.apply("ReadLines", TextIO.read().from(options.getInputFile())) // Count the words. .apply(new CountWords()) // Write the formatted word counts to output. .apply("WriteCounts", TextIO.write().to(options.getOutput())); Python( pipeline # Read the lines of the input text. | 'ReadLines' >> beam.io.ReadFromText(args.input_file) # Count the words. | CountWords() # Write the formatted word counts to output. | 'WriteCounts' >> beam.io.WriteToText(args.output_path)) Go// Create the pipeline. p := beam.NewPipeline() s := p.Root() // Read the lines of the input text. lines := textio.Read(s, *input) // Count the words. counted := beam.ParDo(s, CountWords, lines) // Write the formatted word counts to output. textio.Write(s, *output, formatted) |
Grafik pekerjaan:
|
Transformasi gabungan
Dalam grafik tugas, transformasi gabungan, transformasi yang berisi beberapa sub-transformasi bertingkat, dapat diperluas. Transformasi gabungan yang dapat diperluas ditandai dengan panah dalam grafik. Klik panah untuk memperluas transformasi dan melihat sub-transformasi di dalamnya.
Kode Pipeline:
Java// The CountWords Composite Transform // inside the WordCount pipeline. public static class CountWords extends PTransform<PCollection<String>, PCollection<String>> { @Override public PCollection<String> apply(PCollection<String> lines) { // Convert lines of text into individual words. PCollection<String> words = lines.apply( ParDo.of(new ExtractWordsFn())); // Count the number of times each word occurs. PCollection<KV<String, Long>> wordCounts = words.apply(Count.<String>perElement()); return wordCounts; } } Python# The CountWords Composite Transform inside the WordCount pipeline. @beam.ptransform_fn def CountWords(pcoll): return ( pcoll # Convert lines of text into individual words. | 'ExtractWords' >> beam.ParDo(ExtractWordsFn()) # Count the number of times each word occurs. | beam.combiners.Count.PerElement() # Format each word and count into a printable string. | 'FormatCounts' >> beam.ParDo(FormatCountsFn())) Go// The CountWords Composite Transform inside the WordCount pipeline. func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection { s = s.Scope("CountWords") // Convert lines of text into individual words. col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines) // Count the number of times each word occurs. return stats.Count(s, col) } |
Grafik pekerjaan:
|
Dalam kode pipeline, Anda mungkin telah memanggil transformasi gabungan seperti berikut:
result = transform.apply(input);
Transformasi gabungan yang dipanggil dengan cara ini menghilangkan penyusunan bertingkat yang diharapkan, sehingga dapat diperluas di Antarmuka Pemantauan Dataflow. Pipeline Anda juga dapat menghasilkan peringatan atau error tentang nama unik yang stabil pada waktu eksekusi pipeline.
Untuk menghindari masalah ini, pastikan Anda memanggil transformasi menggunakan format yang direkomendasikan:
result = input.apply(transform);
Mengubah nama
Dataflow memiliki beberapa cara untuk mendapatkan nama transformasi yang ditampilkan dalam grafik tugas pemantauan.
Java
- Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Argumen pertama yang Anda berikan ke metode
apply
adalah nama transformasi Anda. - Dataflow dapat menyimpulkan nama transformasi, baik dari nama class (jika Anda telah membuat transformasi kustom) maupun nama objek fungsi
DoFn
(jika Anda menggunakan transformasi inti sepertiParDo
).
Python
- Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan argumen
label
transformasi. - Dataflow dapat menyimpulkan nama transformasi, baik dari nama class (jika Anda telah membuat transformasi kustom) maupun nama objek fungsi
DoFn
(jika Anda menggunakan transformasi inti sepertiParDo
).
Go
- Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan
Scope
. - Dataflow dapat menyimpulkan nama transformasi, baik dari nama struct jika Anda menggunakan
DoFn
struktural maupun dari nama fungsi jika Anda menggunakanDoFn
fungsional.
Memahami metrik
Bagian ini memberikan detail tentang metrik yang terkait dengan grafik tugas.
Waktu proses
Saat Anda mengklik langkah, metrik Waktu dinding ditampilkan di panel Info langkah. Waktu proses menampilkan total perkiraan waktu yang dihabiskan di semua thread di semua pekerja untuk tindakan berikut:
- Melakukan inisialisasi langkah
- Memproses data
- Mengacak data
- Mengakhiri langkah
Untuk langkah gabungan, waktu proses menunjukkan jumlah waktu yang dihabiskan dalam langkah-langkah komponen. Perkiraan ini dapat membantu Anda mengidentifikasi langkah-langkah yang lambat dan mendiagnosis bagian pipeline mana yang memerlukan waktu lebih lama dari yang diperlukan.
Metrik input samping
Metrik input samping menunjukkan pengaruh pola dan algoritma akses input samping Anda terhadap performa pipeline. Jika pipeline Anda menggunakan input samping, Dataflow akan menulis koleksi ke lapisan persisten, seperti disk, dan transformasi Anda dibaca dari koleksi persisten ini. Pembacaan dan penulisan ini memengaruhi waktu proses tugas Anda.
Antarmuka pemantauan Dataflow menampilkan metrik input samping saat Anda memilih transformasi yang membuat atau menggunakan kumpulan input samping. Anda dapat melihat metrik di bagian Metrik Input Samping pada panel Info langkah.
Transformasi yang membuat input samping
Jika transformasi yang dipilih membuat kumpulan input samping, bagian Metrik Input Sisi akan menampilkan nama kumpulan tersebut, beserta metrik berikut:
- Waktu yang dihabiskan untuk menulis: Waktu yang dihabiskan untuk menulis kumpulan input samping.
- Byte ditulis: Total jumlah byte yang ditulis ke koleksi input samping.
- Membaca waktu & byte dari input samping: Tabel yang berisi metrik tambahan untuk semua transformasi yang menggunakan pengumpulan input samping, yang disebut konsumen input samping.
Tabel Waktu & byte baca dari input samping berisi informasi berikut untuk setiap konsumen input sisi:
- Konsumen input samping: Nama transformasi konsumen input samping.
- Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan konsumen ini untuk membaca kumpulan input samping.
- Byte dibaca: Jumlah byte yang dibaca konsumen ini dari koleksi input samping.
Jika pipeline Anda memiliki transformasi gabungan yang membuat input samping, luaskan transformasi komposit sampai Anda melihat subtransformasi tertentu yang membuat input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.
Gambar 4 menunjukkan metrik input samping untuk transformasi yang membuat kumpulan input samping.
Transformasi yang menggunakan satu atau beberapa input samping
Jika transformasi yang dipilih menggunakan satu atau beberapa input samping, bagian Metrik Input Sisi menampilkan tabel Waktu & byte yang dibaca dari input samping. Tabel ini berisi informasi berikut untuk setiap koleksi input sisi:
- Pengumpulan input samping: Nama koleksi input samping.
- Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan oleh transformasi untuk membaca koleksi input sisi ini.
- Byte dibaca: Jumlah byte yang dibaca transformasi dari koleksi input samping ini.
Jika pipeline Anda memiliki transformasi gabungan yang membaca input samping, luaskan transformasi komposit sampai Anda melihat subtransformasi tertentu yang membaca input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.
Gambar 5 menunjukkan metrik input samping untuk transformasi yang membaca dari koleksi input samping.
Mengidentifikasi masalah performa input samping
Pengulangan adalah masalah performa input samping yang umum. Jika PCollection
input samping
Anda terlalu besar, pekerja tidak dapat meng-cache seluruh koleksi di memori.
Akibatnya, pekerja harus berulang kali membaca dari koleksi input
sisi persisten.
Pada Gambar 6, metrik input samping menunjukkan bahwa total byte yang dibaca dari kumpulan input samping jauh lebih besar daripada ukuran koleksi, total byte yang ditulis.
Untuk meningkatkan performa pipeline ini, desain ulang algoritma Anda untuk menghindari iterasi atau pengambilan kembali data input samping. Dalam contoh ini, pipeline membuat produk Kartesius dari dua koleksi. Algoritma melakukan iterasi di seluruh koleksi input samping untuk setiap elemen koleksi utama. Anda dapat meningkatkan pola akses pipeline dengan mengelompokkan beberapa elemen dari koleksi utama secara bersamaan. Perubahan ini mengurangi frekuensi pekerja harus membaca ulang pengumpulan input samping.
Masalah performa umum lainnya dapat terjadi jika pipeline Anda melakukan penggabungan
dengan menerapkan ParDo
dengan satu atau beberapa input sisi besar. Dalam hal ini,
pekerja menghabiskan sebagian besar waktu pemrosesan untuk operasi gabungan yang membaca dari
koleksi input samping.
Gambar 7 menunjukkan contoh metrik input samping untuk masalah ini:
Untuk meningkatkan performa pipeline ini, gunakan CoGroupByKey, bukan input samping.