Siklus proses pipeline

Halaman ini memberikan ringkasan siklus proses pipeline dari kode pipeline ke tugas Dataflow.

Halaman ini menjelaskan konsep berikut:

  • Pengertian grafik eksekusi, dan cara pipeline Apache Beam menjadi tugas Dataflow
  • Cara Dataflow menangani error
  • Cara Dataflow secara otomatis melakukan paralelisasi dan mendistribusikan logika pemrosesan di pipeline ke pekerja yang melakukan tugas Anda
  • Pengoptimalan tugas yang mungkin dilakukan Dataflow

Grafik eksekusi

Saat Anda menjalankan pipeline Dataflow, Dataflow akan membuat grafik eksekusi dari kode yang membuat objek Pipeline, termasuk semua transformasi dan fungsi pemrosesan terkait, seperti objek DoFn. Ini adalah grafik eksekusi pipeline, dan fase ini disebut waktu konstruksi grafik.

Selama konstruksi grafik, Apache Beam akan mengeksekusi kode secara lokal dari titik entri utama kode pipeline, berhenti pada panggilan ke sumber, sink, atau langkah transformasi, dan mengubah panggilan ini menjadi node grafik. Akibatnya, potongan kode di titik entri pipeline (metode main Java dan Go atau tingkat teratas skrip Python) dieksekusi secara lokal di mesin yang menjalankan pipeline. Kode yang sama yang dideklarasikan dalam metode objek DoFn dijalankan di pekerja Dataflow.

Misalnya, contoh WordCount yang disertakan dengan Apache Beam SDK, berisi serangkaian transformasi untuk membaca, mengekstrak, menghitung, memformat, dan menulis setiap kata dalam koleksi teks, beserta jumlah kemunculan untuk setiap kata. Diagram berikut menunjukkan cara transformasi dalam pipeline WordCount diperluas menjadi grafik eksekusi:

Transformasi dalam program contoh WordCount diperluas menjadi grafik eksekusi
langkah-langkah yang akan dieksekusi oleh layanan Dataflow.

Gambar 1: Contoh grafik eksekusi WordCount

Grafik eksekusi sering kali berbeda dengan urutan transformasi yang Anda tentukan saat membuat pipeline. Perbedaan ini ada karena layanan Dataflow melakukan berbagai pengoptimalan dan penggabungan pada grafik eksekusi sebelum berjalan di resource cloud terkelola. Layanan Dataflow mematuhi dependensi data saat menjalankan pipeline Anda. Namun, langkah-langkah tanpa dependensi data di antaranya dapat berjalan dalam urutan apa pun.

Untuk melihat grafik eksekusi yang tidak dioptimalkan yang telah dihasilkan Dataflow untuk pipeline Anda, pilih tugas di antarmuka pemantauan Dataflow. Untuk informasi selengkapnya tentang cara melihat tugas, lihat Menggunakan antarmuka pemantauan Dataflow.

Selama konstruksi grafik, Apache Beam memvalidasi bahwa resource apa pun yang direferensikan oleh pipeline, seperti bucket Cloud Storage, tabel BigQuery, dan topik atau langganan Pub/Sub, benar-benar ada dan dapat diakses. Validasi dilakukan melalui panggilan API standar ke layanan masing-masing, sehingga akun pengguna yang digunakan untuk menjalankan pipeline harus memiliki konektivitas yang tepat ke layanan yang diperlukan dan diberi otorisasi untuk memanggil API layanan. Sebelum mengirimkan pipeline ke layanan Dataflow, Apache Beam juga memeriksa error lainnya, dan memastikan bahwa grafik pipeline tidak berisi operasi ilegal.

Grafik eksekusi kemudian diterjemahkan ke dalam format JSON, dan grafik eksekusi JSON ditransmisikan ke endpoint layanan Dataflow.

Layanan Dataflow kemudian memvalidasi grafik eksekusi JSON. Saat divalidasi, grafik akan menjadi tugas di layanan Dataflow. Anda dapat melihat tugas, grafik eksekusi, status, dan informasi log menggunakan antarmuka pemantauan Dataflow.

Java

