NDB 非同步作業

當要最佳化應用程式的效能時,請考慮使用 NDB。例如,若應用程式讀取並非處於快取中的值,則讀取需要花費一些時間。您可以與其他事務並行執行 Datastore 的作業,或兩者間並行執行一些 Datastore 的作業,以加速您的應用程式。

NDB 用戶端函式庫提供很多非同步 (async) 的函式。每個函式皆能使應用程式傳送要求至 Datastore。函式會立即傳回內容,並傳回一個 Future 物件。在 Datastore 處理要求時,應用程式亦可以進行其他事情。在 Datastore 處理完要求之後,應用程式會從 Future 物件取得結果。

簡介

假設您其中一個應用程式的要求處理需要使用 NDB 寫入內容,可能是為了記錄要求,還需要執行其他的 NBD 作業,可能是為了擷取一些資料。

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

將呼叫替換為 put() 以及呼叫其非同步 put_async(),應用程式可以立即執行其他動作,不用卡在 put()

class MyRequestHandler(webapp2.RequestHandler):
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        future = acct.put_async()

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')
        future.get_result()

這能讓 Datasore 在寫入資料時,顯示其他 NBD 函式以及範本。在從 Datastore 取得資料之前,應用程式不會在 Datastore 受到封鎖。

在這個示例中,呼叫 future.get_result 就有些不合理,因為應用程式未曾「使用」來自 NDB 的結果。這段程式碼的用意是為了確保 NDB put 完成以前,要求處理常式尚未退出。如果要求處理常式太早退出,則 put 可能永遠不會發生。為了方便起見,您可以使用 @ndb.toplevel 修飾要求處理常式。這告訴處理常式在完成非同步要求之前不要退出。這讓您能夠發送要求且不必擔心其結果。

您可以將整個 WSGIApplication 指定為 ndb.toplevel。這可以確保每個 WSGIApplication 的處理程序在傳回之前,要等待所有非同步的要求。 (這不會「覆蓋」所有 WSGIApplication 的處理程序)


app = ndb.toplevel(webapp2.WSGIApplication([('/', MyRequestHandler)]))

應用 toplevel 會比其所有的處理常式函式方便許多。但如果處理常式方法使用 yield,該方法仍然需要包裹在另一個修飾器中 @ndb.synctasklet,否則處理常式將在 yield 停止執行且無法完成。

class MyRequestHandler(webapp2.RequestHandler):
    @ndb.toplevel
    def get(self):
        acct = Account.get_by_id(users.get_current_user().user_id())
        acct.view_counter += 1
        acct.put_async()  # Ignoring the Future this returns

        # ...read something else from Datastore...

        self.response.out.write('Content of the page')

使用非同步 API 及 Future

幾乎每一個非同步的 NBD 函式都有一個 _async 對應函式。例如,put() 具有 put_async()。非同步函式的參數始終與同步版本的引數相同。非同步方法所傳回的值始終是 Future 或 (針對「多」函式) Future 的列表。

「Future」是一個物件,用來維持已啟動但尚未完成的作業狀態。所有非同步 API 都會傳回一或多個 Futures。您可以呼叫 Futureget_result() 函式查看有關於運作的結果,若有必要,接下來 Future 會停止,直到結果出現並提供給您。 get_result() 會傳回 API 同步版本所傳回的值。

附註: 若您在其他程式語言使用過 Future,您可能會認為您可以直接使用 Future 做為結果,但這方法在此無效。那些程式語言使用隱含的 future,NBD 使用的是明確的 future。呼叫 get_result() 以取得 Future 的結果。

如果作業引發例外狀況怎麼辦?這取決於例外狀況發生的時間。如果 NDB 在「發出」要求時發現問題 (可能是引數類型錯誤),則 _async() 方法會引發例外狀況。但是如果在 Datastore 伺服器偵測到例外狀況,_async() 方法會傳回 Future,且在您的應用程式呼叫 get_result() 時引發例外狀況。別太擔心這一點,這最終會回歸正常表現,也許最大的區別在於若顯示出過去的內容,您將會看到一些低層級的非同步機器暴露而出。

例如,假設您正在編寫留言板的應用程式。如果使用者已登入,您想要呈現留言板最新所發佈的頁面。此頁面應該亦要向使用者顯示他們的暱稱。應用程式需要兩種資訊:已登入使用者的帳戶資訊,以及留言板所發布的內容。此應用程式的「同步」版本可能如下所示:

uid = users.get_current_user().user_id()
acct = Account.get_by_id(uid)  # I/O action 1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries = qry.fetch(10)  # I/O action 2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

