Operasi Asinkron NDB

Saat mengoptimalkan performa aplikasi, pertimbangkan penggunaan NDB-nya. Misalnya, jika aplikasi membaca nilai yang tidak ada dalam cache, proses baca tersebut akan memakan waktu cukup lama. Anda mungkin dapat mempercepat aplikasi dengan melakukan tindakan Datastore secara paralel dengan hal-hal lain, atau melakukan beberapa tindakan Datastore secara paralel.

Library Klien NDB menyediakan banyak fungsi asinkron ("async"). Masing-masing fungsi ini memungkinkan aplikasi mengirim permintaan ke Datastore. Fungsi ini segera ditampilkan, yang menampilkan objek Future. Aplikasi dapat melakukan hal-hal lain saat Datastore menangani permintaan. Setelah Datastore menangani permintaan, aplikasi dapat memperoleh hasil dari objek Future.

Pengantar

Misalkan salah satu pengendali permintaan aplikasi Anda perlu menggunakan NDB untuk menulis sesuatu, mungkin untuk merekam permintaan tersebut. Aplikasi tersebut juga perlu melakukan beberapa operasi NDB lainnya, mungkin untuk mengambil beberapa data.

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Dengan mengganti panggilan ke put() dengan panggilan ke put_async() yang setara asinkron, aplikasi dapat langsung melakukan hal-hal lain, bukan memblokir put().

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

Hal ini memungkinkan fungsi NDB dan rendering template lainnya terjadi saat Datastore menulis data. Aplikasi tidak akan melakukan pemblokiran pada Datastore hingga mendapatkan data dari Datastore.

Dalam contoh ini, kurang sesuai jika kita memanggil future.get_result: aplikasi tersebut tidak pernah menggunakan hasil dari NDB. Kode tersebut ada untuk memastikan bahwa pengendali permintaan tidak keluar sebelum put NDB selesai; jika pengendali permintaan keluar terlalu awal, penangguhan tidak akan pernah terjadi. Untuk memudahkan, Anda dapat mendekorasi pengendali permintaan dengan @ndb.toplevel. Atribut ini memberi tahu pengendali agar tidak keluar hingga permintaan asinkronnya selesai. Dengan demikian, Anda dapat mengirimkan permintaan dan tidak perlu mengkhawatirkan hasilnya.

Anda dapat menentukan keseluruhan WSGIApplication sebagai ndb.toplevel. Hal ini akan memastikan bahwa setiap pengendali WSGIApplication menunggu semua permintaan asinkron sebelum ditampilkan. (Tidak melakukan "level teratas" semua pengendali WSGIApplication.)


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

Menggunakan aplikasi toplevel lebih mudah daripada semua fungsi pengendalinya. Namun, jika metode pengendali menggunakan yield, metode tersebut masih harus digabungkan dalam dekorator lain, @ndb.synctasklet; jika tidak, eksekusi akan berhenti di yield dan tidak selesai.

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

Menggunakan Async API dan Future

Hampir setiap fungsi NDB sinkron memiliki pasangan _async. Misalnya, put() memiliki put_async(). Argumen fungsi asinkron selalu sama dengan argumen versi sinkron. Nilai hasil metode asinkron selalu berupa Future atau daftar Future (untuk fungsi "multi").

Future adalah objek yang mempertahankan status untuk operasi yang telah dimulai, tetapi mungkin belum selesai; semua API asinkron menampilkan satu atau beberapa Futures. Anda dapat memanggil fungsi get_result() Future untuk menanyakan hasil operasinya; Future akan memblokir, jika perlu, sampai hasilnya tersedia, kemudian memberikannya kepada Anda. get_result() menampilkan nilai yang akan ditampilkan oleh API versi sinkron.

Catatan: Jika telah menggunakan Future dalam bahasa pemrograman tertentu lainnya, Anda mungkin berpikir dapat menggunakan Future sebagai hasilnya secara langsung. Hal tersebut tidak bisa dilakukan di sini. Bahasa tersebut menggunakan future implisit; NDB menggunakan future eksplisit. Panggil get_result() untuk mendapatkan hasil Future NDB.