Layanan Dataflow mengirim respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineJob, yang berisi jobId tugas Dataflow Anda. Gunakan jobId untuk memantau, melacak, dan memecahkan masalah tugas Anda menggunakan antarmuka pemantauan Dataflow dan antarmuka command line Dataflow. Untuk informasi selengkapnya, lihat referensi API untuk DataflowPipelineJob.

Python

Layanan Dataflow mengirim respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineResult, yang berisi job_id tugas Dataflow Anda. Gunakan job_id untuk memantau, melacak, dan memecahkan masalah tugas Anda menggunakan antarmuka pemantauan Dataflow dan antarmuka command line Dataflow.

Go

Layanan Dataflow mengirim respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek dataflowPipelineResult, yang berisi jobID tugas Dataflow Anda. Gunakan jobID untuk memantau, melacak, dan memecahkan masalah tugas Anda menggunakan antarmuka pemantauan Dataflow dan antarmuka command line Dataflow.

Konstruksi grafik juga terjadi saat Anda menjalankan pipeline secara lokal, tetapi grafik tidak diterjemahkan ke JSON atau dikirim ke layanan. Sebagai gantinya, grafik dijalankan secara lokal di mesin yang sama tempat Anda meluncurkan program Dataflow. Untuk mengetahui informasi selengkapnya, lihat Mengonfigurasi PipelineOptions untuk eksekusi lokal.

Penanganan error dan pengecualian

Pipeline Anda mungkin menampilkan pengecualian saat memproses data. Beberapa error ini bersifat sementara, seperti kesulitan sementara dalam mengakses layanan eksternal. Error lainnya bersifat permanen, seperti error yang disebabkan oleh data input yang rusak atau tidak dapat diuraikan, atau pointer null selama komputasi.

Dataflow memproses elemen dalam paket arbitrer, dan mencoba ulang paket lengkap saat error ditampilkan untuk elemen apa pun dalam paket tersebut. Saat berjalan dalam mode batch, paket yang menyertakan item yang gagal akan dicoba lagi empat kali. Pipeline akan gagal sepenuhnya jika satu paket gagal empat kali. Saat berjalan dalam mode streaming, paket yang menyertakan item yang gagal akan dicoba ulang tanpa batas waktu, yang dapat menyebabkan pipeline Anda terhenti secara permanen.

Saat memproses dalam mode batch, Anda mungkin melihat banyak kegagalan individual sebelum tugas pipeline gagal sepenuhnya, yang terjadi saat paket tertentu gagal setelah empat upaya percobaan ulang. Misalnya, jika pipeline Anda mencoba memproses 100 paket, Dataflow dapat menghasilkan beberapa ratus kegagalan individual hingga satu paket mencapai kondisi empat kegagalan untuk keluar.

Error pekerja startup, seperti kegagalan untuk menginstal paket pada pekerja, bersifat sementara. Skenario ini menghasilkan percobaan ulang tanpa batas, dan dapat menyebabkan pipeline Anda terhenti secara permanen.

Paralelisasi dan distribusi

Layanan Dataflow secara otomatis melakukan paralelisasi dan mendistribusikan logika pemrosesan di pipeline ke pekerja yang Anda tetapkan untuk menjalankan tugas Anda. Dataflow menggunakan abstraksi dalam model pemrograman untuk merepresentasikan fungsi pemrosesan paralel. Misalnya, transformasi ParDo dalam pipeline menyebabkan Dataflow mendistribusikan kode pemrosesan secara otomatis, yang diwakili oleh objek DoFn, ke beberapa pekerja untuk dijalankan secara paralel.

