Grafik tugas Dataflow

Saat Anda memilih tugas Dataflow tertentu, antarmuka pemantauan memberikan representasi grafis dari pipeline Anda: grafik tugas. Halaman grafik tugas di konsol juga menyediakan ringkasan tugas, log tugas, dan informasi tentang setiap langkah dalam pipeline.

Grafik tugas pipeline menampilkan setiap transformasi dalam pipeline sebagai kotak. Setiap kotak berisi nama transformasi dan informasi tentang status tugas, yang mencakup hal berikut:

  • Running: langkah sedang berjalan.
  • Queued: langkah dalam tugas FlexRS diantrekan.
  • Sukses: langkah berhasil diselesaikan.
  • Dihentikan: langkah dihentikan karena tugas dihentikan.
  • Tidak diketahui: langkah gagal melaporkan status.
  • Gagal: langkah gagal diselesaikan.

Grafik tugas dasar

Kode Pipeline:

Java


  // Read the lines of the input text.
  p.apply("ReadLines", TextIO.read().from(options.getInputFile()))
     // Count the words.
     .apply(new CountWords())
     // Write the formatted word counts to output.
     .apply("WriteCounts", TextIO.write().to(options.getOutput()));

Python


(
    pipeline
    # Read the lines of the input text.
    | 'ReadLines' >> beam.io.ReadFromText(args.input_file)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
    | 'WriteCounts' >> beam.io.WriteToText(args.output_path))

Go


  // Create the pipeline.
  p := beam.NewPipeline()
    s := p.Root()
  // Read the lines of the input text.
  lines := textio.Read(s, *input)
  // Count the words.
  counted := beam.ParDo(s, CountWords, lines)
  // Write the formatted word counts to output.
  textio.Write(s, *output, formatted)

Grafik pekerjaan:

Grafik eksekusi untuk pipeline WordCount seperti yang ditampilkan di antarmuka pemantauan
              Dataflow.

Gambar 1: Kode pipeline untuk pipeline WordCount ditampilkan dengan grafik eksekusi yang dihasilkan di antarmuka pemantauan Dataflow.

Transformasi gabungan

Dalam grafik tugas, transformasi gabungan, transformasi yang berisi beberapa sub-transformasi bertingkat, dapat diperluas. Transformasi gabungan yang dapat diperluas ditandai dengan panah dalam grafik. Klik panah untuk memperluas transformasi dan melihat sub-transformasi di dalamnya.

Kode Pipeline:

Java


  // The CountWords Composite Transform
  // inside the WordCount pipeline.

  public static class CountWords
    extends PTransform<PCollection<String>, PCollection<String>> {

    @Override
    public PCollection<String> apply(PCollection<String> lines) {

      // Convert lines of text into individual words.
      PCollection<String> words = lines.apply(
        ParDo.of(new ExtractWordsFn()));

      // Count the number of times each word occurs.
      PCollection<KV<String, Long>> wordCounts =
        words.apply(Count.<String>perElement());

      return wordCounts;
    }
  }

Python


# The CountWords Composite Transform inside the WordCount pipeline.
@beam.ptransform_fn
def CountWords(pcoll):
  return (
      pcoll
      # Convert lines of text into individual words.
      | 'ExtractWords' >> beam.ParDo(ExtractWordsFn())
      # Count the number of times each word occurs.
      | beam.combiners.Count.PerElement()
      # Format each word and count into a printable string.
      | 'FormatCounts' >> beam.ParDo(FormatCountsFn()))

Go


  // The CountWords Composite Transform inside the WordCount pipeline.
  func CountWords(s beam.Scope, lines beam.PCollection) beam.PCollection {
    s = s.Scope("CountWords")

    // Convert lines of text into individual words.
    col := beam.ParDo(s, &extractFn{SmallWordLength: *smallWordLength}, lines)

    // Count the number of times each word occurs.
    return stats.Count(s, col)
  }
Grafik pekerjaan:

Grafik tugas untuk pipeline WordCount dengan transformasi CountWords diperluas
              untuk menampilkan transformasi komponennya.

Gambar 2: Kode pipeline untuk sub-langkah transformasi CountWords ditampilkan dengan grafik tugas yang diperluas untuk seluruh pipeline.

