Grafik tugas Dataflow

Saat Anda memilih tugas Dataflow tertentu, antarmuka pemantauan akan memberikan representasi grafis tugas Anda: grafik tugas. Halaman grafik tugas di konsol juga menyediakan ringkasan tugas, log tugas, dan informasi tentang setiap langkah dalam pipeline.

Grafik tugas pipeline merepresentasikan setiap transformasi dalam pipeline sebagai kotak. Setiap kotak berisi nama transformasi dan informasi tentang status tugas, yang mencakup hal berikut:

  • Berjalan: langkah sedang berjalan
  • Diantrekan: langkah dalam tugas FlexRS diantrekan
  • Berhasil: langkah berhasil diselesaikan
  • Dihentikan: langkah dihentikan karena tugas dihentikan
  • Tidak diketahui: langkah gagal melaporkan status
  • Gagal: langkah gagal diselesaikan

Secara default, halaman grafik tugas menampilkan Tampilan grafik. Untuk melihat grafik tugas sebagai tabel, di Tampilan langkah tugas, pilih Tampilan tabel. Tampilan tabel berisi informasi yang sama dalam format yang berbeda. Tampilan tabel berguna dalam skenario berikut:

  • Tugas Anda memiliki banyak tahap, sehingga grafik tugas sulit dinavigasi.
  • Anda ingin mengurutkan langkah-langkah tugas berdasarkan properti tertentu. Misalnya, Anda dapat mengurutkan tabel menurut waktu tunggu untuk mengidentifikasi langkah yang lambat.

Grafik tugas dasar

Kode Pipeline:

Java

  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python

(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go

  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)
Grafik tugas:

Grafik eksekusi untuk pipeline WordCount seperti yang ditunjukkan di antarmuka pemantauan Dataflow.

Gambar 1: Kode pipeline untuk pipeline WordCount yang ditampilkan dengan grafik eksekusi yang dihasilkan di antarmuka pemantauan Dataflow.

Transformasi gabungan

Dalam grafik tugas, transformasi gabungan, transformasi yang berisi beberapa sub-transformasi bertingkat, dapat diluaskan. Transformasi komposit yang dapat diperluas ditandai dengan panah di grafik. Untuk meluaskan transformasi dan melihat sub-transformasi, klik panah.

Kode Pipeline:

Java

  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python

# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go

  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Grafik tugas:

Grafik tugas untuk pipeline WordCount dengan transformasi CountWords diperluas untuk menampilkan transformasi komponennya.

Gambar 2: Kode pipeline untuk sub-langkah transformasi CountWords. Ditampilkan dengan grafik tugas yang diperluas untuk seluruh pipeline.

Dalam kode pipeline, Anda dapat menggunakan kode berikut untuk memanggil transformasi komposit:

result = transform.apply(input);

Transformasi gabungan yang dipanggil dengan cara ini akan menghapus tingkatan yang diharapkan dan mungkin muncul diperluas di antarmuka pemantauan Dataflow. Pipeline Anda mungkin juga menghasilkan peringatan atau error tentang nama unik yang stabil pada waktu eksekusi pipeline.

Untuk menghindari masalah ini, panggil transformasi Anda menggunakan format yang direkomendasikan:

result = input.apply(transform);

Mengubah nama

Dataflow memiliki beberapa cara berbeda untuk mendapatkan nama transformasi yang ditampilkan dalam grafik tugas pemantauan. Nama transformasi digunakan di tempat yang dapat dilihat oleh publik, termasuk antarmuka pemantauan Dataflow, file log, dan alat proses debug. Jangan gunakan nama transformasi yang menyertakan informasi identitas pribadi, seperti nama pengguna atau nama organisasi.

Java

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat Anda menerapkan transformasi. Argumen pertama yang Anda berikan ke metode apply adalah nama transformasi Anda.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama class, jika Anda mem-build transformasi kustom, atau nama objek fungsi DoFn, jika Anda menggunakan transformasi inti seperti ParDo.

Python

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat Anda menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan argumen label transformasi.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama class, jika Anda mem-build transformasi kustom, atau nama objek fungsi DoFn, jika Anda menggunakan transformasi inti seperti ParDo.

Go

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat Anda menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan Scope.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama struct jika Anda menggunakan DoFn struktural atau dari nama fungsi jika Anda menggunakan DoFn fungsional.

Memahami metrik

Bagian ini memberikan detail tentang metrik yang terkait dengan grafik tugas.

Waktu proses

Saat Anda mengklik langkah, metrik Waktu berjalan akan ditampilkan di panel Info langkah. Waktu berjalan memberikan perkiraan total waktu yang dihabiskan di semua thread di semua pekerja pada tindakan berikut:

  • Melakukan inisialisasi langkah
  • Memproses data
  • Mengacak data
  • Mengakhiri langkah

Untuk langkah gabungan, waktu aktual memberi tahu Anda jumlah waktu yang dihabiskan dalam langkah komponen. Estimasi ini dapat membantu Anda mengidentifikasi langkah-langkah yang lambat dan mendiagnosis bagian pipeline mana yang memerlukan waktu lebih lama dari yang diperlukan.

Anda dapat melihat jumlah waktu yang diperlukan untuk menjalankan langkah di pipeline.
Gambar 3: Metrik Waktu proses dapat membantu Anda memastikan pipeline berjalan secara efisien.

Metrik input tambahan

