Menggunakan beberapa model dalam pipeline

Anda dapat menggunakan RunInference API untuk mem-build pipeline yang berisi beberapa model. Pipeline multi-model berguna untuk tugas-tugas seperti pengujian A/B dan membangun ansambel untuk memecahkan masalah bisnis yang memerlukan lebih dari satu model ML.

Menggunakan beberapa model

Contoh kode berikut menunjukkan cara menggunakan transformasi RunInference untuk menambahkan beberapa model ke pipeline Anda.

Saat membangun pipeline dengan beberapa model, Anda dapat menggunakan salah satu dari dua pola:

  • Pola cabang A/B: Satu bagian dari data input masuk ke satu model, dan sisa data masuk ke model kedua.
  • Pola urutan: Data input melintasi dua model, satu demi satu.

Pola A/B

Kode berikut menunjukkan cara menambahkan pola A/B ke pipeline dengan transformasi RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('a_source')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = data | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A dan MODEL_HANDLER_B adalah kode penyiapan pengendali model.

Diagram berikut memberikan presentasi visual tentang proses ini.

Diagram yang menampilkan alur kerja multi-model pola A/B.

Pola urutan

Kode berikut menunjukkan cara menambahkan pola urutan ke pipeline dengan transformasi RunInference.

with pipeline as p:
   data = p | 'Read' >> beam.ReadFromSource('A_SOURCE')
   model_a_predictions = data | RunInference(MODEL_HANDLER_A)
   model_b_predictions = model_a_predictions | beam.Map(some_post_processing) | RunInference(MODEL_HANDLER_B)

MODEL_HANDLER_A dan MODEL_HANDLER_B adalah kode penyiapan pengendali model.

Diagram berikut memberikan presentasi visual tentang proses ini.

Diagram yang menunjukkan alur kerja multi-model pola urutan.

Memetakan model ke kunci

Anda dapat memuat beberapa model dan memetakannya ke kunci menggunakan pengendali model dengan kunci. Memetakan model ke kunci memungkinkan penggunaan berbagai model dalam transformasi RunInference yang sama. Contoh berikut menggunakan pengendali model dengan kunci yang memuat satu model menggunakan CONFIG_1 dan model kedua menggunakan CONFIG_2. Pipeline menggunakan model yang terkait dengan CONFIG_1 untuk menjalankan inferensi pada contoh yang terkait dengan KEY_1. Model yang terkait dengan CONFIG_2 menjalankan inferensi pada contoh yang terkait dengan KEY_2 dan KEY_3.

from apache_beam.ml.inference.base import KeyedModelHandler
keyed_model_handler = KeyedModelHandler([
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2))
])
with pipeline as p:
   data = p | beam.Create([
      ('KEY_1', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_2', torch.tensor([[1,2,3],[4,5,6],...])),
      ('KEY_3', torch.tensor([[1,2,3],[4,5,6],...])),
   ])
   predictions = data | RunInference(keyed_model_handler)

Untuk contoh yang lebih mendetail, baca artikel Menjalankan inferensi ML dengan beberapa model yang dilatih secara berbeda.

Mengelola memori

Saat memuat beberapa model sekaligus, Anda mungkin akan mengalami error memori habis (OOM). Saat Anda menggunakan pengendali model dengan kunci, Apache Beam tidak secara otomatis membatasi jumlah model yang dimuat ke dalam memori. Jika model tidak semuanya sesuai dengan memori, akan terjadi error kehabisan memori, dan pipeline akan gagal.

Untuk menghindari masalah ini, gunakan parameter max_models_per_worker_hint untuk membatasi jumlah model yang dimuat ke dalam memori secara bersamaan. Contoh berikut menggunakan pengendali model dengan kunci dengan parameter max_models_per_worker_hint. Karena nilai parameter max_models_per_worker_hint disetel ke 2, pipeline memuat maksimum dua model pada setiap proses pekerja SDK secara bersamaan.

mhs = [
  KeyModelMapping(['KEY_1'], PytorchModelHandlerTensor(CONFIG_1)),
  KeyModelMapping(['KEY_2', 'KEY_3'], PytorchModelHandlerTensor(CONFIG_2)),
  KeyModelMapping(['KEY_4'], PytorchModelHandlerTensor(CONFIG_3)),
  KeyModelMapping(['KEY_5', 'KEY_5', 'KEY_6'], PytorchModelHandlerTensor(CONFIG_4)),
]
keyed_model_handler = KeyedModelHandler(mhs, max_models_per_worker_hint=2)

Saat mendesain pipeline, pastikan pekerja memiliki memori yang cukup untuk model dan transformasi pipeline. Karena memori yang digunakan oleh model mungkin tidak segera dirilis, untuk menghindari OOM, sertakan buffer memori tambahan.

Jika Anda memiliki banyak model dan menggunakan nilai rendah dengan parameter max_models_per_worker_hint, Anda mungkin mengalami memory thrashing. Memory thrashing terjadi saat waktu eksekusi yang berlebihan digunakan untuk menukar model ke dalam dan ke luar memori. Untuk menghindari masalah ini, sertakan transformasi GroupByKey di pipeline sebelum langkah inferensi. Transformasi GroupByKey memastikan bahwa elemen dengan kunci dan model yang sama terletak di pekerja yang sama.

Pelajari lebih lanjut