Bagaimana jika operasi memunculkan pengecualian? Hal ini bergantung pada kapan pengecualian terjadi. Jika NDB menemukan masalah saat membuat permintaan (mungkin argumen dari jenis yang salah), metode _async() akan memunculkan pengecualian. Tetapi, jika pengecualian terdeteksi oleh, misalnya, server Datastore,_async() akan menampilkan Future , dan pengecualian akan diterapkan saat aplikasi Anda memanggil get_result()-nya. Jangan terlalu khawatir tentang hal ini, semuanya akhirnya berperilaku cukup alami; mungkin perbedaan terbesarnya adalah jika traceback dicetak, Anda akan melihat beberapa mesin asinkron tingkat rendah terekspos.

Misalnya, Anda menulis aplikasi buku tamu. Jika pengguna login, Anda ingin menampilkan halaman yang menampilkan postingan buku tamu terbaru. Halaman ini juga harus menampilkan nama panggilan pengguna. Aplikasi memerlukan dua jenis informasi: informasi akun pengguna yang login dan isi postingan buku tamu. Versi "sinkron" aplikasi ini mungkin terlihat seperti ini:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Ada dua tindakan I/O independen di sini: mendapatkan entity Account dan mengambil entity Guestbook terbaru. API sinkron digunakan secara berurutan; kami menunggu untuk menerima informasi akun sebelum mengambil entity buku tamu. Namun aplikasi tidak membutuhkan informasi akun itu segera. Kita dapat memanfaatkan ini dan menggunakan API asinkron:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

Versi kode ini pertama-tama membuat dua Futures (acct_future dan recent_entries_future), lalu menunggunya. Server bekerja pada kedua permintaan secara paralel. Setiap panggilan fungsi _async() membuat objek Future dan mengirim permintaan ke server Datastore. Server dapat segera mulai menangani permintaan. Respons server dapat kembali dalam urutan arbitrer; link objek Future merespons permintaannya yang sesuai.

Permintaan sinkron tidak tumpang tindih, tetapi permintaan asinkron bisa tumpang tindih.
Permintaan Sinkron vs. Asinkron

Total waktu (nyata) yang dihabiskan dalam versi asinkron kira-kira sama dengan waktu maksimum di seluruh operasi. Total waktu yang dihabiskan dalam versi sinkron melebihi jumlah waktu operasi. Jika Anda dapat menjalankan lebih banyak operasi secara paralel, operasi asinkron akan lebih membantu.

Untuk melihat durasi kueri aplikasi Anda atau jumlah operasi I/O yang dilakukan per permintaan, pertimbangkan untuk menggunakan Appstats. Alat ini dapat menampilkan diagram yang mirip dengan gambar di atas berdasarkan instrumentasi aplikasi aktif.

Menggunakan Tasklet

Tasklet NDB adalah potongan kode yang dapat berjalan serentak dengan kode lainnya. Jika Anda menulis tasklet, aplikasi dapat menggunakannya seperti menggunakan fungsi NDB asinkron: aplikasi memanggil tasklet, yang menampilkan Future; kemudian, memanggil metode get_result() Future akan mendapatkan hasil.

Tasklet adalah cara untuk menulis fungsi serentak tanpa rangkaian pesan; tasklet dijalankan oleh loop peristiwa dan dapat menangguhkan pemblokirannya sendiri untuk I/O atau beberapa operasi lain menggunakan pernyataan hasil. Gagasan tentang operasi pemblokiran diabstraksikan ke dalam class Future, tetapi tasklet juga dapat yield RPC untuk menunggu RPC tersebut selesai. Ketika tasklet memberikan hasil, tasklet akan meng-raise pengecualian ndb.Return; NDB kemudian mengaitkan hasilnya dengan hasil operasi Future yang di-yield sebelumnya.

Saat menulis tasklet NDB, Anda menggunakan yield dan raise dengan cara yang tidak biasa. Oleh karena itu, jika mencari contoh cara menggunakannya, Anda mungkin tidak akan menemukan kode seperti tasklet NDB.

Untuk mengubah fungsi menjadi tasklet NDB:

  • dekorasi fungsi dengan @ndb.tasklet,
  • ganti semua panggilan datastore sinkron dengan yield panggilan datastore asinkron,
  • buat fungsi "menampilkan" nilai yang ditampilkan dengan raise ndb.Return(retval) (tidak diperlukan jika fungsi tidak menampilkan apa pun).

Aplikasi bisa menggunakan tasklet untuk kontrol yang lebih baik atas API asinkron. Sebagai contoh, perhatikan skema berikut:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

Saat menampilkan pesan, sebaiknya tampilkan nama panggilan penulis. Cara "sinkron" untuk mengambil data guna menampilkan daftar pesan mungkin terlihat seperti ini:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

