Halaman ini memberikan ringkasan siklus proses pipeline dari kode pipeline ke tugas Dataflow.
Halaman ini menjelaskan konsep berikut:
- Pengertian grafik eksekusi, dan bagaimana pipeline Apache Beam menjadi tugas Dataflow
- Cara Dataflow menangani error
- Cara Dataflow secara otomatis memparalelkan dan mendistribusikan logika pemrosesan di pipeline Anda ke pekerja yang melakukan tugas
- Pengoptimalan tugas yang mungkin dilakukan Dataflow
Grafik eksekusi
Saat Anda menjalankan pipeline Dataflow, Dataflow akan membuat grafik eksekusi dari kode yang membentuk objek Pipeline
Anda, termasuk semua transformasi dan fungsi pemrosesan terkaitnya, seperti objek DoFn
. Ini adalah grafik eksekusi pipeline, dan fase ini disebut
waktu konstruksi grafik.
Selama pembuatan grafik, Apache Beam secara lokal mengeksekusi kode dari
titik entri utama kode pipeline, berhenti pada saat panggilan ke langkah sumber, sink, atau transformasi, dan mengubah panggilan tersebut menjadi node grafik.
Dengan demikian, sepotong kode di titik entri pipeline (metode main
Java dan Go
atau skrip Python level teratas) dieksekusi secara lokal di mesin yang
menjalankan pipeline. Kode yang sama yang dideklarasikan dalam metode objek DoFn
dijalankan di pekerja Dataflow.
Misalnya, contoh WordCount yang disertakan dengan Apache Beam SDK, berisi serangkaian transformasi untuk membaca, mengekstrak, menghitung, memformat, dan menulis setiap kata dalam kumpulan teks, beserta jumlah kemunculan untuk setiap kata. Diagram berikut menunjukkan cara transformasi di pipeline WordCount diperluas menjadi grafik eksekusi:
Gambar 1: Grafik eksekusi contoh WordCount
Grafik eksekusi sering kali berbeda dari urutan transformasi yang ditentukan saat Anda membuat pipeline. Perbedaan ini ada karena layanan Dataflow melakukan berbagai pengoptimalan dan fusi pada grafik eksekusi sebelum berjalan pada resource cloud terkelola. Layanan Dataflow mengikuti dependensi data saat menjalankan pipeline Anda. Namun, langkah-langkah tanpa dependensi data di antara langkah-langkah tersebut dapat berjalan dalam urutan apa pun.
Untuk melihat grafik eksekusi yang tidak dioptimalkan yang dihasilkan Dataflow untuk pipeline Anda, pilih tugas Anda di antarmuka pemantauan Dataflow. Untuk mengetahui informasi selengkapnya tentang melihat tugas, lihat Menggunakan antarmuka pemantauan Dataflow.
Selama pembuatan grafik, Apache Beam memvalidasi bahwa setiap resource yang dirujuk oleh pipeline, seperti bucket Cloud Storage, tabel BigQuery, dan topik atau langganan Pub/Sub, benar-benar ada dan dapat diakses. Validasi dilakukan melalui panggilan API standar ke layanan terkait, sehingga akun pengguna yang digunakan untuk menjalankan pipeline harus memiliki konektivitas yang tepat ke layanan yang diperlukan dan memiliki otorisasi untuk memanggil API layanan. Sebelum mengirimkan pipeline ke layanan Dataflow, Apache Beam juga memeriksa error lainnya, dan memastikan bahwa grafik pipeline tidak berisi operasi ilegal.
Grafik eksekusi kemudian diterjemahkan ke dalam format JSON, dan grafik eksekusi JSON akan dikirim ke endpoint layanan Dataflow.
Layanan Dataflow kemudian memvalidasi grafik eksekusi JSON. Setelah divalidasi, grafik akan menjadi tugas di layanan Dataflow. Anda dapat melihat tugas, grafik eksekusinya, status, dan informasi log menggunakan Antarmuka pemantauan Dataflow.
Java
Layanan Dataflow mengirimkan respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineJob
, yang berisi jobId
tugas Dataflow Anda.
Gunakan jobId
untuk memantau, melacak, dan memecahkan masalah tugas Anda menggunakan
Antarmuka pemantauan Dataflow
dan Antarmuka command line Dataflow.
Untuk informasi selengkapnya, lihat
Referensi API untuk DataflowPipelineJob.
Python
Layanan Dataflow mengirimkan respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek DataflowPipelineResult
, yang berisi job_id
tugas Dataflow Anda.
Gunakan job_id
untuk memantau, melacak, dan memecahkan masalah tugas
menggunakan
Antarmuka pemantauan Dataflow
dan
antarmuka command line Dataflow.
Go
Layanan Dataflow mengirimkan respons ke komputer tempat Anda menjalankan program Dataflow. Respons ini dienkapsulasi dalam objek dataflowPipelineResult
, yang berisi jobID
tugas Dataflow Anda.
Gunakan jobID
untuk memantau, melacak, dan memecahkan masalah tugas
menggunakan
Antarmuka pemantauan Dataflow
dan
antarmuka command line Dataflow.
Konstruksi grafik juga terjadi saat Anda menjalankan pipeline secara lokal, tetapi grafik tidak diterjemahkan ke JSON atau dikirim ke layanan. Sebagai gantinya, grafik ini dijalankan secara lokal di komputer yang sama tempat Anda meluncurkan program Dataflow. Untuk informasi selengkapnya, lihat Mengonfigurasi PipelineOptions untuk eksekusi lokal.
Penanganan error dan pengecualian
Pipeline Anda mungkin menampilkan pengecualian saat memproses data. Beberapa error ini bersifat sementara, seperti kesulitan sementara dalam mengakses layanan eksternal. Error lainnya bersifat permanen, seperti error yang disebabkan oleh data input yang rusak atau tidak dapat diurai, atau pointer null selama komputasi.
Dataflow memproses elemen dalam paket arbitrer, dan mencoba kembali paket lengkap saat error ditampilkan untuk elemen apa pun dalam paket tersebut. Saat berjalan dalam mode batch, paket yang menyertakan item yang gagal akan dicoba lagi empat kali. Pipeline gagal sepenuhnya jika satu bundle gagal sebanyak empat kali. Saat berjalan dalam mode streaming, paket yang menyertakan item yang gagal akan dicoba ulang tanpa batas waktu, yang dapat menyebabkan pipeline Anda berhenti secara permanen.
Saat memproses dalam mode batch, Anda mungkin melihat sejumlah besar kegagalan individual sebelum tugas pipeline gagal sepenuhnya, yang terjadi saat paket tertentu gagal setelah empat kali percobaan ulang. Misalnya, jika pipeline Anda mencoba memproses 100 paket, Dataflow dapat menghasilkan beberapa ratus kegagalan individual hingga satu paket mencapai kondisi empat kegagalan untuk keluar.
Error pekerja startup, seperti kegagalan untuk menginstal paket pada pekerja, bersifat sementara. Skenario ini mengakibatkan percobaan ulang tanpa batas waktu, dan dapat menyebabkan pipeline Anda berhenti secara permanen.
Paralelisasi dan distribusi
Layanan Dataflow secara otomatis akan memparalelkan dan mendistribusikan logika pemrosesan di pipeline Anda ke pekerja yang Anda tetapkan untuk melakukan tugas. Dataflow menggunakan abstraksi dalam
model pemrograman untuk merepresentasikan
fungsi pemrosesan paralel. Misalnya, transformasi ParDo
dalam pipeline menyebabkan Dataflow secara otomatis mendistribusikan kode pemrosesan, yang diwakili oleh objek DoFn
, ke beberapa pekerja yang akan dijalankan secara paralel.
Ada dua jenis paralelisme pekerjaan:
Paralelisme horizontal terjadi saat data pipeline dibagi dan diproses pada beberapa pekerja secara bersamaan. Lingkungan runtime Dataflow didukung oleh kumpulan pekerja terdistribusi. Pipeline memiliki potensi paralelisme yang lebih tinggi jika kumpulan berisi lebih banyak pekerja, tetapi konfigurasi tersebut juga memiliki biaya yang lebih tinggi. Secara teoritis, paralelisme horizontal tidak memiliki batas atas. Namun, Dataflow membatasi kumpulan pekerja hingga 4.000 pekerja untuk mengoptimalkan penggunaan resource di seluruh fleet.
Paralelisme vertikal terjadi saat data pipeline dibagi dan diproses oleh beberapa core CPU pada pekerja yang sama. Setiap pekerja didukung oleh VM Compute Engine. VM dapat menjalankan beberapa proses untuk memenuhi semua core CPU-nya. VM dengan lebih banyak core memiliki potensi paralelisme vertikal yang lebih tinggi, tetapi konfigurasi ini akan menghasilkan biaya yang lebih tinggi. Jumlah core yang lebih tinggi sering kali mengakibatkan peningkatan penggunaan memori, sehingga jumlah core biasanya diskalakan bersama dengan ukuran memori. Mengingat batas fisik arsitektur komputer, batas atas paralelisme vertikal jauh lebih rendah daripada batas atas paralelisme horizontal.
Paralelisme terkelola
Secara default, Dataflow mengelola paralelisme tugas secara otomatis. Dataflow memantau statistik runtime tugas, seperti penggunaan CPU dan memori, untuk menentukan cara menskalakan tugas. Bergantung pada setelan tugas Anda, Dataflow dapat menskalakan tugas secara horizontal, disebut sebagai Penskalaan Horizontal, atau secara vertikal, disebut sebagai Penskalaan vertikal. Penskalaan otomatis untuk paralelisme mengoptimalkan biaya tugas dan performa tugas.
Untuk meningkatkan performa tugas, Dataflow juga mengoptimalkan pipeline secara internal. Pengoptimalan yang umum adalah pengoptimalan fusi dan pengoptimalan gabungan. Dengan menggabungkan langkah-langkah pipeline, Dataflow meniadakan biaya yang tidak perlu terkait dengan langkah-langkah koordinasi dalam sistem terdistribusi dan menjalankan setiap langkah secara terpisah.
Faktor-faktor yang memengaruhi paralelisme
Faktor-faktor berikut memengaruhi seberapa baik fungsi paralelisme dalam tugas Dataflow.
Sumber input
Jika sumber input tidak mengizinkan paralelisme, langkah penyerapan sumber input dapat menjadi bottleneck dalam tugas Dataflow. Misalnya, saat Anda menyerap data dari satu file teks yang dikompresi, Dataflow tidak dapat memparalelkan data input. Karena sebagian besar format kompresi tidak dapat dibagi secara acak menjadi shard selama penyerapan, Dataflow perlu membaca data secara berurutan dari awal file. Throughput pipeline secara keseluruhan diperlambat oleh bagian yang tidak paralel dari pipeline. Solusi dari masalah ini adalah dengan menggunakan sumber input yang lebih skalabel.
Dalam beberapa kasus, penggabungan langkah juga mengurangi paralelisme. Jika sumber input tidak mengizinkan paralelisme, jika Dataflow menggabungkan langkah penyerapan data dengan langkah-langkah berikutnya dan menetapkan langkah gabungan ini ke satu thread, seluruh pipeline dapat berjalan lebih lambat.
Untuk menghindari skenario ini, sisipkan langkah Reshuffle
setelah langkah penyerapan
sumber input. Untuk mengetahui informasi selengkapnya, lihat bagian Mencegah fusi dalam dokumen ini.
Fanout dan bentuk data default
Fanout default dari satu langkah transformasi dapat menjadi pembatas dan membatasi paralelisme. Misalnya, transformasi ParDo
"fan-out tinggi" dapat menyebabkan fusion untuk membatasi kemampuan Dataflow dalam mengoptimalkan penggunaan pekerja. Dalam
operasi seperti itu, Anda mungkin memiliki koleksi input dengan elemen yang relatif sedikit, tetapi ParDo
menghasilkan output dengan elemen ratusan atau ribuan kali
lebih banyak, diikuti oleh ParDo
lainnya. Jika layanan Dataflow menggabungkan operasi ParDo
ini bersama-sama, paralelisme dalam langkah ini dibatasi maksimum pada jumlah item dalam kumpulan input, meskipun PCollection
perantara berisi lebih banyak elemen.
Untuk mengetahui solusinya, lihat bagian Mencegah fusi dalam dokumen ini.
Bentuk data
Bentuk data, baik berupa data input maupun data perantara, dapat membatasi paralelisme.
Misalnya, saat langkah GroupByKey
pada kunci natural, seperti kota, diikuti dengan langkah map
atau Combine
, Dataflow akan menggabungkan kedua langkah tersebut. Jika ruang kunci kecil, misalnya, lima kota, dan satu kunci sangat
panas, misalnya, kota besar, sebagian besar item dalam output langkah GroupByKey
didistribusikan ke satu proses. Proses ini menjadi bottleneck dan memperlambat
tugas.
Dalam contoh ini, Anda dapat mendistribusikan ulang hasil langkah GroupByKey
ke
ruang kunci buatan yang lebih besar, bukan menggunakan kunci alami. Masukkan langkah Reshuffle
antara langkah GroupByKey
dan langkah map
atau Combine
.
Pada langkah Reshuffle
, buat ruang kunci buatan, seperti dengan menggunakan
fungsi hash
, untuk mengatasi paralelisme terbatas yang disebabkan oleh bentuk data.
Untuk mengetahui informasi selengkapnya, lihat bagian Mencegah fusi dalam dokumen ini.
Sink output
Sink adalah transformasi yang menulis ke sistem penyimpanan data eksternal, seperti
file atau database. Dalam praktiknya, sink dimodelkan dan diimplementasikan sebagai objek
DoFn
standar serta digunakan untuk mewujud PCollection
ke sistem eksternal.
Dalam hal ini, PCollection
berisi hasil akhir pipeline. Thread yang
memanggil API sink dapat berjalan secara paralel untuk menulis data ke sistem eksternal. Secara
default, koordinasi antar-thread tidak terjadi. Tanpa lapisan perantara untuk mem-buffer permintaan tulis dan alur kontrol, sistem eksternal dapat mengalami kelebihan beban dan mengurangi throughput operasi tulis. Meningkatkan resource dengan menambahkan lebih banyak
paralelisme dapat memperlambat pipeline lebih jauh lagi.
Solusi dari masalah ini adalah mengurangi paralelisme dalam langkah tulis.
Anda dapat menambahkan langkah GroupByKey
tepat sebelum langkah tulis. Langkah GroupByKey
mengelompokkan data ke dalam kumpulan batch yang lebih kecil untuk mengurangi total panggilan RPC
dan koneksi ke sistem eksternal. Misalnya, gunakan GroupByKey
untuk membuat ruang hash 50 dari 1 juta titik data.
Kelemahan dari pendekatan ini adalah pendekatan ini memperkenalkan batas hardcode pada paralelisme. Opsi lainnya adalah mengimplementasikan backoff eksponensial di sink saat menulis data. Opsi ini dapat memberikan throttling klien minimum.
Memantau paralelisme
Untuk memantau paralelisme, Anda dapat menggunakan Konsol Google Cloud untuk melihat gangguan yang terdeteksi. Untuk mengetahui informasi selengkapnya, lihat Memecahkan masalah straggler dalam tugas batch dan Memecahkan masalah yang kurang tepat dalam tugas streaming.
Pengoptimalan fusion
Setelah bentuk JSON grafik eksekusi pipeline Anda divalidasi, layanan
Dataflow dapat mengubah grafik untuk melakukan pengoptimalan.
Pengoptimalan dapat mencakup penggabungan beberapa langkah atau transformasi di
grafik eksekusi pipeline menjadi satu langkah. Dengan menggabungkan langkah,
layanan Dataflow tidak perlu terwujud setiap PCollection
perantara
dalam pipeline Anda, yang dapat memakan biaya besar dalam hal memori dan
overhead pemrosesan.
Meskipun semua transformasi yang ditentukan dalam konstruksi pipeline dijalankan di layanan, untuk memastikan eksekusi pipeline yang paling efisien, transformasi mungkin dijalankan dalam urutan yang berbeda atau sebagai bagian dari transformasi fusi yang lebih besar. Layanan Dataflow mengikuti dependensi data antar-langkah dalam grafik eksekusi, tetapi jika tidak, langkah-langkah dapat dijalankan dalam urutan apa pun.
Contoh fusion
Diagram berikut menunjukkan bagaimana grafik eksekusi dari contoh WordCount yang disertakan dengan Apache Beam SDK untuk Java dapat dioptimalkan dan digabungkan oleh layanan Dataflow untuk eksekusi yang efisien:
Gambar 2: Grafik Eksekusi Dioptimalkan Contoh WordCount
Mencegah fusi
Dalam beberapa kasus, Dataflow mungkin salah menebak cara optimal untuk menggabungkan operasi dalam pipeline, yang dapat membatasi kemampuan Dataflow untuk menggunakan semua pekerja yang tersedia. Dalam kasus seperti itu, Anda dapat mencegah Dataflow melakukan pengoptimalan fusion.
Anda dapat mencegah fusi langkah dengan menambahkan operasi ke pipeline Anda yang memaksa layanan Dataflow untuk mewujudkan PCollection
menengah Anda. Pertimbangkan untuk menggunakan salah satu operasi berikut:
- Masukkan
GroupByKey
dan pisahkan grup setelahParDo
pertama. Layanan Dataflow tidak pernah menggabungkan operasiParDo
di seluruh agregasi. - Teruskan
PCollection
perantara sebagai input samping keParDo
lain. Layanan Dataflow selalu mewujudkan input samping. - Masukkan langkah
Reshuffle
.Reshuffle
mencegah fusi, memeriksa data, dan melakukan penghapusan duplikat catatan. Reshuffle didukung oleh Dataflow meskipun ditandai sebagai tidak digunakan lagi dalam dokumentasi Apache Beam.
Fusi monitor
Anda dapat mengakses grafik yang dioptimalkan dan tahapan yang digabungkan di Google Cloud Console, dengan menggunakan gcloud CLI, atau menggunakan API.
Konsol
Untuk melihat tahapan dan langkah gabungan grafik Anda di konsol, pada tab Execution details untuk tugas Dataflow Anda, buka tampilan grafik Stage Workflow.
Untuk melihat langkah-langkah komponen yang digabungkan untuk sebuah tahapan, klik tahap gabungan pada grafik. Di panel Stage info, baris Component steps menampilkan stage gabungan. Terkadang bagian dari transformasi komposit tunggal digabungkan menjadi beberapa tahapan.
gcloud
Untuk mengakses grafik yang dioptimalkan dan tahapan yang digabungkan menggunakan gcloud CLI, jalankan perintah gcloud
berikut:
gcloud dataflow jobs describe --full JOB_ID --format json
Ganti JOB_ID
dengan ID tugas Dataflow Anda.
Untuk mengekstrak bit yang relevan, teruskan output dari perintah gcloud
ke jq
:
gcloud dataflow jobs describe --full JOB_ID --format json | jq '.pipelineDescription.executionPipelineStage\[\] | {"stage_id": .id, "stage_name": .name, "fused_steps": .componentTransform }'
Untuk melihat deskripsi tahapan fusi dalam file respons output, dalam array ComponentTransform
, lihat objek ExecutionStageSummary
.
API
Untuk mengakses grafik yang dioptimalkan dan tahapan yang digabungkan menggunakan API, panggil
project.locations.jobs.get
.
Untuk melihat deskripsi tahapan fusi dalam file respons output, dalam array ComponentTransform
, lihat objek ExecutionStageSummary
.
Gabungkan pengoptimalan
Operasi agregasi adalah konsep penting dalam pemrosesan data berskala besar.
Agregasi menggabungkan data yang secara konseptual berjauhan, sehingga
sangat berguna untuk melakukan korelasi. Model pemrograman Dataflow mewakili operasi agregasi sebagai transformasi GroupByKey
, CoGroupByKey
, dan
Combine
.
Operasi agregasi Dataflow menggabungkan data di seluruh set data, termasuk data yang mungkin tersebar di beberapa pekerja. Selama operasi agregasi seperti itu, sering kali cara yang paling efisien adalah menggabungkan data sebanyak mungkin secara lokal sebelum menggabungkan data di seluruh instance. Saat Anda menerapkan GroupByKey
atau transformasi agregasi lainnya, layanan Dataflow akan otomatis melakukan penggabungan parsial secara lokal sebelum operasi pengelompokan utama.
Saat melakukan penggabungan parsial atau multilevel, layanan Dataflow membuat keputusan yang berbeda berdasarkan apakah pipeline Anda berfungsi dengan data batch atau streaming. Untuk data terikat, layanan ini mengutamakan efisiensi dan akan melakukan penggabungan lokal sebanyak mungkin. Untuk data tak terbatas, layanan mendukung latensi yang lebih rendah, dan mungkin tidak melakukan penggabungan parsial, karena dapat meningkatkan latensi.