Ada dua jenis paralelisme tugas:

  • Paralelisme horizontal terjadi saat data pipeline dibagi dan diproses di beberapa pekerja secara bersamaan. Lingkungan runtime Dataflow didukung oleh kumpulan pekerja terdistribusi. Pipeline memiliki potensi paralelisme yang lebih tinggi jika kumpulan berisi lebih banyak pekerja, tetapi konfigurasi tersebut juga memiliki biaya yang lebih tinggi. Secara teori, paralelisme horisontal tidak memiliki batas atas. Namun, Dataflow membatasi kumpulan pekerja hingga 4.000 pekerja untuk mengoptimalkan penggunaan resource di seluruh fleet.

  • Paralelisme vertikal terjadi saat data pipeline dibagi dan diproses oleh beberapa core CPU pada pekerja yang sama. Setiap pekerja didukung oleh VM Compute Engine. VM dapat menjalankan beberapa proses untuk memenuhi semua core CPU-nya. VM dengan lebih banyak core memiliki potensi paralelisme vertikal yang lebih tinggi, tetapi konfigurasi ini menghasilkan biaya yang lebih tinggi. Jumlah core yang lebih tinggi sering kali menghasilkan peningkatan penggunaan memori, sehingga jumlah core biasanya diskalakan bersama dengan ukuran memori. Mengingat batas fisik arsitektur komputer, batas atas paralelisme vertikal jauh lebih rendah daripada batas atas paralelisme horizontal.

Paralelisme terkelola

Secara default, Dataflow secara otomatis mengelola paralelisme tugas. Dataflow memantau statistik runtime untuk tugas, seperti penggunaan CPU dan memori, untuk menentukan cara menskalakan tugas. Bergantung pada setelan tugas, Dataflow dapat menskalakan tugas secara horizontal, yang disebut sebagai Penskalaan Otomatis Horizontal, atau secara vertikal, yang disebut sebagai Penskalaan vertikal. Penskalaan otomatis untuk paralelisme akan mengoptimalkan biaya tugas dan performa tugas.

Untuk meningkatkan performa tugas, Dataflow juga mengoptimalkan pipeline secara internal. Pengoptimalan yang umum adalah pengoptimalan fusion dan pengoptimalan gabungan. Dengan menggabungkan langkah-langkah pipeline, Dataflow menghilangkan biaya yang tidak perlu yang terkait dengan langkah-langkah koordinasi dalam sistem terdistribusi dan menjalankan setiap langkah secara terpisah.

Faktor yang memengaruhi paralelisme

Faktor-faktor berikut memengaruhi seberapa baik paralelisme berfungsi dalam tugas Dataflow.

Sumber input

Jika sumber input tidak mengizinkan paralelisme, langkah penyerapan sumber input dapat menjadi bottleneck dalam tugas Dataflow. Misalnya, saat Anda menyerap data dari satu file teks yang dikompresi, Dataflow tidak dapat melakukan paralelisasi data input. Karena sebagian besar format kompresi tidak dapat dibagi secara arbitrer menjadi shard selama proses transfer, Dataflow perlu membaca data secara berurutan dari awal file. Throughput keseluruhan pipeline diperlambat oleh bagian pipeline yang tidak paralel. Solusi untuk masalah ini adalah menggunakan sumber input yang lebih skalabel.

Dalam beberapa kasus, penggabungan langkah juga mengurangi paralelisme. Jika sumber input tidak mengizinkan paralelisme, jika Dataflow menggabungkan langkah penyerapan data dengan langkah berikutnya dan menetapkan langkah gabungan ini ke satu thread, seluruh pipeline mungkin berjalan lebih lambat.

Untuk menghindari skenario ini, masukkan langkah Reshuffle setelah langkah penyerapan sumber input. Untuk informasi selengkapnya, lihat bagian Mencegah penggabungan dalam dokumen ini.

Fanout dan bentuk data default

Fanout default dari satu langkah transformasi dapat menjadi bottleneck dan membatasi paralelisme. Misalnya, transformasi ParDo "fan-out tinggi" dapat menyebabkan fusi membatasi kemampuan Dataflow untuk mengoptimalkan penggunaan pekerja. Dalam operasi tersebut, Anda mungkin memiliki koleksi input dengan elemen yang relatif sedikit, tetapi ParDo menghasilkan output dengan ratusan atau ribuan kali elemen, diikuti dengan ParDo lain. Jika layanan Dataflow menggabungkan operasi ParDo ini, paralelisme pada langkah ini dibatasi maksimal jumlah item dalam koleksi input, meskipun PCollection perantara berisi lebih banyak elemen.

Untuk mengetahui solusi yang mungkin, lihat bagian Mencegah penggabungan dalam dokumen ini.

Bentuk data

