使用互動式執行器開發 Apache Beam 筆記本

搭配使用 Apache Beam 互動式執行器和 JupyterLab 筆記本,完成下列工作:

  • 反覆開發管道。
  • 檢查管道圖。
  • 在「讀取-求值-印出迴圈」(REPL) 工作流程中剖析個別 PCollections

這些 Apache Beam 筆記本可透過 Vertex AI Workbench 取得,這項服務會代管筆記本虛擬機器,並預先安裝最新的數據資料學和機器學習架構。Dataflow 僅支援使用 Apache Beam 容器的 Workbench 執行個體。

本指南著重於 Apache Beam 筆記本推出的功能,但不會說明如何建構筆記本。如要進一步瞭解 Apache Beam,請參閱 Apache Beam 程式設計指南

支援與限制

  • Apache Beam 筆記本僅支援 Python。
  • 在這些筆記本中執行的 Apache Beam 管道區段,會在測試環境中執行,而不是針對正式的 Apache Beam 執行器。如要在 Dataflow 服務上啟動筆記本,請匯出在 Apache Beam 筆記本中建立的管道。詳情請參閱「從筆記本中建立的管道啟動 Dataflow 工作」。

事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  3. Verify that billing is enabled for your Google Cloud project.

  4. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  5. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  6. Verify that billing is enabled for your Google Cloud project.

  7. Enable the Compute Engine and Notebooks APIs.

    Enable the APIs

  8. 建立 Apache Beam 筆記本執行個體前,請為使用其他服務 (例如 Pub/Sub) 的管道啟用額外 API。

    如未指定,筆記本執行個體會由預設的 Compute Engine 服務帳戶執行,並具備 IAM 專案編輯者角色。如果專案明確限制服務帳戶的角色,請確認該帳戶仍有足夠的授權可執行筆記本。舉例來說,從 Pub/Sub 主題讀取資料會隱含建立訂閱項目,而您的服務帳戶需要 IAM Pub/Sub 編輯者角色。相較之下,從 Pub/Sub 訂閱項目讀取資料只需要 IAM Pub/Sub 訂閱者角色。

    完成本指南後,請刪除您建立的資源,以免系統繼續計費。詳情請參閱清除所用資源一節。

    啟動 Apache Beam 筆記本執行個體

    1. 前往 Google Cloud 控制台的 Dataflow「Workbench」Workbench頁面。

      前往工作台

    2. 確認目前顯示的是「INSTANCES」分頁。

    3. 按一下工具列中的「新建」圖示

    4. 在「環境」部分,請確認「環境」和「容器」皆為「Apache Beam」。Apache Beam 筆記本僅支援 JupyterLab 3.x。

    5. 選用:如要在 GPU 上執行筆記本,請在「Machine type」(機器類型) 部分中,選取支援 GPU 的機器類型。詳情請參閱「GPU 平台」。

    6. 在「Networking」部分中,選取 Notebook VM 的子網路。

    7. 選用:如要設定自訂筆記本執行個體,請參閱「使用自訂容器建立執行個體」。

    8. 按一下「建立」,Dataflow Workbench 會建立新的 Apache Beam 筆記本執行個體。

    9. 筆記本執行個體建立完成後,「Open JupyterLab」連結就會啟用。 點選「Open JupyterLab」

    選用:安裝依附元件

    Apache Beam 筆記本已安裝 Apache Beam 和Google Cloud 連接器依附元件。如果管線包含依附於第三方程式庫的自訂連線器或自訂 PTransforms,請在建立筆記本執行個體後安裝這些程式庫。

    Apache Beam 範例筆記本

    建立筆記本執行個體後,請在 JupyterLab 中開啟。在 JupyterLab 側欄的「Files」分頁中,「Examples」資料夾包含範例筆記本。如要進一步瞭解如何使用 JupyterLab 檔案,請參閱 JupyterLab 使用者指南中的「使用檔案」。

    可用的筆記本如下:

    • 字數計算
    • 串流字數統計
    • 串流紐約市計程車乘車資料
    • 筆記本中的 Apache Beam SQL,並與管道進行比較
    • 在筆記本中使用 Apache Beam SQL 和 Dataflow 執行器
    • 筆記本中的 Apache Beam SQL
    • Dataflow 字數
    • 大規模互動式 Flink
    • RunInference
    • 搭配使用 GPU 和 Apache Beam
    • 以視覺化方式呈現資料

    「Tutorials」資料夾包含其他教學課程,說明 Apache Beam 的基本概念。以下是可用的教學課程:

    • 基本作業
    • 元素級別作業
    • 匯總資料
    • Windows
    • I/O 作業
    • 串流
    • 最終練習

    這些筆記本包含說明文字和註解程式碼區塊,可協助您瞭解 Apache Beam 概念和 API 用法。教學課程也會提供練習,讓您實際運用所學概念。

    以下各節會使用「串流字數統計」筆記本中的程式碼範例。本指南中的程式碼片段與「Streaming Word Count」筆記本中的程式碼片段可能略有差異。

    建立筆記本執行個體

    依序前往「File」>「New」>「Notebook」,然後選取 Apache Beam 2.22 以上版本。

    Apache Beam 筆記本是根據 Apache Beam SDK 的主要分支版本建構而成。也就是說,筆記本 UI 中顯示的最新核心版本,可能比最近發布的 SDK 版本還新。

    Apache Beam 已安裝在筆記本執行個體上,因此請在筆記本中加入 interactive_runnerinteractive_beam 模組。

    import apache_beam as beam
    from apache_beam.runners.interactive.interactive_runner import InteractiveRunner
    import apache_beam.runners.interactive.interactive_beam as ib
    

    如果筆記本使用其他 Google API,請新增下列匯入陳述式:

    from apache_beam.options import pipeline_options
    from apache_beam.options.pipeline_options import GoogleCloudOptions
    import google.auth
    

    設定互動選項

    下一行會設定 InteractiveRunner 從無界限來源記錄資料的時間量。在本範例中,時間長度設為 10 分鐘。

    ib.options.recording_duration = '10m'
    

    您也可以使用 recording_size_limit 屬性,變更無界限來源的錄音大小限制 (以位元組為單位)。

    # Set the recording size limit to 1 GB.
    ib.options.recording_size_limit = 1e9
    

    如需其他互動式選項,請參閱 interactive_beam.options 類別

    建立管道

    使用 InteractiveRunner 物件初始化管道。

    options = pipeline_options.PipelineOptions(flags={})
    
    # Set the pipeline mode to stream the data from Pub/Sub.
    options.view_as(pipeline_options.StandardOptions).streaming = True
    
    # Set the project to the default project in your current Google Cloud environment.
    # The project is used to create a subscription to the Pub/Sub topic.
    _, options.view_as(GoogleCloudOptions).project = google.auth.default()
    
    p = beam.Pipeline(InteractiveRunner(), options=options)
    

    讀取及以圖表呈現資料

    下例顯示 Apache Beam 管道,該管道會建立指定 Pub/Sub 主題的訂閱項目,並從該訂閱項目讀取資料。

    words = p | "read" >> beam.io.ReadFromPubSub(topic="projects/pubsub-public-data/topics/shakespeare-kinglear")
    

    管道會依來源的視窗計算字數。這會建立固定時間區間,每個時間區間的持續時間為 10 秒。

    windowed_words = (words
       | "window" >> beam.WindowInto(beam.window.FixedWindows(10)))
    

    資料經過視窗化處理後,系統會計算每個視窗中的字詞。

    windowed_word_counts = (windowed_words
       | "count" >> beam.combiners.Count.PerElement())
    

    show() 方法會在筆記本中將產生的 PCollection 視覺化。

    ib.show(windowed_word_counts, include_window_info=True)
    

    以表格形式顯示 PCollection 的 show 方法。

    您可以設定兩個選用參數:nduration,將結果集範圍縮小回 show()

    • 設定 n,將結果集限制為最多顯示 n 個元素,例如 20 個。如未設定 n,預設行為是列出擷取到的最新元素,直到來源錄製結束為止。
    • 設定 duration,將結果集限制為從來源錄音開頭算起,指定秒數的資料。如果未設定 duration,預設動作是列出所有元素,直到錄製結束為止。

    如果同時設定這兩個選用參數,只要達到任一門檻,show() 就會停止。在下列範例中,show() 最多會傳回 20 個元素,這些元素是根據錄製來源的前 30 秒資料計算而得。

    ib.show(windowed_word_counts, include_window_info=True, n=20, duration=30)
    

    如要顯示資料的視覺化效果,請將 visualize_data=True 傳遞至 show() 方法。您可以對視覺化內容套用多個篩選器。您可以透過下列視覺化圖表依標籤和軸篩選:

    show 方法會將 PCollection 顯示為一組可篩選的 UI 元素。

    為確保在原型設計串流管道時可重複播放,show() 方法預設會重複使用擷取的資料。如要變更這項行為,讓 show() 方法一律擷取新資料,請設定 interactive_beam.options.enable_capture_replay = False。此外,如果您在筆記本中新增第二個不受限的來源,系統會捨棄先前不受限來源的資料。

    Apache Beam 筆記本中另一個實用的視覺化項目是 Pandas DataFrame。以下範例會先將字詞轉換為小寫,然後計算每個字詞的頻率。

    windowed_lower_word_counts = (windowed_words
       | beam.Map(lambda word: word.lower())
       | "count" >> beam.combiners.Count.PerElement())
    

    collect() 方法會以 Pandas DataFrame 格式提供輸出內容。

    ib.collect(windowed_lower_word_counts, include_window_info=True)
    

    代表 Pandas DataFrame 中 PCollection 的 collect 方法。

    在筆記本開發作業中,編輯及重新執行儲存格是常見做法。在 Apache Beam 筆記本中編輯並重新執行儲存格時,儲存格不會還原原始儲存格中程式碼的預期動作。舉例來說,如果儲存格將 PTransform 新增至管道,重新執行該儲存格會將額外的 PTransform 新增至管道。如要清除狀態,請重新啟動核心,然後重新執行儲存格。

    透過互動式 Beam 檢查器將資料視覺化

    您可能會發現,不斷呼叫 show()collect() 來檢查 PCollection 的資料會造成干擾,尤其是在輸出內容佔用大量螢幕空間,導致難以瀏覽筆記本時。您也可以並列比較多個 PCollections,驗證轉換作業是否正常運作。舉例來說,當一個 PCollection 經過轉換並產生另一個時,對於這些用途,互動式 Beam 檢查工具是便利的解決方案。

    互動式 Beam 檢查器以 JupyterLab 擴充功能的形式提供,並預先安裝在 Apache Beam 筆記本中: apache-beam-jupyterlab-sidepanel 使用擴充功能時,您可以互動式檢查管道的狀態,以及與每個 PCollection 相關聯的資料,不必明確叫用 show()collect()

    開啟檢查器的方式有 3 種:

    • 按一下 JupyterLab 頂端選單列的 Interactive Beam。在下拉式選單中找到 Open Inspector,然後按一下即可開啟檢查器。

      透過選單開啟檢查器

    • 使用啟動器頁面。如果沒有開啟啟動器頁面,請按一下 File -> New Launcher 開啟。在啟動器頁面中找出 Interactive Beam,然後按一下 Open Inspector 開啟檢查器。

      透過啟動器開啟檢查器

    • 使用指令區塊面板。在 JupyterLab 選單列中,依序點選 View > Activate Command Palette。在對話方塊中搜尋 Interactive Beam,列出擴充功能的所有選項。按一下 Open Inspector 開啟檢查器。

      透過指令區塊面板開啟檢查器

    檢查工具即將開啟時:

    • 如果只開啟一個筆記本,檢查器會自動連線至該筆記本。

    • 如果沒有開啟任何筆記本,系統會顯示對話方塊,供您選取核心。

    • 如果開啟多個筆記本,系統會顯示對話方塊,方便你選取筆記本工作階段。

      選取要連結的筆記本

    建議您先開啟至少一個筆記本,並選取核心,再開啟檢查器。如果您在開啟任何筆記本之前,先使用核心開啟檢查器,之後開啟筆記本並連線至檢查器時,必須從 Use Kernel from Preferred Session 選取 Interactive Beam Inspector Session。檢查器和筆記本共用相同的工作階段時,兩者就會連線,而不是共用從相同核心建立的不同工作階段。從 Start Preferred Kernel 選取相同核心會建立新工作階段,與已開啟筆記本或檢查器的現有工作階段無關。

    您可以為開啟的筆記本開啟多個檢查器,並在工作區中隨意拖曳分頁標籤來排列檢查器。

    開啟 2 個檢查器並並排顯示

    在筆記本中執行儲存格時,檢查器頁面會自動重新整理。這個頁面會列出已連結筆記本中定義的管道和 PCollectionsPCollections 會依所屬管道分類,點選管道標題即可收合。

    點選管道和 PCollections 清單中的項目時,檢查器會在右側顯示對應的視覺化內容:

    • 如果是 PCollection,檢查器會算繪其資料 (如果資料仍適用於無界限的 PCollections,則為動態),並在點選 APPLY 按鈕後,提供額外的小工具來調整視覺化效果。

      檢查工具頁面

      由於檢查器和開啟的筆記本共用同一個核心工作階段,因此兩者會互相阻礙執行作業。舉例來說,如果筆記本正忙於執行程式碼,檢查器就不會更新,直到筆記本完成該項執行作業為止。反之,如果您想在檢查器動態顯示 PCollection 時,立即在筆記本中執行程式碼,則必須點選 STOP 按鈕停止顯示,並預先將核心發布至筆記本。

    • 如果是管道,檢查器會顯示管道圖。

      檢查工具頁面

    您可能會看到匿名管道。這些管道有 PCollections,您可以存取,但主要工作階段不再參照這些管道。例如:

    p = beam.Pipeline()
    pcoll = p | beam.Create([1, 2, 3])
    
    p = beam.Pipeline()
    

    上一個範例會建立空白管道 p,以及包含一個 PCollection pcoll 的匿名管道。您可以使用 pcoll.pipeline 存取匿名管道。

    你可以切換管道和 PCollection 清單,為大型視覺化內容節省空間。切換左側清單

    瞭解管道的錄製狀態

    除了視覺化,您也可以呼叫 describe,檢查筆記本執行個體中一或多個管道的錄製狀態。

    # Return the recording status of a specific pipeline. Leave the parameter list empty to return
    # the recording status of all pipelines.
    ib.recordings.describe(p)
    

    describe() 方法提供下列詳細資料:

    • 磁碟上管道所有錄音的總大小 (以位元組為單位)
    • 背景錄製作業的開始時間 (自 Unix Epoch 紀元時間開始算起,以秒為單位)
    • 背景錄製工作的目前管道狀態
    • 管道的 Python 變數

    從筆記本中建立的管道啟動 Dataflow 工作

    1. 選用:使用筆記本執行 Dataflow 工作前,請先重新啟動核心、重新執行所有儲存格,並驗證輸出內容。如果略過這個步驟,筆記本中的隱藏狀態可能會影響管道物件中的工作圖。
    2. 啟用 Dataflow API
    3. 新增下列匯入陳述式:

      from apache_beam.runners import DataflowRunner
      
    4. 傳入管道選項

      # Set up Apache Beam pipeline options.
      options = pipeline_options.PipelineOptions()
      
      # Set the project to the default project in your current Google Cloud
      # environment.
      _, options.view_as(GoogleCloudOptions).project = google.auth.default()
      
      # Set the Google Cloud region to run Dataflow.
      options.view_as(GoogleCloudOptions).region = 'us-central1'
      
      # Choose a Cloud Storage location.
      dataflow_gcs_location = 'gs://<change me>/dataflow'
      
      # Set the staging location. This location is used to stage the
      # Dataflow pipeline and SDK binary.
      options.view_as(GoogleCloudOptions).staging_location = '%s/staging' % dataflow_gcs_location
      
      # Set the temporary location. This location is used to store temporary files
      # or intermediate results before outputting to the sink.
      options.view_as(GoogleCloudOptions).temp_location = '%s/temp' % dataflow_gcs_location
      
      # If and only if you are using Apache Beam SDK built from source code, set
      # the SDK location. This is used by Dataflow to locate the SDK
      # needed to run the pipeline.
      options.view_as(pipeline_options.SetupOptions).sdk_location = (
          '/root/apache-beam-custom/packages/beam/sdks/python/dist/apache-beam-%s0.tar.gz' %
          beam.version.__version__)
      

      您可以調整參數值。例如,您可以將 region 值從 us-central1 變更為其他值。

    5. 使用 DataflowRunner 執行管道。這個步驟會在 Dataflow 服務上執行工作。

      runner = DataflowRunner()
      runner.run_pipeline(p, options=options)
      

      p 是來自「建立管道」的管道物件。

    如需在互動式筆記本中執行這項轉換的範例,請參閱筆記本執行個體中的 Dataflow 字數筆記本。

    或者,您也可以將筆記本匯出為可執行的指令碼,使用先前的步驟修改產生的 .py 檔案,然後將管道部署至 Dataflow 服務。

    儲存筆記本

    您建立的筆記本會儲存在執行中的筆記本例項本機。如果您在開發期間重設或關閉筆記本執行個體,只要新筆記本是在 /home/jupyter 目錄下建立,就會保留下來。不過,如果刪除筆記本執行個體,系統也會一併刪除這些筆記本。

    如要保留筆記本以供日後使用,請將筆記本下載到工作站本機、儲存至 GitHub,或匯出為其他檔案格式。

    將筆記本儲存到其他永久磁碟

    如要在不同筆記本執行個體中保留筆記本和指令碼等工作,請將這些項目儲存在永久磁碟中。

    1. 建立或連結永久磁碟。按照操作說明使用 ssh 連線至筆記本執行個體的 VM,並在開啟的 Cloud Shell 中發出指令。

    2. 請記下永久磁碟的掛接目錄,例如 /mnt/myDisk

    3. 編輯筆記本執行個體的 VM 詳細資料,在 Custom metadata 中新增項目:鍵 - container-custom-params;值 - -v /mnt/myDisk:/mnt/myDisk需要額外中繼資料才能繫結已掛接的 PD

    4. 按一下 [儲存]

    5. 如要更新這些變更,請重設筆記本執行個體。 重設筆記本執行個體

    6. 重設後,按一下「Open JupyterLab」。JupyterLab UI 可能需要一段時間才能使用。使用者介面顯示後,請開啟終端機並執行下列指令: ls -al /mnt 應該會列出 /mnt/myDisk 目錄。 列出繫結的磁碟區

    現在你可以將工作儲存至 /mnt/myDisk 目錄。即使刪除筆記本執行個體,永久磁碟仍會保留在專案中。接著,您就可以將這個永久磁碟附加至其他筆記本執行個體。

    清除所用資源

    使用完 Apache Beam 筆記本執行個體後,請 Google Cloud 關閉筆記本執行個體,清除您建立的資源。

    後續步驟