Dalam kode pipeline, Anda mungkin telah memanggil transformasi gabungan seperti berikut:

result = transform.apply(input);

Transformasi gabungan yang dipanggil dengan cara ini menghilangkan penyusunan bertingkat yang diharapkan, sehingga dapat diperluas di Antarmuka Pemantauan Dataflow. Pipeline Anda juga dapat menghasilkan peringatan atau error tentang nama unik yang stabil pada waktu eksekusi pipeline.

Untuk menghindari masalah ini, pastikan Anda memanggil transformasi menggunakan format yang direkomendasikan:

result = input.apply(transform);

Mengubah nama

Dataflow memiliki beberapa cara untuk mendapatkan nama transformasi yang ditampilkan dalam grafik tugas pemantauan.

Java

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Argumen pertama yang Anda berikan ke metode apply adalah nama transformasi Anda.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama class (jika Anda telah membuat transformasi kustom) maupun nama objek fungsi DoFn (jika Anda menggunakan transformasi inti seperti ParDo).

Python

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan argumen label transformasi.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama class (jika Anda telah membuat transformasi kustom) maupun nama objek fungsi DoFn (jika Anda menggunakan transformasi inti seperti ParDo).

Go

  • Dataflow dapat menggunakan nama yang Anda tetapkan saat menerapkan transformasi. Anda dapat menetapkan nama transformasi dengan menentukan Scope.
  • Dataflow dapat menyimpulkan nama transformasi, baik dari nama struct jika Anda menggunakan DoFn struktural maupun dari nama fungsi jika Anda menggunakan DoFn fungsional.

Memahami metrik

Bagian ini memberikan detail tentang metrik yang terkait dengan grafik tugas.

Waktu proses

Saat Anda mengklik langkah, metrik Waktu dinding ditampilkan di panel Info langkah. Waktu proses menampilkan total perkiraan waktu yang dihabiskan di semua thread di semua pekerja untuk tindakan berikut:

  • Melakukan inisialisasi langkah
  • Memproses data
  • Mengacak data
  • Mengakhiri langkah

Untuk langkah gabungan, waktu proses menunjukkan jumlah waktu yang dihabiskan dalam langkah-langkah komponen. Perkiraan ini dapat membantu Anda mengidentifikasi langkah-langkah yang lambat dan mendiagnosis bagian pipeline mana yang memerlukan waktu lebih lama dari yang diperlukan.

Anda dapat melihat jumlah waktu yang diperlukan untuk menjalankan sebuah langkah di pipeline Anda.
Gambar 3: Metrik Waktu dinding dapat membantu Anda memastikan pipeline Anda berjalan secara efisien.

Metrik input samping

Metrik input samping menunjukkan pengaruh pola dan algoritma akses input samping Anda terhadap performa pipeline. Jika pipeline Anda menggunakan input samping, Dataflow akan menulis koleksi ke lapisan persisten, seperti disk, dan transformasi Anda dibaca dari koleksi persisten ini. Pembacaan dan penulisan ini memengaruhi waktu proses tugas Anda.

Antarmuka pemantauan Dataflow menampilkan metrik input samping saat Anda memilih transformasi yang membuat atau menggunakan kumpulan input samping. Anda dapat melihat metrik di bagian Metrik Input Samping pada panel Info langkah.

Transformasi yang membuat input samping

Jika transformasi yang dipilih membuat kumpulan input samping, bagian Metrik Input Sisi akan menampilkan nama kumpulan tersebut, beserta metrik berikut:

  • Waktu yang dihabiskan untuk menulis: Waktu yang dihabiskan untuk menulis kumpulan input samping.
  • Byte ditulis: Total jumlah byte yang ditulis ke koleksi input samping.
  • Membaca waktu & byte dari input samping: Tabel yang berisi metrik tambahan untuk semua transformasi yang menggunakan pengumpulan input samping, yang disebut konsumen input samping.

Tabel Waktu & byte baca dari input samping berisi informasi berikut untuk setiap konsumen input sisi:

  • Konsumen input samping: Nama transformasi konsumen input samping.
  • Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan konsumen ini untuk membaca kumpulan input samping.
  • Byte dibaca: Jumlah byte yang dibaca konsumen ini dari koleksi input samping.