這裡有兩個獨立的 I/O 作業:取得 Account 實體以及最近的 Guestbook 實體。使用同步 API 的話,這些將接連發生,我們在擷取留言板實體之前,等待收到帳戶資訊。但是應用程式不需要立即取得帳戶資訊。我們可以利用這一點並使用非同步 API:

uid = users.get_current_user().user_id()
acct_future = Account.get_by_id_async(uid)  # Start I/O action #1
qry = Guestbook.query().order(-Guestbook.post_date)
recent_entries_future = qry.fetch_async(10)  # Start I/O action #2
acct = acct_future.get_result()  # Complete #1
recent_entries = recent_entries_future.get_result()  # Complete #2

# ...render HTML based on this data...
self.response.out.write('<html><body>{}</body></html>'.format(''.join(
    '<p>{}</p>'.format(entry.content) for entry in recent_entries)))

此版本的程式碼首先建立兩個 Futures (acct_futurerecent_entries_future),並等待這兩個物件。伺服器並行處理兩個要求。每個 _async() 函數呼叫都會建立 Future 物件,並傳送要求至 Datastore 伺服器。伺服器可以立即開始處理要求。伺服器的回應能夠以任意的順序傳回,Future 物件會連結回應至其相對應的要求。

同步要求不重疊,但非同步可以重疊。
同步 vs. 非同步要求

在非同步版本中所花費的總時間大約等於作業的最大時間。同步版本中花費的總時間超過了作業時間的總和。若您可以並行執行更多作業,那麼非同步作業可以提供更多幫助。

若要查看應用程式查詢需要花多久時間,或是每個要求需要執行多少次 I/O 作業,請考慮使用 Appstats。此工具可以根據檢測設備上的執行中 App,顯示與上圖類似的圖表。

使用 Tasklet

NDB 的 tasklet 是一小段程式碼,可能與其他程式碼一同執行。若您要編寫一個 tasklet,您的應用程式可以使用非同步 NBD 函式的相似方式使用 tasklet:應用程式呼叫 tasklet,tasklet 傳回 Future,接著呼叫 Futureget_result() 方法取得結果。

tasklet 是一種可在沒有執行緒的情況下編寫並行函式的方法,由事件迴圈執行,且可以使用 yield 陳述式暫停自身,以封鎖 I/O 或其他作業。封鎖作業的概念適用於 Future 類別,但 tasklet 可能會為了等待遠端程序呼叫 (RPC) 完成而對 RPC 使用 yield。當 tasklet 產生結果時,會 raise 一個 ndb.Return 例外狀況,接著 NDB 會將結果與過去 yieldFuture 建立關聯。

當您要編寫 NBD tasklet 時,您能夠以不尋常的方式使用 yieldraise。因此,若您在查找有關於如何使用它們的範例,您可能找不到像 NBD tasklet 這樣的程式碼。

若要將函式轉換成 NBD tasklet:

  • 使用 @ndb.tasklet 修飾函式,
  • 將所有的同步資料儲存庫呼叫替換為非同步資料庫儲存呼叫的 yield
  • 使函式「傳回」其傳回值 raise ndb.Return(retval) (函式未傳回任何內容時則不需要)。

應用程式可以使用 tasklet 以更精細地控制非同步 API。例如,不妨考慮以下結構定義:

class Account(ndb.Model):
    email = ndb.StringProperty()
    nickname = ndb.StringProperty()

    def nick(self):
        return self.nickname or self.email  # Whichever is non-empty
...
class Message(ndb.Model):
    text = ndb.StringProperty()
    when = ndb.DateTimeProperty(auto_now_add=True)
    author = ndb.KeyProperty(kind=Account)  # references Account

顯示訊息時一起顯示作者的暱稱會很有用。使用「同步」方式擷取資料以顯示訊息的列表可能如下所示:

qry = Message.query().order(-Message.when)
for msg in qry.fetch(20):
    acct = msg.author.get()
    self.response.out.write(
        '<p>On {}, {} wrote:'.format(msg.when, acct.nick()))
    self.response.out.write('<p>{}'.format(msg.text))

不過這種方法的效率很低。若您在 Appstats 中查看,您會看到一連串的「Get」要求。您可能會看到以下「階梯」模式。

同步的「Get」一連串地發生
同步的「Get」會一連串地發生。

若那些「Get」能夠重疊,那麼這部分的程式會比較快。您可能會使用 get_async 重寫程式碼,不過要追蹤那些放在一起的非同步要求及訊息是相當棘手的。