Metrik input samping menunjukkan pengaruh pola dan algoritma akses input samping terhadap performa pipeline Anda. Saat pipeline Anda menggunakan input samping, Dataflow akan menulis koleksi ke lapisan persisten, seperti disk, dan transformasi Anda akan dibaca dari koleksi persisten ini. Operasi baca dan tulis ini memengaruhi waktu proses tugas Anda.

Antarmuka pemantauan Dataflow menampilkan metrik input tambahan saat Anda memilih transformasi yang membuat atau menggunakan koleksi input tambahan. Anda dapat melihat metrik di bagian Side Input Metrics pada panel Step info.

Transformasi yang membuat input tambahan

Jika transformasi yang dipilih membuat kumpulan input tambahan, bagian Side Input Metrics akan menampilkan nama kumpulan, beserta metrik berikut:

  • Waktu yang dihabiskan untuk menulis: Waktu yang dihabiskan untuk menulis kumpulan input samping.
  • Byte yang ditulis: Jumlah total byte yang ditulis ke koleksi input samping.
  • Waktu & byte yang dibaca dari side input: Tabel yang berisi metrik tambahan untuk semua transformasi yang menggunakan pengumpulan side input, yang disebut konsumen side input.

Tabel Waktu & byte yang dibaca dari side input berisi informasi berikut untuk setiap konsumen side input:

  • Konsumen input samping: Nama transformasi konsumen input samping.
  • Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan konsumen ini untuk membaca koleksi input samping.
  • Byte yang dibaca: Jumlah byte yang dibaca konsumen ini dari koleksi input samping.

Jika pipeline Anda memiliki transformasi gabungan yang membuat input samping, perluas transformasi gabungan hingga Anda melihat subtransformasi tertentu yang membuat input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.

Gambar 4 menunjukkan metrik input samping untuk transformasi yang membuat pengumpulan input samping.

Anda dapat memilih subtransformasi dan metrik input tambahannya
         terlihat di panel samping Info langkah.
Gambar 4: Grafik tugas memiliki transformasi komposit yang diperluas (MakeMapView). Subtransformasi yang membuat input samping (CreateDataflowView) dipilih, dan metrik input samping terlihat di panel samping Info langkah.

Transformasi yang menggunakan satu atau beberapa input samping

Jika transformasi yang dipilih menggunakan satu atau beberapa input tambahan, bagian Metrik Input Tambahan akan menampilkan tabel Waktu & byte yang dibaca dari input tambahan. Tabel ini berisi informasi berikut untuk setiap pengumpulan input sisi:

  • Koleksi input samping: Nama koleksi input samping.
  • Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan transformasi untuk membaca kumpulan input samping ini.
  • Byte yang dibaca: Jumlah byte yang dibaca transformasi dari koleksi input samping ini.

Jika pipeline Anda memiliki transformasi gabungan yang membaca input samping, perluas transformasi gabungan hingga Anda melihat subtransformasi tertentu yang membaca input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.

Gambar 5 menunjukkan metrik input tambahan untuk transformasi yang membaca dari kumpulan input tambahan.

Anda dapat memilih transformasi dan metrik input tambahannya
         terlihat di panel samping Info langkah.
Gambar 5: Transformasi JoinBothCollections membaca dari koleksi input samping. JoinBothCollections dipilih di grafik tugas dan metrik input samping terlihat di panel samping Info langkah.

Mengidentifikasi masalah performa input samping

Pengulangan adalah masalah umum terkait performa input samping. Jika PCollection input samping Anda terlalu besar, pekerja tidak dapat meng-cache seluruh koleksi dalam memori. Akibatnya, pekerja harus berulang kali membaca dari kumpulan input sisi persisten.

Pada gambar 6, metrik input samping menunjukkan bahwa total byte yang dibaca dari koleksi input samping jauh lebih besar dari ukuran koleksi, total byte yang ditulis.

Anda dapat memilih transformasi dan metrik input tambahannya
         terlihat di panel samping Info langkah.
Gambar 6: Contoh iterasi ulang. Pengumpulan input samping adalah 563 MB, dan jumlah byte yang dibaca dengan menggunakan transformasi hampir 12 GB.

Untuk meningkatkan performa pipeline ini, desain ulang algoritma Anda agar tidak melakukan iterasi atau mengambil ulang data input samping. Dalam contoh ini, pipeline membuat produk Kartesius dari dua koleksi. Algoritme melakukan iterasi di seluruh koleksi input samping untuk setiap elemen koleksi utama. Anda dapat meningkatkan pola akses pipeline dengan mengelompokkan beberapa elemen koleksi utama secara bersamaan. Perubahan ini mengurangi frekuensi pekerja harus membaca ulang koleksi input samping.

Masalah performa umum lainnya dapat terjadi jika pipeline Anda melakukan join dengan menerapkan ParDo dengan satu atau beberapa input samping yang besar. Dalam hal ini, pekerja menghabiskan sebagian besar waktu pemrosesan untuk operasi join yang membaca dari koleksi input samping.

Gambar 7 menunjukkan contoh metrik input sisi untuk masalah ini:

Anda dapat memilih transformasi dan metrik input tambahannya
         terlihat di panel samping Info langkah.
Gambar 7: Transformasi JoinBothCollections memiliki total waktu pemrosesan 18 menit 31 detik. Pekerja menghabiskan sebagian besar waktu pemrosesan (10 menit 3 detik) untuk membaca dari koleksi input samping 10 GB.

Untuk meningkatkan performa pipeline ini, gunakan CoGroupByKey, bukan input samping.