Bentuk data, baik data input maupun data perantara, dapat membatasi paralelisme. Misalnya, jika langkah GroupByKey pada kunci alami, seperti kota, diikuti dengan langkah map atau Combine, Dataflow akan menggabungkan kedua langkah tersebut. Jika ruang kunci kecil, misalnya, lima kota, dan satu kunci sangat populer, misalnya, kota besar, sebagian besar item dalam output langkah GroupByKey didistribusikan ke satu proses. Proses ini menjadi bottleneck dan memperlambat tugas.

Dalam contoh ini, Anda dapat mendistribusikan ulang hasil langkah GroupByKey ke dalam ruang kunci buatan yang lebih besar, bukan menggunakan kunci alami. Sisipkan langkah Reshuffle di antara langkah GroupByKey dan langkah map atau Combine. Pada langkah Reshuffle, buat ruang kunci buatan, seperti dengan menggunakan fungsi hash, untuk mengatasi paralelisme terbatas yang disebabkan oleh bentuk data.

Untuk informasi selengkapnya, lihat bagian Mencegah penggabungan dalam dokumen ini.

Sink output

Sink adalah transformasi yang menulis ke sistem penyimpanan data eksternal, seperti file atau database. Dalam praktiknya, sink dimodelkan dan diimplementasikan sebagai objek DoFn standar dan digunakan untuk mewujudkan PCollection ke sistem eksternal. Dalam hal ini, PCollection berisi hasil pipeline akhir. Thread yang memanggil sink API dapat berjalan secara paralel untuk menulis data ke sistem eksternal. Secara default, tidak ada koordinasi antara thread. Tanpa lapisan perantara untuk melakukan buffering pada permintaan tulis dan alur kontrol, sistem eksternal dapat dibebani secara berlebihan dan mengurangi throughput tulis. Menskalakan resource dengan menambahkan lebih banyak paralelisme dapat memperlambat pipeline lebih jauh.

Solusi untuk masalah ini adalah mengurangi paralelisme dalam langkah penulisan. Anda dapat menambahkan langkah GroupByKey tepat sebelum langkah tulis. Langkah GroupByKey mengelompokkan data output ke dalam kumpulan batch yang lebih kecil untuk mengurangi total panggilan RPC dan koneksi ke sistem eksternal. Misalnya, gunakan GroupByKey untuk membuat ruang hash 50 dari 1 juta titik data.

Kelemahan pendekatan ini adalah pendekatan ini memperkenalkan batas hardcode ke paralelisme. Opsi lainnya adalah menerapkan backoff eksponensial di sink saat menulis data. Opsi ini dapat memberikan throttling klien minimum.

Memantau paralelisme

Untuk memantau paralelisme, Anda dapat menggunakan konsol Google Cloud untuk melihat pelacak yang terdeteksi. Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah straggler dalam tugas batch dan Memecahkan masalah straggler dalam tugas streaming.

Pengoptimalan penggabungan

Setelah bentuk JSON grafik eksekusi pipeline Anda divalidasi, layanan Dataflow dapat mengubah grafik untuk melakukan pengoptimalan. Pengoptimalan dapat mencakup penggabungan beberapa langkah atau transformasi dalam grafik eksekusi pipeline menjadi satu langkah. Menggabungkan langkah-langkah mencegah layanan Dataflow perlu mewujudkan setiap PCollection perantara dalam pipeline Anda, yang dapat mahal dalam hal memori dan overhead pemrosesan.

Meskipun semua transformasi yang Anda tentukan dalam konstruksi pipeline dijalankan di layanan, untuk memastikan eksekusi pipeline yang paling efisien, transformasi dapat dijalankan dalam urutan yang berbeda atau sebagai bagian dari transformasi gabungan yang lebih besar. Layanan Dataflow mematuhi dependensi data di antara langkah-langkah dalam grafik eksekusi, tetapi jika tidak, langkah-langkah tersebut dapat dijalankan dalam urutan apa pun.

Contoh penggabungan

Diagram berikut menunjukkan cara grafik eksekusi dari contoh WordCount yang disertakan dengan Apache Beam SDK untuk Java dapat dioptimalkan dan digabungkan oleh layanan Dataflow untuk eksekusi yang efisien:

Grafik eksekusi untuk program contoh WordCount yang dioptimalkan dan dengan langkah-langkah yang digabungkan
oleh layanan Dataflow.