應用程式能夠將其做為 tasklet 來定義自己的「非同步」函式,可讓您以較不易混淆的方式整理這些程式碼。

此外,函式應使用 acct = yield key.get_async(),而不是 acct = key.get()acct = key.get_async().get_result()。此 yield 會告訴 NDB 這是停用 tasklet 且讓其他 tasklet 執行的好時機。

使用 @ndb.tasklet 修飾生成函式,使函式傳回 Future 而非生成的物件。在 tasklet 中,Future 的任何 yield 等待並傳回 Future的結果。

例如:

@ndb.tasklet
def callback(msg):
    acct = yield msg.author.get_async()
    raise ndb.Return('On {}, {} wrote:\n{}'.format(
        msg.when, acct.nick(), msg.text))

qry = Message.query().order(-Message.when)
outputs = qry.map(callback, limit=20)
for output in outputs:
    self.response.out.write('<p>{}</p>'.format(output))

請注意,雖然 get_async() 傳回一個 Future,tasklet 框架會造成 yield 表達式傳回 Future 的結果給 acct 變數。

map() 呼叫 callback() 數次。但是 callback() 中的 yield ..._async() 讓 NBD 的排程器在等待任何一項要求完成之前,發送非同步的要求。

重疊非同步的「Get」
重疊非同步的「Get」

若您在 Appstats 中看到這個,您可能會很訝異地發現這些 Get 不僅僅是重疊,且皆在同一個要求中完成了。NBD 採用了「autobatcher」。Autobatcher 將單個批次處理 RPC 中的多個要求繫結到伺服器,以此方式執行此作業:只要還有更多工作要做 (可能執行另一個回呼),就會收集金鑰。若需要其中一個結果,autobatcher 就會發送批次 RPC。這與大多數的要求不同,查詢不是「批次」的。

當一個 tasklet 執行時,會從 tasklet 生成時的任何預設內容,或在執行過程 tasklet 更改的任何內容中,取得其預設的命名空間。換句話說,預設的命名空間不會與 context 關聯,亦不會儲存在 Context 中,且更改一個 tasklet 中的預設命名空間不會影響其他 tasklet 中的預設命名空間,除非是由 context 所生成。

Tasklet、並行查詢、並行 yield

您可以使用 tasklet,以便多個查詢同時擷取記錄。例如,假設您的應用程式有一個頁面,能夠顯示購物車內容以及特別優惠的列表。其結構定義可能如下所示:

class Account(ndb.Model):
    pass

class InventoryItem(ndb.Model):
    name = ndb.StringProperty()

class CartItem(ndb.Model):
    account = ndb.KeyProperty(kind=Account)
    inventory = ndb.KeyProperty(kind=InventoryItem)
    quantity = ndb.IntegerProperty()

class SpecialOffer(ndb.Model):
    inventory = ndb.KeyProperty(kind=InventoryItem)

擷取購物車項目以及特別優惠的「同步」函式可能如下所示:

def get_cart_plus_offers(acct):
    cart = CartItem.query(CartItem.account == acct.key).fetch()
    offers = SpecialOffer.query().fetch(10)
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

此範例使用查詢來擷取購物車項目列表及優惠,接著使用 get_multi() 擷取與庫存項目有關的詳細資料。(此函式不直接使用 get_multi() 傳回的值,其呼叫 get_multi() 擷取所有庫存詳情至快取中,以便日後可以快速讀取) get_multi 結合許多 Get 至一個要求。但是查詢擷取會接續發生。為了要讓這些擷取同時發生,請重疊兩個查詢:

def get_cart_plus_offers_async(acct):
    cart_future = CartItem.query(CartItem.account == acct.key).fetch_async()
    offers_future = SpecialOffer.query().fetch_async(10)
    cart = cart_future.get_result()
    offers = offers_future.get_result()
    ndb.get_multi([item.inventory for item in cart] +
                  [offer.inventory for offer in offers])
    return cart, offers

get_multi() 呼叫仍然是各自獨立的:其取決於查詢結果,所以您無法將其與查詢做結合。

假設此應用程式有時需要購物車,有時需要優惠,有時兩者皆需。您需要要整理您的程式碼,以做成能取得購物車的函式以及取得優惠的函式。若您的應用程式同時呼叫這兩個函式,在理想情況下他們的查詢能夠「重疊」。為此請將這些函式 tasklet:

@ndb.tasklet
def get_cart_tasklet(acct):
    cart = yield CartItem.query(CartItem.account == acct.key).fetch_async()
    yield ndb.get_multi_async([item.inventory for item in cart])
    raise ndb.Return(cart)