Jika pipeline Anda memiliki transformasi gabungan yang membuat input samping, luaskan transformasi komposit sampai Anda melihat subtransformasi tertentu yang membuat input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.

Gambar 4 menunjukkan metrik input samping untuk transformasi yang membuat kumpulan input samping.

Anda dapat memilih subtransformasi dan metrik input samping-nya terlihat di panel samping info Langkah.
Gambar 4: Grafik tugas memiliki transformasi komposit yang diperluas (MakeMapView). Subtransformasi yang membuat input samping (CreateDataflowView) dipilih, dan metrik input samping terlihat di panel samping Info langkah.

Transformasi yang menggunakan satu atau beberapa input samping

Jika transformasi yang dipilih menggunakan satu atau beberapa input samping, bagian Metrik Input Sisi menampilkan tabel Waktu & byte yang dibaca dari input samping. Tabel ini berisi informasi berikut untuk setiap koleksi input sisi:

  • Pengumpulan input samping: Nama koleksi input samping.
  • Waktu yang dihabiskan untuk membaca: Waktu yang dihabiskan oleh transformasi untuk membaca koleksi input sisi ini.
  • Byte dibaca: Jumlah byte yang dibaca transformasi dari koleksi input samping ini.

Jika pipeline Anda memiliki transformasi gabungan yang membaca input samping, luaskan transformasi komposit sampai Anda melihat subtransformasi tertentu yang membaca input samping. Kemudian, pilih subtransformasi tersebut untuk melihat bagian Side Input Metrics.

Gambar 5 menunjukkan metrik input samping untuk transformasi yang membaca dari koleksi input samping.

Anda dapat memilih transformasi dan metrik input sisinya terlihat di panel samping info Langkah.
Gambar 5: Transformasi JoinBothCollections membaca dari koleksi input samping. JoinBothCollections dipilih dalam grafik tugas dan metrik input samping terlihat di panel samping Info langkah.

Mengidentifikasi masalah performa input samping

Pengulangan adalah masalah performa input samping yang umum. Jika PCollection input samping Anda terlalu besar, pekerja tidak dapat meng-cache seluruh koleksi di memori. Akibatnya, pekerja harus berulang kali membaca dari koleksi input sisi persisten.

Pada Gambar 6, metrik input samping menunjukkan bahwa total byte yang dibaca dari kumpulan input samping jauh lebih besar daripada ukuran koleksi, total byte yang ditulis.

Anda dapat memilih transformasi dan metrik input sisinya terlihat di panel samping info Langkah.
Gambar 6: Contoh pengulangan. Kumpulan input samping berukuran 563 MB, dan jumlah byte yang dibaca oleh transformasi yang digunakan hampir 12 GB.

Untuk meningkatkan performa pipeline ini, desain ulang algoritma Anda untuk menghindari iterasi atau pengambilan kembali data input samping. Dalam contoh ini, pipeline membuat produk Kartesius dari dua koleksi. Algoritma melakukan iterasi di seluruh koleksi input samping untuk setiap elemen koleksi utama. Anda dapat meningkatkan pola akses pipeline dengan mengelompokkan beberapa elemen dari koleksi utama secara bersamaan. Perubahan ini mengurangi frekuensi pekerja harus membaca ulang pengumpulan input samping.

Masalah performa umum lainnya dapat terjadi jika pipeline Anda melakukan penggabungan dengan menerapkan ParDo dengan satu atau beberapa input sisi besar. Dalam hal ini, pekerja menghabiskan sebagian besar waktu pemrosesan untuk operasi gabungan yang membaca dari koleksi input samping.

Gambar 7 menunjukkan contoh metrik input samping untuk masalah ini:

Anda dapat memilih transformasi dan metrik input sisinya terlihat di panel samping info Langkah.
Gambar 7: Transformasi JoinBothCollections memiliki total waktu pemrosesan 18 menit 31 detik. Pekerja menghabiskan sebagian besar waktu pemrosesan (10 menit 3 detik) untuk membaca dari koleksi input samping sebesar 10 GB.

Untuk meningkatkan performa pipeline ini, gunakan CoGroupByKey, bukan input samping.