使用工作流程將資料從 Cloud Storage 載入至 BigQuery

Last reviewed 2021-05-12 UTC

本教學課程將說明如何使用工作流程Cloud Run 函式Firestore 執行可靠的無伺服器工作流程,從 Cloud Storage 載入事件記錄等原始資料至 BigQuery。資料分析平台通常會提供協調工具,以便使用 BigQuery 工作定期載入 BigQuery 中的資料,然後轉換資料,以便使用 SQL 陳述式 (包括 BigQuery 程序語言陳述式) 提供業務指標。本教學課程適用於希望建構無伺服器事件導向資料處理管道的開發人員和架構師。本教學課程假設您已熟悉 YAML、SQL 和 Python。

架構

下圖顯示使用工作流程的無伺服器擷取、載入及轉換 (ELT) 管道的整體架構。

擷取、載入及轉換管道。

在上述圖表中,零售平台會定期從各個商店收集銷售事件檔案,然後將檔案寫入 Cloud Storage 值區。這些事件會在 BigQuery 中匯入及處理,用於提供業務指標。這個架構提供可靠的無伺服器指揮系統,可將檔案匯入 BigQuery,並分為以下兩個模組:

  • 檔案清單:維護未處理檔案清單,這些檔案會新增至 Firestore 集合中的 Cloud Storage 值區。這個模組會透過 Cloud Run 函式運作,該函式會在新增檔案至 Cloud Storage 值區時,由 Object Finalize 儲存事件觸發。檔案名稱會附加至 Firestore 中名為 new 的集合 files 陣列。
  • 工作流程:執行排定的工作流程。Cloud Scheduler 會觸發工作流程,根據以 YAML 為基礎的語法執行一系列步驟,以便協調載入作業,然後透過呼叫 Cloud Run 函式,轉換 BigQuery 中的資料。工作流程中的步驟會呼叫 Cloud Run 函式,執行下列工作:

    • 建立並啟動 BigQuery 載入工作。
    • 輪詢載入工作的狀態。
    • 建立並啟動轉換查詢工作。
    • 輪詢轉換工作狀態。

使用交易來維護 Firestore 中的新檔案清單,有助於確保工作流程將檔案匯入 BigQuery 時不會遺漏任何檔案。將工作中繼資料和狀態儲存在 Firestore 中,可讓工作流程的個別執行作業變成冪等。

目標

  • 建立 Firestore 資料庫。
  • 設定 Cloud Run 函式觸發條件,以便追蹤新增至 Firestore 中 Cloud Storage 值區的檔案。
  • 部署 Cloud Run 函式,執行及監控 BigQuery 工作。
  • 部署及執行工作流程,以便自動化程序。

費用

In this document, you use the following billable components of Google Cloud:

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish the tasks that are described in this document, you can avoid continued billing by deleting the resources that you created. For more information, see Clean up.

事前準備

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. 前往「歡迎」頁面,並記下專案 ID,以便在後續步驟中使用。

    前往歡迎頁面

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

準備環境

如要準備環境,請建立 Firestore 資料庫、從 GitHub 存放區複製程式碼範例、使用 Terraform 建立資源、編輯工作流程 YAML 檔案,以及安裝檔案產生器的必要條件。

  1. 如要建立 Firestore 資料庫,請按照下列步驟操作:

    1. 前往 Google Cloud 控制台的「Firestore」頁面。

      前往 Firestore

    2. 按一下「選取原生模式」

    3. 在「Select a location」選單中,選取要代管 Firestore 資料庫的區域。建議您選擇離您所在位置較近的區域。

    4. 按一下 [Create database] (建立資料庫)。

  2. 在 Cloud Shell 中複製原始碼存放區:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. 在 Cloud Shell 中使用 Terraform 建立下列資源:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    更改下列內容:

    • PROJECT_ID:您的 Google Cloud 專案 ID
    • REGION:可代管資源的特定 Google Cloud地理位置,例如 us-central1
    • ZONE:區域內的資源主機位置,例如 us-central1-b

    您應該會看到類似以下的訊息: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform 可協助您以安全且可預測的方式,建立、變更及升級基礎架構。系統會在專案中建立下列資源:

    • 服務帳戶:具備必要權限,可確保安全存取資源。
    • 名為 serverless_elt_dataset 的 BigQuery 資料集和名為 word_count 的資料表,用於載入傳入的檔案。
    • 用於暫存輸入檔案的 Cloud Storage 值區,名稱為 ${project_id}-ordersbucket
    • 下列五個 Cloud Run 函式:
      • file_add_handler 會將新增至 Cloud Storage 值區的檔案名稱加入 Firestore 集合。
      • create_job 會建立新的 BigQuery 載入工作,並將 Firebase 集合中的檔案與該工作建立關聯。
      • create_query 會建立新的 BigQuery 查詢工作。
      • poll_bigquery_job 會取得 BigQuery 工作的狀態。
      • run_bigquery_job 會啟動 BigQuery 工作。
  4. 取得您在前一個步驟中部署的 create_jobcreate_querypoll_jobrun_bigquery_job Cloud Run 函式的網址。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    輸出結果會與下列內容相似:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    請記下這些網址,因為部署工作流程時需要使用這些網址。