@ndb.tasklet
def get_offers_tasklet(acct):
    offers = yield SpecialOffer.query().fetch_async(10)
    yield ndb.get_multi_async([offer.inventory for offer in offers])
    raise ndb.Return(offers)

@ndb.tasklet
def get_cart_plus_offers_tasklet(acct):
    cart, offers = yield get_cart_tasklet(acct), get_offers_tasklet(acct)
    raise ndb.Return((cart, offers))

yield xy 很重要,但卻容易受到忽視。如果是兩個獨立的 yield 陳述式,則這兩個陳述式會一連串地發生。但是對一個 tasklet 組合進行 yield 屬於「並行 yield」:tasklet 可並行執行,且 yield 會等待所有內容完成並傳回結果 (在一些程式語言中,會稱為「障礙 (barrier)」)。

若您將一小段程式碼轉換為一個 tasklet,您可能希望盡快繼續轉換。若您注意到可以與 tasklet 並行執行的「同步」程式碼,那麼將其做為 tasklet 或許會是個好方法。然後,您可以使用並行 yield 進行平行處理。

若您編寫一個要求函式 (webapp2要求函式、Django 查看函式等) 做為 tasklet,其作業並不會符合您的預期:tasklet 會在 yield 後即停止執行。在這種情況下,最好使用 @ndb.synctasklet 修式函式。 @ndb.synctasklet 類似於 @ndb.tasklet,但是改為在 tasklet 上呼叫 get_result()。這會將您的 tasklet 轉換為一個函式,以平常的方式傳回您的結果。

在 tasklet 中查詢迭代

若要在 tasklet 中迭代查詢結果,請使用以下模式:

qry = Model.query()
qit = qry.iter()
while (yield qit.has_next_async()):
    entity = qit.next()
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

此與以下內容相等且適用 tasklet:

# DO NOT DO THIS IN A TASKLET
qry = Model.query()
for entity in qry:
    # Do something with entity
    if is_the_entity_i_want(entity):
        raise ndb.Return(entity)

第一個版本中的三個粗體部分與第二個版本中的單一粗體部分相等且適用 tasklet。tasklet 只能在 yield 關鍵字處停用。循環缺乏 yield 不會使其他 tasklet 執行。

您可能想知道為何此程式碼完全使用查詢迭代,而非使用 qry.fetch_async() 擷取所有的實體。應用程式可能有很多不適用 RAM 的實體。也許您也正在尋找一個實體,找到後即可停止迭代,但是您不能僅使用查詢語言來表達搜尋條件。您可以使用迭代器載入要檢查的實體,然後在找到所需內容時中斷迴圈。

使用 NDB 進行非同步的 Urlfetch

NDB Context 具有非同步的 urlfetch() 函式,能與 NBD 的 tasklet 順暢地平行處理,例如:

@ndb.tasklet
def get_google():
    context = ndb.get_context()
    result = yield context.urlfetch("http://www.google.com/")
    if result.status_code == 200:
        raise ndb.Return(result.content)

URL Fetch 服務具有自己的非同步要求 API。這並不成問題,但有時不方便與 NDB tasklet 一起使用。

使用非同步交易

交易亦可以同步執行。您可以將現有的函式傳送至 ndb.transaction_async() 或是使用 @ndb.transactional_async 修飾器。與其他非同步函式相同,會傳回 NDB 的 Future

@ndb.transactional_async
def update_counter(counter_key):
    counter = counter_key.get()
    counter.value += 1
    counter.put()

交易也可與 tasklet 一起執行。例如,我們可以在等待封鎖 RPC 時將 update_counter 程式碼變更為 yield

@ndb.transactional_tasklet
def update_counter(counter_key):
    counter = yield counter_key.get_async()
    counter.value += 1
    yield counter.put_async()

使用 Future.wait_any()

有時您需要建立多個非同步要求,並在第一個完成時即傳回。此時,您可以使用 ndb.Future.wait_any() 類別方法:

def get_first_ready():
    urls = ["http://www.google.com/", "http://www.blogspot.com/"]
    context = ndb.get_context()
    futures = [context.urlfetch(url) for url in urls]
    first_future = ndb.Future.wait_any(futures)
    return first_future.get_result().content

不過,您無法輕鬆地將上述程式碼轉換為 tasklet;平行 yield 會等待所有 Future 完成,包含那些您不想等待的。

本頁內容對您是否有任何幫助?請提供意見:

傳送您對下列選項的寶貴意見...

這個網頁
Python 2 適用的 App Engine 標準環境