Gambar 2: Contoh Grafik Eksekusi yang Dioptimalkan WordCount

Mencegah penggabungan

Dalam beberapa kasus, Dataflow mungkin salah menebak cara optimal untuk menggabungkan operasi dalam pipeline, yang dapat membatasi kemampuan Dataflow untuk menggunakan semua pekerja yang tersedia. Dalam kasus tersebut, Anda dapat mencegah Dataflow melakukan pengoptimalan penggabungan.

Anda dapat mencegah penggabungan langkah dengan menambahkan operasi ke pipeline yang memaksa layanan Dataflow untuk mewujudkan PCollection perantara. Pertimbangkan untuk menggunakan salah satu operasi berikut:

  • Masukkan GroupByKey dan batalkan pengelompokan setelah ParDo pertama. Layanan Dataflow tidak pernah menggabungkan operasi ParDo di seluruh agregasi.
  • Teruskan PCollection perantara sebagai input samping ke ParDo lain. Layanan Dataflow selalu mewujudkan input samping.
  • Sisipkan langkah Reshuffle. Reshuffle mencegah penggabungan, memeriksa data, dan melakukan penghapusan duplikat data. Pengurutan ulang didukung oleh Dataflow meskipun ditandai tidak digunakan lagi dalam dokumentasi Apache Beam.

Memantau penggabungan

Anda dapat mengakses grafik yang dioptimalkan dan tahap gabungan di konsol Google Cloud, menggunakan gcloud CLI, atau menggunakan API.

Konsol

Untuk melihat tahap dan langkah gabungan grafik di konsol, di tab Execution details untuk tugas Dataflow, buka tampilan grafik Stage workflow.

Untuk melihat langkah komponen yang digabungkan untuk suatu tahap, di grafik, klik tahap gabungan. Di panel Info tahap, baris Langkah komponen menampilkan tahap yang digabungkan. Terkadang, bagian dari satu transformasi komposit digabungkan menjadi beberapa tahap.

gcloud

Untuk mengakses grafik yang dioptimalkan dan tahap gabungan menggunakan gcloud CLI, jalankan perintah gcloud berikut:

  gcloud dataflow jobs describe --full JOB_ID --format json

Ganti JOB_ID dengan ID tugas Dataflow Anda.

Untuk mengekstrak bit yang relevan, alirkan output perintah gcloud ke jq:

gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'

Untuk melihat deskripsi tahap yang digabungkan dalam file respons output, dalam array ComponentTransform, lihat objek ExecutionStageSummary.

API

Untuk mengakses grafik yang dioptimalkan dan tahap gabungan menggunakan API, panggil project.locations.jobs.get.

Untuk melihat deskripsi tahap yang digabungkan dalam file respons output, dalam array ComponentTransform, lihat objek ExecutionStageSummary.

Menggabungkan pengoptimalan

Operasi agregasi adalah konsep penting dalam pemrosesan data skala besar. Agregasi menggabungkan data yang secara konseptual sangat berbeda, sehingga sangat berguna untuk melakukan korelasi. Model pemrograman Dataflow merepresentasikan operasi agregasi sebagai transformasi GroupByKey, CoGroupByKey, dan Combine.

Operasi agregasi Dataflow menggabungkan data di seluruh set data, termasuk data yang mungkin tersebar di beberapa pekerja. Selama operasi agregasi tersebut, menggabungkan data sebanyak mungkin secara lokal sebelum menggabungkan data di seluruh instance sering kali merupakan cara yang paling efisien. Saat Anda menerapkan GroupByKey atau transformasi agregasi lainnya, layanan Dataflow akan otomatis melakukan penggabungan sebagian secara lokal sebelum operasi pengelompokan utama.

Saat melakukan penggabungan sebagian atau multi-level, layanan Dataflow akan membuat keputusan yang berbeda berdasarkan apakah pipeline Anda menggunakan data batch atau streaming. Untuk data terbatas, layanan ini mendukung efisiensi dan akan melakukan penggabungan lokal sebanyak mungkin. Untuk data yang tidak terbatas, layanan ini lebih memilih latensi yang lebih rendah, dan mungkin tidak melakukan penggabungan sebagian, karena dapat meningkatkan latensi.