Sayangnya, pendekatan ini tidak efisien. Jika melihatnya di Appstats, Anda akan melihat bahwa permintaan "Get" berada dalam rangkaian. Anda mungkin melihat pola "tangga" berikut.

Permintaan &quot;Get&quot; yang sinkron terjadi secara berurutan
"Get" yang sinkron terjadi secara berurutan.

Bagian program ini akan lebih cepat jika "Get" itu bisa tumpang tindih. Anda dapat menulis ulang kode untuk menggunakan get_async, tetapi akan sulit untuk melacak permintaan dan pesan asinkron mana yang saling terkait.

Aplikasi dapat menentukan fungsi "async"-nya sendiri dengan menjadikannya sebagai tasklet. Ini memungkinkan Anda untuk mengatur kode dengan cara yang tidak terlalu membingungkan.

Selain itu, daripada menggunakan acct = key.get() atau acct = key.get_async().get_result(), fungsi tersebut harus menggunakan acct = yield key.get_async(). yield ini memberi tahu NDB bahwa ini adalah tempat yang tepat untuk menangguhkan tasklet ini dan memungkinkan tasklet lainnya berjalan.

Mendekorasi fungsi generator dengan @ndb.tasklet akan membuat fungsi tersebut menampilkan Future, bukan objek generator. Dalam tasklet, yield mana pun dari Future akan menunggu dan menampilkan hasil Future.

Contoh:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

Perhatikan bahwa meskipun get_async() menampilkan Future, kerangka kerja tasklet menyebabkan ekspresi yield menampilkan hasil Future ke variabel acct.

map() memanggil callback() beberapa kali. Namun, yield ..._async() di callback() memungkinkan penjadwal NDB mengirim banyak permintaan asinkron sebelum menunggu salah satunya selesai.

Pemintaan &quot;Get&quot; Asinkron yang Tumpang Tindih
Pemintaan "Get" Asinkron yang Tumpang Tindih

Jika melihat hal ini di Appstats, Anda mungkin terkejut melihat beberapa "Get" ini tidak hanya tumpang-tindih, tetapi semuanya berfungsi dalam permintaan yang sama. NDB akan menerapkan "autobatcher". Autobatcher memaketkan beberapa permintaan dalam satu batch RPC ke server; hal ini dilakukan sedemikian rupa agar selama masih ada banyak pekerjaan yang harus dilakukan (callback lain dapat berjalan), pengumpulan kunci tetap bisa dilakukan. Segera setelah salah satu hasil diperlukan, autobatcher akan mengirimkan batch RPC. Tidak seperti kebanyakan permintaan, kueri tidak "dikelompokkan".

Saat berjalan, tasklet akan mendapatkan namespace defaultnya dari apa pun default-nya saat tasklet muncul, atau apa pun yang diubah tasklet saat berjalan. Dengan kata lain, namespace default tidak terkait dengan atau disimpan dalam Konteks, dan mengubah namespace default dalam satu tasklet tidak memengaruhi namespace default di tasklet lainnya, kecuali yang dihasilkannya.

Tasklet, Kueri Paralel, Hasil Paralel

Anda dapat menggunakan tasklet agar beberapa kueri mengambil data secara bersamaan. Misalnya, aplikasi Anda memiliki sebuah halaman yang menampilkan isi keranjang belanja dan daftar penawaran spesial. Skemanya mungkin terlihat seperti ini:

class Account(ndb.Model):
    pass


class InventoryItem(ndb.Model):
    name = ndb.StringProperty()


class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()


class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

Fungsi "sinkron" yang mendapatkan item keranjang belanja dan penawaran spesial mungkin terlihat seperti berikut:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Contoh ini menggunakan kueri untuk mengambil daftar item keranjang dan penawaran; kemudian, kode ini akan mengambil detail tentang item inventaris dengan get_multi(). (Fungsi ini tidak menggunakan nilai yang ditampilkan get_multi() secara langsung. API ini memanggil get_multi() untuk mengambil semua detail inventaris ke dalam cache sehingga dapat dibaca dengan cepat nanti.) get_multi menggabungkan banyak "Get" menjadi satu permintaan. Namun, pengambilan kueri terjadi satu per satu. Agar pengambilan tersebut terjadi secara bersamaan, tumpangkan kedua kueri tersebut:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

