使用 Python 建立 Dataflow 管道

本文說明如何使用 Apache Beam SDK for Python 建構定義管道的程式。接著,您可以使用直接本機執行器或雲端執行器 (例如 Dataflow) 執行管道。如要瞭解 WordCount 管道,請觀看「如何在 Apache Beam 中使用 WordCount」影片。


如要直接在 Google Cloud 控制台按照逐步指南操作,請按一下「Guide me」(逐步引導)

逐步引導


事前準備

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  10. Install the Google Cloud CLI.

  11. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  12. To initialize the gcloud CLI, run the following command:

    gcloud init
  13. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  14. Make sure that billing is enabled for your Google Cloud project.

  15. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  16. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  17. Grant roles to your user account. Run the following command once for each of the following IAM roles: roles/iam.serviceAccountUser

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account. For example, user:myemail@example.com.

    • Replace ROLE with each individual role.
  18. 將角色授予 Compute Engine 預設服務帳戶。針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • PROJECT_ID 替換為您的專案 ID。
    • PROJECT_NUMBER 替換為專案編號。如要找出專案編號,請參閱「識別專案」一文,或使用 gcloud projects describe 指令。
    • SERVICE_ACCOUNT_ROLE 替換為各個角色。
  19. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (標準)。
    • 將儲存空間位置設定為下列項目: US (美國)。
    • BUCKET_NAME 替換成 不重複的值區名稱。請勿在值區名稱中加入任何機密資訊,因為值區命名空間屬於全域性質,而且會公開顯示。
    • gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
    • 複製 Google Cloud 專案 ID 和 Cloud Storage 值區名稱。您會在本文後續步驟中用到這些值。

設定環境

在本節中,請使用指令提示字元設定獨立的 Python 虛擬環境,以便使用 venv 執行管道專案。這樣一來,您就能將不同專案的依附元件區隔開來。

如果沒有命令提示字元可用,您可以使用 Cloud Shell。 Cloud Shell 已安裝 Python 3 的套件管理工具,因此您可以直接建立虛擬環境

如要安裝 Python 並建立虛擬環境,請按照下列步驟操作:

  1. 請確認系統中已執行 Python 3 和 pip
    python --version
    python -m pip --version
  2. 視需要安裝 Python 3,然後設定 Python 虛擬環境:按照「 設定 Python 開發環境」頁面的「安裝 Python」和「設定 venv」一節提供的說明操作。

完成快速入門後,您可以執行 deactivate 來停用虛擬環境。

取得 Apache Beam SDK

Apache Beam SDK 是一種用於資料管道的開放原始碼程式設計模型。您可以使用 Apache Beam 程式定義管道,然後選擇 Dataflow 等執行器來執行管道。

如要下載及安裝 Apache Beam SDK,請按照下列步驟操作:

  1. 確認您位於前一節建立的 Python 虛擬環境中。 確認提示開頭為 <env_name>,其中 env_name 是虛擬環境的名稱。
  2. 安裝最新版本的 Python 適用的 Apache Beam SDK:
  3. pip install apache-beam[gcp]

在本機執行管道

如要查看管道在本機的執行情況,請使用 apache_beam 套件隨附的 wordcount 範例適用的現成 Python 模組。

wordcount 管道範例會執行下列操作:

  1. 擷取文字檔案做為輸入。

    這個文字檔案位於資源名稱為 gs://dataflow-samples/shakespeare/kinglear.txt 的 Cloud Storage bucket。

  2. 將每一行剖析為字詞。
  3. 對代碼化的字詞執行頻率計數。

如要在本機暫存 wordcount 管道,請按照下列步驟操作:

  1. 從本機終端機執行 wordcount 範例:
    python -m apache_beam.examples.wordcount \
      --output outputs
  2. 查看管道的輸出內容:
    more outputs*
  3. 如要退出,請按 q
從本機執行管道能讓您測試和偵錯 Apache Beam 程式。 您可以在 Apache Beam GitHub 上查看 wordcount.py 原始碼。

在 Dataflow 服務上執行管道