建立及部署工作流程

  1. 在 Cloud Shell 中開啟工作流程的來源檔案 workflow.yaml

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    更改下列內容:

    • CREATE_JOB_URL:建立新工作的函式網址
    • POLL_BIGQUERY_JOB_URL:用於輪詢執行中工作狀態的函式網址
    • RUN_BIGQUERY_JOB_URL:用於啟動 BigQuery 載入工作的函式網址
    • CREATE_QUERY_URL:要啟動 BigQuery 查詢工作的函式網址
    • BQ_REGION:資料儲存的 BigQuery 區域,例如 US
    • BQ_DATASET_TABLE_NAME:BigQuery 資料集資料表名稱,格式為 PROJECT_ID.serverless_elt_dataset.word_count
  2. 部署 workflow 檔案:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    更改下列內容:

    • WORKFLOW_NAME:工作流程的專屬名稱
    • WORKFLOW_REGION:工作流程的區域,例如 us-central1
    • WORKFLOW_DESCRIPTION:工作流程的說明
  3. 建立 Python 3 虛擬環境,並安裝檔案產生器的必要元件:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

產生要匯入的檔案

gen.py Python 指令碼會產生 Avro 格式的隨機內容。結構定義與 BigQuery word_count 資料表相同。這些 Avro 檔案會複製到指定的 Cloud Storage 值區。

在 Cloud Shell 中產生檔案:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

更改下列內容:

  • RECORDS_PER_FILE:單一檔案中的記錄數量
  • NUM_FILES:要上傳的檔案總數
  • FILE_PREFIX:產生檔案名稱的前置字串

在 Firestore 中查看檔案項目

當檔案複製到 Cloud Storage 時,系統會觸發 handle_new_file Cloud Run 函式。這個函式會將檔案清單加入 Firestore jobs 集合中 new 文件的檔案清單陣列。

如要查看檔案清單,請在 Google Cloud 控制台中前往 Firestore 的「Data」頁面。

前往「資料」

新增至珍藏內容的檔案清單。

觸發工作流程

Workflows 會將一系列來自Google Cloud 和 API 服務的無伺服器工作串連在一起。這個工作流程中的個別步驟會以 Cloud Run 函式執行,狀態則會儲存在 Firestore 中。所有對 Cloud Run 函式的呼叫都會使用工作流程的服務帳戶進行驗證。

在 Cloud Shell 中執行工作流程:

gcloud workflows execute WORKFLOW_NAME

下圖顯示工作流程中使用的步驟:

主要和子工作流程中使用的步驟。

工作流程分為兩個部分:主要工作流程和子工作流程。主工作流程會處理工作建立和條件式執行作業,而子工作流程會執行 BigQuery 工作。工作流程會執行下列作業:

  • create_job Cloud Run 函式會建立新的作業物件,從 Firestore 文件取得新增至 Cloud Storage 的檔案清單,並將檔案與載入作業建立關聯。如果沒有要載入的檔案,函式就不會建立新工作。
  • create_query Cloud Run 函式會取得需要執行的查詢,以及查詢應執行的 BigQuery 區域。這個函式會在 Firestore 中建立工作,並傳回工作 ID。
  • run_bigquery_job Cloud Run 函式會取得需要執行的工作 ID,然後呼叫 BigQuery API 提交工作。
  • 您可以定期輪詢工作狀態,而非等待 Cloud Run 函式中的作業完成。
    • poll_bigquery_job Cloud Run 函式會提供工作狀態。系統會重複呼叫此函式,直到工作完成為止。
    • 如要在對 poll_bigquery_job Cloud Run 函式發出呼叫時加入延遲時間,請從工作流程呼叫 sleep 例行程序

查看工作狀態

您可以查看檔案清單和工作狀態。

  1. 在Google Cloud 控制台中,前往 Firestore 的「Data」頁面。

    前往「資料」

  2. 系統會為每個工作產生專屬 ID (UUID)。如要查看 job_typestatus,請按一下工作 ID。每個工作可能具有下列任一類型和狀態:

    • job_type:工作流程執行的工作類型,可使用下列其中一個值:

      • 0:將資料載入 BigQuery。
      • 1:在 BigQuery 中執行查詢。
    • status:工作目前狀態,會顯示下列其中一個值:

      • 0:工作已建立,但尚未開始。
      • 1:工作正在執行。
      • 2:工作已順利執行完畢。
      • 3:發生錯誤,工作未順利完成。

    工作物件也會包含中繼資料屬性,例如 BigQuery 資料集的地區、BigQuery 資料表名稱,以及 (如果是查詢工作) 執行中的查詢字串。

列出檔案清單,並醒目顯示工作狀態。

查看 BigQuery 中的資料

如要確認 ELT 工作是否成功,請確認資料是否顯示在資料表中。

  1. 前往 Google Cloud 控制台的「BigQuery 編輯器」頁面。

    前往編輯器

  2. 按一下 serverless_elt_dataset.word_count 資料表。

  3. 按一下「預覽」分頁標籤。

    預覽分頁顯示資料表中的資料。

排定工作流程

如要定期依排程執行工作流程,您可以使用 Cloud Scheduler

清除所用資源

如要避免付費,最簡單的方法就是刪除您為了本教學課程所建立的 Google Cloud 專案。或者,您也可以刪除個別資源。

刪除個別資源

  1. 在 Cloud Shell 中移除使用 Terraform 建立的所有資源:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. 在 Google Cloud 控制台中,前往 Firestore 的「Data」頁面。

    前往「資料」

  3. 按一下「Jobs」旁邊的「Menu」圖示 ,然後選取「Delete」

    刪除產品素材資源集合的選單路徑。

刪除專案

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

後續步驟