Panggilan get_multi() masih terpisah: bergantung pada hasil kueri, sehingga Anda tidak dapat menggabungkannya dengan kueri.

Misalkan, aplikasi ini terkadang memerlukan keranjang, terkadang penawaran, dan terkadang keduanya. Anda ingin mengatur kode sehingga ada fungsi untuk mendapatkan keranjang dan fungsi untuk mendapatkan penawaran. Jika aplikasi Anda memanggil fungsi-fungsi tersebut secara bersamaan, idealnya kuerinya bisa "tumpang tindih". Untuk melakukannya, buat fungsi ini menjadi tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)


@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)


@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

yield xy itu penting tetapi mudah diabaikan. Jika ada dua pernyataan yield yang berbeda, keduanya akan terjadi secara berurutan. Namun, meng-yield sebuah tuple tasklet berarti hasil paralel: tasklet yang dapat berjalan secara paralel dan yield menunggu sampai semuanya selesai dan menampilkan hasilnya. (Dalam beberapa bahasa pemrograman, hal ini dikenal sebagai penghalang.)

Jika satu potongan kode menjadi tasklet, Anda mungkin ingin segera melakukan lebih banyak hal. Jika Anda melihat kode "sinkron" yang dapat berjalan secara paralel dengan tasklet, sebaiknya jadikan juga sebagai tasklet. Nantinya Anda dapat memparalelkannya dengan yield paralel.

Jika Anda menulis fungsi permintaan (fungsi permintaan webapp2, fungsi tampilan Django, dll.) menjadi tasklet, fungsi tersebut tidak akan melakukan apa yang Anda inginkan: fungsi tersebut menghasilkan tetapi kemudian berhenti berjalan. Dalam situasi ini, Anda ingin mendekorasi fungsi dengan @ndb.synctasklet. @ndb.synctasklet mirip dengan @ndb.tasklet tetapi diubah untuk memanggil get_result() di tasklet. Tindakan ini mengubah tasklet menjadi fungsi yang menampilkan hasilnya dengan cara biasa.

Iterator Kueri di Tasklet

Untuk melakukan iterasi atas hasil kueri dalam tasklet, gunakan pola berikut:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Ini adalah tugas yang cocok dengan tasklet yang setara dengan berikut ini:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

Tiga baris tebal di versi pertama adalah versi yang cocok untuk tugas yang setara dengan satu baris tebal di versi kedua. Tasklet hanya dapat ditangguhkan dengan kata kunci yield. Loop for yang tidak memiliki yield tidak mengizinkan tasklet lainnya berjalan.

Anda mungkin bertanya-tanya mengapa kode ini malah menggunakan iterator kueri, bukan mengambil semua entity menggunakan qry.fetch_async(). Aplikasi mungkin memiliki begitu banyak entity yang tidak muat di dalam RAM. Mungkin Anda sedang mencari entity dan dapat berhenti melakukan iterasi setelah menemukannya; tetapi Anda tidak dapat mengekspresikan kriteria pencarian Anda hanya dengan bahasa kueri. Anda dapat menggunakan iterator untuk memuat entity yang akan diperiksa, lalu keluar dari loop saat menemukan apa yang Anda inginkan.

Urlfetch Asinkron dengan NDB

Context NDB memiliki fungsi urlfetch() asinkron yang diparalelkan dengan baik dengan tasklet NDB, misalnya:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

Layanan URL-fetch memiliki API permintaan asinkronnya sendiri. Tindakan ini diperbolehkan, tetapi tidak selalu mudah digunakan dengan tasklet NDB.

Menggunakan Transaksi Asinkron

Transaksi juga dapat dilakukan secara asinkron. Anda dapat meneruskan fungsi yang ada ke ndb.transaction_async(), atau menggunakan dekorator @ndb.transactional_async. Seperti fungsi asinkron lainnya, fungsi ini akan menampilkan Future NDB:

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

Transaksi juga berfungsi dengan tasklet. Misalnya, kita dapat mengubah kode update_counter menjadi yield saat menunggu pemblokiran RPC:

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

Menggunakan Future.wait_any()

Terkadang, Anda ingin membuat beberapa permintaan asinkron dan kembali setiap kali permintaan pertama selesai. Anda dapat melakukannya menggunakan metode class ndb.Future.wait_any():

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

Sayangnya, tidak ada cara mudah untuk mengubahnya menjadi tasklet; yield paralel menunggu semua Future selesai, termasuk yang tidak ingin Anda tunggu.