Create a Dataflow pipeline using Java

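This document shows you how to set up your Google Cloud project, create an example pipeline built with the Apache Beam SDK for Java, and run the example pipeline on the Dataflow service. The pipeline reads a text file from Cloud Storage, counts how many times each unique word appears in the file, and then writes the word counts back to Cloud Storage. For an introduction to the WordCount pipeline, see the How to use WordCount in Apache Beam video.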
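To make the read, count, and write steps concrete without any Beam dependency, the following is a minimal plain-JDK sketch of the counting logic only; it is not the code in the WordCount example. It assumes that words are split on runs of non-letter characters (similar to the tokenizer the Beam examples use) and that each result is written as a "word: count" line. The class name WordCountSketch and its methods are hypothetical.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/**
 * Plain-JDK sketch of the WordCount logic (no Beam dependency).
 * Hypothetical names; this is not the code in WordCount.java.
 */
final class WordCountSketch {
    // Assumption: split on runs of non-letter characters, similar to the Beam examples' tokenizer.
    private static final String TOKENIZER_PATTERN = "[^\\p{L}]+";

    private WordCountSketch() {}

    /** "Count" step: tallies how often each distinct (case-sensitive) word appears. */
    static Map<String, Long> countWords(Iterable<String> lines) {
        Map<String, Long> counts = new TreeMap<>();
        for (String line : lines) {
            for (String word : line.split(TOKENIZER_PATTERN)) {
                // A line that starts with punctuation yields a leading empty token; skip it.
                if (!word.isEmpty()) {
                    counts.merge(word, 1L, Long::sum);
                }
            }
        }
        return counts;
    }

    /** "Write" step: formats one result as a "word: count" text line. */
    static String format(Map.Entry<String, Long> entry) {
        return entry.getKey() + ": " + entry.getValue();
    }

    public static void main(String[] args) {
        // "Read" step stand-in: a few in-memory lines instead of a Cloud Storage file.
        List<String> lines = List.of("To be, or not to be:", "that is the question.");
        for (Map.Entry<String, Long> entry : countWords(lines).entrySet()) {
            System.out.println(format(entry));
        }
    }
}
```

In the Beam pipeline, the equivalent steps are expressed as transforms that the chosen runner executes, either locally or on Dataflow, instead of a single in-memory map.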

This tutorial requires Maven, but it's also possible to convert the example project from Maven to Gradle. For more information, see Optional: Convert from Maven to Gradle.



Before you begin

  1. Sign in to your Google Cloud account. If you're new to Google Cloud, create an account to evaluate how our products perform in real-world scenarios. New customers also get $300 in free credits to run, test, and deploy workloads.
  2. Install the Google Cloud CLI.

  3. If you're using an external identity provider (IdP), you must first sign in to the gcloud CLI with your federated identity.

  4. To initialize the gcloud CLI, run the following command:

    gcloud init
  5. Create or select a Google Cloud project.

    • Create a Google Cloud project:

      gcloud projects create PROJECT_ID

      Replace PROJECT_ID with a name for the Google Cloud project you are creating.

    • Select the Google Cloud project that you created:

      gcloud config set project PROJECT_ID

      Replace PROJECT_ID with your Google Cloud project name.

  6. Make sure that billing is enabled for your Google Cloud project.

  7. Enable the Dataflow, Compute Engine, Cloud Logging, Cloud Storage, Google Cloud Storage JSON, BigQuery, Cloud Pub/Sub, Cloud Datastore, and Cloud Resource Manager APIs:

    gcloud services enable dataflow compute_component logging storage_component storage_api bigquery pubsub datastore.googleapis.com cloudresourcemanager.googleapis.com
  8. Create local authentication credentials for your user account:

    gcloud auth application-default login

    If an authentication error is returned, and you are using an external identity provider (IdP), confirm that you have signed in to the gcloud CLI with your federated identity.

  9. Grant a role to your user account. Run the following command for the roles/iam.serviceAccountUser IAM role:

    gcloud projects add-iam-policy-binding PROJECT_ID --member="user:USER_IDENTIFIER" --role=ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace USER_IDENTIFIER with the identifier for your user account, for example, myemail@example.com. The command already includes the user: prefix.
    • Replace ROLE with roles/iam.serviceAccountUser.
  10. Grant roles to the Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects add-iam-policy-binding PROJECT_ID --member="serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com" --role=SERVICE_ACCOUNT_ROLE
    • Replace PROJECT_ID with your project ID.
    • Replace PROJECT_NUMBER with your project number. To find your project number, see Identify projects or use the gcloud projects describe command.
    • Replace SERVICE_ACCOUNT_ROLE with each individual role.
  11. Create a Cloud Storage bucket and configure it as follows:
    • Set the storage class to S (Standard).
    • Set the storage location to the following: US (United States).
    • Replace BUCKET_NAME with a unique bucket name. Don't include any sensitive information in the bucket name, because the bucket namespace is global and publicly visible.

    gcloud storage buckets create gs://BUCKET_NAME --default-storage-class STANDARD --location US
  12. Copy the following, because you need them in later sections:
    • Your Cloud Storage bucket name.
    • Your Google Cloud project ID. To find this ID, see Identify projects.
  13. Download and install the Java Development Kit (JDK) version 11. (Dataflow continues to support version 8.) Verify that the JAVA_HOME environment variable is set and points to your JDK installation.
  14. Download and install Apache Maven by following the Maven installation guide for your specific operating system.
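Before you continue, you can confirm which JDK is active and whether JAVA_HOME is set. The hypothetical JdkCheck class below is a minimal sketch: it reads the standard java.specification.version system property and the JAVA_HOME environment variable, and compares the result with the JDK 11 requirement in this section.

```java
/** Hypothetical helper that reports the local Java version and JAVA_HOME. */
final class JdkCheck {
    private JdkCheck() {}

    /** Converts a java.specification.version value ("1.8", "11", "17") to a major version. */
    static int majorVersion(String specVersion) {
        // Java 8 and earlier report "1.x"; Java 9 and later report the major version directly.
        String v = specVersion.startsWith("1.") ? specVersion.substring(2) : specVersion;
        int dot = v.indexOf('.');
        return Integer.parseInt(dot >= 0 ? v.substring(0, dot) : v);
    }

    public static void main(String[] args) {
        // This reports the JVM that runs this class, which may differ from the one JAVA_HOME points to.
        int major = majorVersion(System.getProperty("java.specification.version"));
        String javaHome = System.getenv("JAVA_HOME");
        System.out.println("Java major version: " + major);
        System.out.println("JAVA_HOME: " + (javaHome == null ? "(not set)" : javaHome));
        if (major < 8) {
            System.out.println("This JDK is older than any version mentioned in this tutorial.");
        } else if (major < 11) {
            System.out.println("This tutorial uses JDK 11; Dataflow also supports JDK 8.");
        }
    }
}
```

To try it, compile and run it with the same JDK that Maven uses, for example javac JdkCheck.java followed by java JdkCheck.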

Get the pipeline code

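The Apache Beam SDK is an open source programming model for data processing pipelines. You define these pipelines with an Apache Beam program and can choose a runner, such as Dataflow, to run your pipeline.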

  1. In your shell or terminal, use the Maven Archetype Plugin to create a Maven project on your computer that contains the Apache Beam SDK's WordCount examples:
    mvn archetype:generate \
        -DarchetypeGroupId=org.apache.beam \
        -DarchetypeArtifactId=beam-sdks-java-maven-archetypes-examples \
        -DarchetypeVersion=2.66.0 \
        -DgroupId=org.example \
        -DartifactId=word-count-beam \
        -Dversion="0.1" \
        -Dpackage=org.apache.beam.examples \
        -DinteractiveMode=false
    

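    The command creates a new directory called word-count-beam under your current directory. The word-count-beam directory contains a simple pom.xml file and a series of example pipelines that count words in text files.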

  2. Verify that your word-count-beam directory contains the pom.xml file:

    Linux or macOS

    cd word-count-beam/
    ls

    The output is similar to the following:

    pom.xml   src

    Windows

    cd word-count-beam/
    dir

    The output is similar to the following:

    pom.xml   src
  3. Verify that your Maven project contains the example pipelines:

    Linux or macOS

    ls src/main/java/org/apache/beam/examples/

    The output is similar to the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

    Windows

    dir src\main\java\org\apache\beam\examples\

    The output is similar to the following:

    DebuggingWordCount.java   WindowedWordCount.java   common
    MinimalWordCount.java   WordCount.java

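For a detailed introduction to the Apache Beam concepts that are used in these examples, see the Apache Beam WordCount Example. The instructions in the following sections use WordCount.java.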

Run the pipeline locally

  • In your shell or terminal, run the WordCount pipeline locally from your word-count-beam directory:
    mvn compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--output=counts"
    

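    The output files have the prefix counts and are written to the word-count-beam directory. They contain the unique words from the input text and the number of occurrences of each word.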
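The local run can write its results as more than one text file. As a minimal sketch for inspecting them, the hypothetical CountsReader class below merges every regular file in a directory whose name starts with counts and sums the counts per word. It assumes the files are named like counts-00000-of-00003 and that each line has the form "word: count"; adjust parseLine if your output differs.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical reader that merges local WordCount output files into one sorted map. */
final class CountsReader {
    private CountsReader() {}

    /** Parses a line assumed to look like "word: 3"; returns null for lines that don't match. */
    static Map.Entry<String, Long> parseLine(String line) {
        int sep = line.lastIndexOf(": ");
        if (sep <= 0) {
            return null;
        }
        try {
            long count = Long.parseLong(line.substring(sep + 2).trim());
            return Map.entry(line.substring(0, sep), count);
        } catch (NumberFormatException e) {
            return null;
        }
    }

    /** Sums the counts in every regular file in dir whose name starts with prefix. */
    static Map<String, Long> readShards(Path dir, String prefix) throws IOException {
        Map<String, Long> totals = new TreeMap<>();
        // The glob matches names such as counts-00000-of-00003 (assumed naming).
        try (DirectoryStream<Path> shards = Files.newDirectoryStream(dir, prefix + "*")) {
            for (Path shard : shards) {
                if (!Files.isRegularFile(shard)) {
                    continue;
                }
                for (String line : Files.readAllLines(shard, StandardCharsets.UTF_8)) {
                    Map.Entry<String, Long> entry = parseLine(line);
                    if (entry != null) {
                        totals.merge(entry.getKey(), entry.getValue(), Long::sum);
                    }
                }
            }
        }
        return totals;
    }

    public static void main(String[] args) throws IOException {
        // Run from the word-count-beam directory after the local run finishes.
        Map<String, Long> totals = readShards(Paths.get("."), "counts");
        System.out.println(totals.size() + " distinct words");
    }
}
```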

Run the pipeline on the Dataflow service

  • In your shell or terminal, build and run the WordCount pipeline on the Dataflow service from your word-count-beam directory:
    mvn -Pdataflow-runner compile exec:java \
        -Dexec.mainClass=org.apache.beam.examples.WordCount \
        -Dexec.args="--project=PROJECT_ID \
        --gcpTempLocation=gs://BUCKET_NAME/temp/ \
        --output=gs://BUCKET_NAME/output \
        --runner=DataflowRunner \
        --region=REGION"
    

    Replace the following:

    • PROJECT_ID: your Google Cloud project ID
    • BUCKET_NAME: the name of your Cloud Storage bucket
    • REGION: a Dataflow region, such as us-central1
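The three placeholders end up in the pipeline options that -Dexec.args passes to WordCount. As an illustration only, the hypothetical DataflowArgs helper below builds the same option string from a project ID, a bucket name, and a region, so that the temporary location and the output prefix always point to the same bucket. It only assembles text; it doesn't call Maven or Dataflow.

```java
import java.util.List;

/** Hypothetical helper that assembles the -Dexec.args value for the Dataflow run. */
final class DataflowArgs {
    private DataflowArgs() {}

    /** Builds the pipeline options string; bucketName is the bare name, without gs:// or a path. */
    static String build(String projectId, String bucketName, String region) {
        if (bucketName.contains("/")) {
            throw new IllegalArgumentException("Pass the bare bucket name, for example my-bucket");
        }
        String bucket = "gs://" + bucketName;
        return String.join(" ", List.of(
                "--project=" + projectId,
                "--gcpTempLocation=" + bucket + "/temp/", // temporary files for the job
                "--output=" + bucket + "/output",         // prefix for the output files
                "--runner=DataflowRunner",
                "--region=" + region));
    }

    public static void main(String[] args) {
        // Placeholder values; substitute your own project ID, bucket name, and region.
        String execArgs = build("PROJECT_ID", "BUCKET_NAME", "us-central1");
        System.out.println("-Dexec.args=\"" + execArgs + "\"");
    }
}
```

The helper rejects values such as gs://my-bucket because the command already adds the gs:// prefix, and passing it twice would produce an invalid path.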

View your results

  1. In the Google Cloud console, go to the Dataflow Jobs page.

    The Jobs page shows the details of all the available jobs, including their status. The Status of the wordcount job is Running at first, and then updates to Succeeded.

  2. In the Google Cloud console, go to the Cloud Storage Buckets page.

    The Buckets page displays a list of all the storage buckets in your project.

  3. Click the storage bucket that you created.

    The Bucket details page shows the output files and temporary files that your Dataflow job created.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used on this page, delete the Google Cloud project that contains the resources.

Delete the project

The easiest way to eliminate billing is to delete the Google Cloud project that you created for the quickstart.

  1. In the Google Cloud console, go to the Manage resources page.

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

Delete individual resources

If you want to keep the Google Cloud project that you used in this quickstart, delete the individual resources:

  1. In the Google Cloud console, go to the Cloud Storage Buckets page.

  2. Click the checkbox for the bucket that you want to delete.
  3. To delete the bucket, click Delete, and then follow the instructions.
  4. Revoke the roles that you granted to the Compute Engine default service account. Run the following command once for each of the following IAM roles:

    • roles/dataflow.admin
    • roles/dataflow.worker
    • roles/storage.objectAdmin
    gcloud projects remove-iam-policy-binding PROJECT_ID \
        --member=serviceAccount:PROJECT_NUMBER-compute@developer.gserviceaccount.com \
        --role=SERVICE_ACCOUNT_ROLE
  5. Optional: Revoke the authentication credentials that you created, and delete the local credential file.

    gcloud auth application-default revoke
  6. Optional: Revoke credentials from the gcloud CLI.

    gcloud auth revoke
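Both the step that grants roles to the Compute Engine default service account and the revoke step above run the same gcloud command once per role. As a sketch, the hypothetical RoleBindingCommands class below prints those per-role commands for either verb, using the three roles and the PROJECT_NUMBER-compute@developer.gserviceaccount.com member format from this page. It only prints text; review each command before you run it.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that prints one gcloud command per role for the grant and revoke steps. */
final class RoleBindingCommands {
    // Roles granted to the Compute Engine default service account in this tutorial.
    static final List<String> ROLES = List.of(
            "roles/dataflow.admin", "roles/dataflow.worker", "roles/storage.objectAdmin");

    private RoleBindingCommands() {}

    /** The Compute Engine default service account member, built from the project number. */
    static String member(String projectNumber) {
        return "serviceAccount:" + projectNumber + "-compute@developer.gserviceaccount.com";
    }

    /** verb is "add" (grant) or "remove" (revoke); returns one command per role. */
    static List<String> commands(String verb, String projectId, String projectNumber) {
        if (!verb.equals("add") && !verb.equals("remove")) {
            throw new IllegalArgumentException("verb must be add or remove");
        }
        List<String> out = new ArrayList<>();
        for (String role : ROLES) {
            out.add("gcloud projects " + verb + "-iam-policy-binding " + projectId
                    + " --member=\"" + member(projectNumber) + "\" --role=" + role);
        }
        return out;
    }

    public static void main(String[] args) {
        // Placeholders; substitute your project ID and project number.
        commands("remove", "PROJECT_ID", "PROJECT_NUMBER").forEach(System.out::println);
    }
}
```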

What's next