在本節中,請在 Dataflow 服務上執行 apache_beam 套件中的 wordcount 範例管道。這個範例會將 DataflowRunner 指定為 --runner 的參數。
  • 執行管道:
    python -m apache_beam.examples.wordcount \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    更改下列內容:

    • DATAFLOW_REGION:您要部署 Dataflow 工作的區域,例如 europe-west1

      --region 標記會覆寫中繼資料伺服器、本機用戶端或環境變數中設定的預設地區。

    • BUCKET_NAME:您先前複製的 Cloud Storage bucket 名稱
    • PROJECT_ID:您先前複製的 Google Cloud 專案 ID

查看結果

透過 Dataflow 執行管道時,結果會儲存於 Cloud Storage 值區。在本節中,請使用 Google Cloud 控制台或本機終端機,確認管道正在執行。

Google Cloud 控制台

如要在 Google Cloud 控制台中查看結果,請按照下列步驟操作:

  1. 前往 Google Cloud 控制台的 Dataflow「Jobs」(工作) 頁面。

    前往「Jobs」(工作) 頁面

    「Jobs」(工作) 頁面會顯示 wordcount 工作的詳細資料,包括一開始的「Running」(執行中) 狀態,然後是「Succeeded」(成功) 狀態。

  2. 前往 Cloud Storage 的「Buckets」(值區) 頁面。

    前往「Buckets」(值區) 頁面

  3. 在專案的值區清單中,按一下您先前建立的儲存空間值區。

    wordcount 目錄中,會顯示工作建立的輸出檔案。

本機終端機

從終端機或使用 Cloud Shell 查看結果。

  1. 如要列出輸出檔案,請使用 gcloud storage ls 指令
    gcloud storage ls gs://BUCKET_NAME/results/outputs* --long
  2. 請將 BUCKET_NAME 改成管道程式中使用的 Cloud Storage 值區名稱。

  3. 如要查看輸出檔案中的結果,請使用 gcloud storage cat 指令
    gcloud storage cat gs://BUCKET_NAME/results/outputs*

修改管道程式碼

上述範例中的 wordcount 管道區分大寫和小寫字詞。 以下步驟說明如何修改管道,讓 wordcount 管道不區分大小寫。
  1. 在本機電腦上,從 Apache Beam GitHub 存放區下載 wordcount 程式碼的最新副本。
  2. 從本機終端機執行管道:
    python wordcount.py --output outputs
  3. 查看結果:
    more outputs*
  4. 如要退出,請按 q
  5. 在您選擇的編輯器中開啟 wordcount.py 檔案。
  6. run 函式中,檢查管道步驟:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum))

    split 之後,這幾行將會以字串形式分割成字詞。

  7. 如要將字串改為小寫,請修改 split 後的這幾行:
    counts = (
            lines
            | 'Split' >> (beam.ParDo(WordExtractingDoFn()).with_output_types(str))
            | 'lowercase' >> beam.Map(str.lower)
            | 'PairWithOne' >> beam.Map(lambda x: (x, 1))
            | 'GroupAndSum' >> beam.CombinePerKey(sum)) 
    這項修改會將 str.lower 函式對應到每個字詞。這行相當於 beam.Map(lambda word: str.lower(word))
  8. 儲存檔案並執行修改後的 wordcount 工作:
    python wordcount.py --output outputs
  9. 查看已修改管道的結果:
    more outputs*
  10. 如要退出,請按 q
  11. 在 Dataflow 服務上執行修改後的管道:
    python wordcount.py \
        --region DATAFLOW_REGION \
        --input gs://dataflow-samples/shakespeare/kinglear.txt \
        --output gs://BUCKET_NAME/results/outputs \
        --runner DataflowRunner \
        --project PROJECT_ID \
        --temp_location gs://BUCKET_NAME/tmp/

    更改下列內容:

    • DATAFLOW_REGION:要部署 Dataflow 工作的區域
    • BUCKET_NAME:您的 Cloud Storage bucket 名稱
    • PROJECT_ID:您的 Google Cloud 專案 ID

清除所用資源

如要避免系統向您的 Google Cloud 帳戶收取本頁面所用資源的費用,請刪除含有這些資源的 Google Cloud 專案。

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

    Go to Buckets

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. 如果保留專案,請撤銷您授予 Compute Engine 預設服務帳戶的角色。針對下列每個 IAM 角色,執行一次下列指令:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke

後續步驟