使用 Workflows 将数据从 Cloud Storage 加载到 BigQuery

Last reviewed 2021-05-12 UTC

<ph type="x-smartling-placeholder">
</ph> 本教程介绍如何使用 Workflows Cloud Run 函数、 和 Firestore 从命令行加载原始数据,例如事件日志, Cloud Storage 更改为 BigQuery。 分析平台通常都有编排工具,定期将数据 BigQuery(使用 BigQuery 作业, 然后使用 SQL 转换数据以提供业务指标 语句,包括 BigQuery 过程语言语句。 本教程适用于想要构建无服务器、事件驱动型数据处理流水线的开发者和架构师。本教程假定您熟悉 YAML、SQL 和 Python。

架构

下图显示了无服务器架构的概要架构 提取、加载和转换 (ELT) 流水线 工作流

提取、加载和转换流水线。

在上图中,假设一个零售平台定期从各个商店以文件形式收集销售事件,然后将文件写入 Cloud Storage 存储分区。事件用于通过在 BigQuery 中导入和处理来提供业务指标。此架构提供了一个用于将文件导入到 BigQuery 中的可靠、无服务器的编排系统,它分为以下两个模块:

  • 文件列表:维护已添加到 Cloud Storage 存储分区内 Firestore 集合中的未处理文件的列表。此模块通过一个 Cloud Run 函数运行,该函数由 一 完成对象 存储事件(将新文件添加到 Cloud Storage 存储桶。文件名会附加到 Firestore 中名为 new 的集合 files 数组中。
  • 工作流:运行计划的工作流。Cloud Scheduler 触发工作流 该模型会根据数据 基于 YAML 的语法 编排加载,然后将数据转换为 调用 BigQuery Cloud Run 函数。 工作流中的步骤调用 Cloud Run 函数来运行 以下任务:

    • 创建并启动 BigQuery 加载作业。
    • 轮询加载作业状态。
    • 创建并启动转换查询作业。
    • 轮询转换作业状态。

通过使用事务来维护 Firestore 中的新文件列表,可帮助确保在工作流将这些文件导入 BigQuery 中时不会丢失任何文件。通过将作业元数据和状态存储在 Firestore 中,可以使得工作流的单独运行具有幂等性。

目标

  • 创建 Firestore 数据库。
  • 设置 Cloud Run 函数触发器来跟踪添加到 Firestore 中的 Cloud Storage 存储桶。
  • 部署 Cloud Run 函数以运行和监控 BigQuery 作业。
  • 部署并运行工作流以自动执行此过程。

费用

在本文档中,您将使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用情况来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本文档中描述的任务后,您可以通过删除所创建的资源来避免继续计费。如需了解详情,请参阅清理

准备工作

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Google Cloud project.

  3. Enable the Cloud Build, Cloud Run functions, Identity and Access Management, Resource Manager, and Workflows APIs.

    Enable the APIs

  4. 转到 Welcome 页面,记下要使用的项目 ID

    前往欢迎页面

  5. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

准备环境

如需准备环境,请创建一个 Firestore 数据库并克隆 GitHub 代码库中的代码示例,使用 Terraform、 修改 Workflows YAML 文件 文件生成器。

  1. 如需创建 Firestore 数据库,请执行以下操作:

    1. 在 Google Cloud Console 中,前往 Firestore 页面。

      转到 Firestore

    2. 点击选择原生模式

    3. 选择位置菜单中,选择要托管 Firestore 数据库的区域。我们建议选择靠近您的物理位置的区域。

    4. 点击创建数据库

  2. 在 Cloud Shell 中,克隆源代码库:

    cd $HOME && git clone https://github.com/GoogleCloudPlatform/workflows-demos
    cd workflows-demos/workflows-bigquery-load
    
  3. 在 Cloud Shell 中,使用 Terraform 创建以下资源:

    terraform init
    terraform apply \
        -var project_id=PROJECT_ID \
        -var region=REGION \
        -var zone=ZONE \
        --auto-approve
    

    替换以下内容:

    • PROJECT_ID:您的 Google Cloud 项目 ID
    • REGION:用于托管资源的特定 Google Cloud 地理位置,例如 us-central1
    • ZONE:用于托管资源的区域内的位置,例如 us-central1-b

    您应该会看到类似于以下内容的消息: Apply complete! Resources: 7 added, 0 changed, 1 destroyed.

    Terraform 可帮助您以可预测的方式安全地大规模创建、更改和升级基础架构。系统会在您的项目中创建以下资源:

    • 服务账号 拥有所需的权限,以确保安全访问资源。
    • 名为 serverless_elt_dataset 的 BigQuery 数据集和名为 word_count 的表,用于加载传入的文件。
    • 名为 ${project_id}-ordersbucket 的 Cloud Storage 存储分区,用于暂存输入文件。
    • 以下是五个 Cloud Run 函数:
      • file_add_handler 用于将添加到 Cloud Storage 存储分区的文件的名称添加到 Firestore 集合。
      • create_job 用于创建一个新的 BigQuery 加载作业,并将 Firebase 集合中的文件与该作业相关联。
      • create_query 用于创建一个新的 BigQuery 查询作业。
      • poll_bigquery_job 用于获取 BigQuery 作业的状态。
      • run_bigquery_job 用于启动 BigQuery 作业。
  4. 获取 create_jobcreate_querypoll_jobrun_bigquery_job 您在上一项中部署的 Cloud Run 函数 操作。

    gcloud functions describe create_job | grep url
    gcloud functions describe poll_bigquery_job | grep url
    gcloud functions describe run_bigquery_job | grep url
    gcloud functions describe create_query | grep url
    

    输出类似于以下内容:

    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/poll_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/run_bigquery_job
    url: https://REGION-PROJECT_ID.cloudfunctions.net/create_query
    

    请记下这些网址,因为您在部署工作流时会用到它们。

创建和部署工作流

  1. 在 Cloud Shell 中,打开工作流的源文件。 workflow.yaml

    main:
      steps:
        - constants:
            assign:
              - create_job_url: CREATE_JOB_URL
              - poll_job_url: POLL_BIGQUERY_JOB_URL
              - run_job_url: RUN_BIGQUERY_JOB_URL
              - create_query_url: CREATE_QUERY_URL
              - region: BQ_REGION
              - table_name: BQ_DATASET_TABLE_NAME
            next: createJob
    
        - createJob:
            call: http.get
            args:
              url: ${create_job_url}
              auth:
                  type: OIDC
              query:
                  region: ${region}
                  table_name: ${table_name}
            result: job
            next: setJobId
    
        - setJobId:
            assign:
              - job_id: ${job.body.job_id}
            next: jobCreateCheck
    
        - jobCreateCheck:
            switch:
              - condition: ${job_id == Null}
                next: noOpJob
            next: runLoadJob
    
        - runLoadJob:
            call: runBigQueryJob
            args:
                job_id: ${job_id}
                run_job_url: ${run_job_url}
                poll_job_url: ${poll_job_url}
            result: jobStatus
            next: loadRunCheck
    
        - loadRunCheck:
            switch:
              - condition: ${jobStatus == 2}
                next: createQueryJob
            next: failedLoadJob
    
        - createQueryJob:
            call: http.get
            args:
              url: ${create_query_url}
              query:
                  qs: "select count(*) from serverless_elt_dataset.word_count"
                  region: "US"
              auth:
                  type: OIDC
            result: queryjob
            next: setQueryJobId
    
        - setQueryJobId:
            assign:
              - qid: ${queryjob.body.job_id}
            next: queryCreateCheck
    
        - queryCreateCheck:
            switch:
              - condition: ${qid == Null}
                next: failedQueryJob
            next: runQueryJob
    
        - runQueryJob:
            call: runBigQueryJob
            args:
              job_id: ${qid}
              run_job_url: ${run_job_url}
              poll_job_url: ${poll_job_url}
            result: queryJobState
            next: runQueryCheck
    
        - runQueryCheck:
            switch:
              - condition: ${queryJobState == 2}
                next: allDone
            next: failedQueryJob
    
        - noOpJob:
            return: "No files to import"
            next: end
    
        - allDone:
            return: "All done!"
            next: end
    
        - failedQueryJob:
            return: "Query job failed"
            next: end
    
        - failedLoadJob:
            return: "Load job failed"
            next: end
    
    
    runBigQueryJob:
      params: [job_id, run_job_url, poll_job_url]
      steps:
        - startBigQueryJob:
            try:
              call: http.get
              args:
                  url: ${run_job_url}
                  query:
                    job_id: ${job_id}
                  auth:
                    type: OIDC
                  timeout: 600
              result: submitJobState
            retry: ${http.default_retry}
            next: validateSubmit
    
        - validateSubmit:
            switch:
              - condition: ${submitJobState.body.status == 1}
                next: sleepAndPollLoad
            next: returnState
    
        - returnState:
            return: ${submitJobState.body.status}
    
        - sleepAndPollLoad:
            call: sys.sleep
            args:
              seconds: 5
            next: pollJob
    
        - pollJob:
            try:
              call: http.get
              args:
                url: ${poll_job_url}
                query:
                  job_id: ${job_id}
                auth:
                  type: OIDC
                timeout: 600
              result: pollJobState
            retry:
              predicate: ${http.default_retry_predicate}
              max_retries: 10
              backoff:
                initial_delay: 1
                max_delay: 60
                multiplier: 2
            next: stateCheck
    
        - stateCheck:
            switch:
              - condition: ${pollJobState.body.status == 2}
                return: ${pollJobState.body.status}
              - condition: ${pollJobState.body.status == 3}
                return: ${pollJobState.body.status}
            next: sleepAndPollLoad

    替换以下内容:

    • CREATE_JOB_URL:用于创建新作业的函数的网址
    • POLL_BIGQUERY_JOB_URL:用于轮询运行中作业状态的函数的网址
    • RUN_BIGQUERY_JOB_URL:用于启动 BigQuery 加载作业的函数的网址
    • CREATE_QUERY_URL:用于启动 BigQuery 查询作业的函数的网址
    • BQ_REGIONBigQuery 区域 数据的存储位置,例如 US
    • BQ_DATASET_TABLE_NAME:BigQuery 数据集表名称,格式为 PROJECT_ID.serverless_elt_dataset.word_count
  2. 部署 workflow 文件:

    gcloud workflows deploy WORKFLOW_NAME \
        --location=WORKFLOW_REGION \
        --description='WORKFLOW_DESCRIPTION' \
        --service-account=workflow-runner@PROJECT_ID.iam.gserviceaccount.com \
        --source=workflow.yaml
    

    替换以下内容:

    • WORKFLOW_NAME:工作流的唯一名称
    • WORKFLOW_REGIONregion [地区] 也就是 us-central1
    • WORKFLOW_DESCRIPTION:工作流的说明
  3. 创建 Python 3 虚拟环境并安装文件生成器的必备项目:

    sudo apt-get install -y python3-venv
    python3 -m venv env
    . env/bin/activate
    cd generator
    pip install -r requirements.txt
    

生成要导入的文件

gen.py Python 脚本会在 Avro 格式。架构与 BigQuery word_count 表。这些 Avro 文件会复制到 指定的 Cloud Storage 存储桶。

在 Cloud Shell 中,生成以下文件:

python gen.py -p PROJECT_ID \
    -o PROJECT_ID-ordersbucket \
    -n RECORDS_PER_FILE \
    -f NUM_FILES \
    -x FILE_PREFIX

替换以下内容:

  • RECORDS_PER_FILE:单个文件中的记录数
  • NUM_FILES:要上传的文件总数
  • FILE_PREFIX:已生成文件的名称的前缀

查看 Firestore 中的文件条目

将文件复制到 Cloud Storage 后, 触发了 handle_new_file Cloud Run 函数。该函数会将文件列表添加到 Firestore jobs 集合的 new 文档的文件列表数组中。

如需查看文件列表,请在 Google Cloud 控制台中前往 Firestore 数据页面。

转到“数据”

已添加到集合的文件的列表。

触发工作流

工作流将一系列无服务器任务与 Google Cloud 和 API 服务相关联。此工作流中的各个步骤以 Cloud Run 函数,而状态存储在 Firestore 中。全部 对 Cloud Run 函数的调用使用服务账号进行身份验证 工作流。

在 Cloud Shell 中,运行工作流:

gcloud workflows execute WORKFLOW_NAME

下图显示了在工作流中使用的步骤:

在主工作流和子工作流中使用的步骤。

工作流分为两个部分:主工作流和子工作流。主工作流处理作业创建和有条件执行,子工作流执行 BigQuery 作业。工作流会执行以下操作:

  • Cloud Run 函数 create_job 会创建一个新的作业对象, 从 Cloud Storage 的 Firestore 文档,并将文件与加载相关联 作业。如果没有要加载的文件,函数将不会创建新作业。
  • create_query Cloud Run 函数会接受所需的查询, 查询将连同查询的 BigQuery 区域一起执行 。该函数在 Firestore 中创建作业并返回作业 ID。
  • run_bigquery_job Cloud Run 函数会获取 然后调用 BigQuery API 来 提交作业。
  • 您不必在 Cloud Run 函数,您可以定期轮询作业的状态。
    • poll_bigquery_job Cloud Run 函数提供 状态 作业的组成部分。系统会重复调用该函数,直到作业完成。
    • poll_bigquery_job 调用之间添加延迟 Cloud Run 函数 sleep”日常安排 是从 Workflows 调用的。

查看作业状态

您可以查看文件列表和作业状态。

  1. 在 在 Google Cloud 控制台中,前往 Firestore 数据页面。

    转到“数据”

  2. 唯一标识符 (UUID) 都会生成相应的文件如需查看 job_typestatus,请点击作业 ID。每个作业可能具有以下类型之一和状态:

    • job_type:工作流正在运行的作业的类型,值为以下值之一:

      • 0:将数据加载到 BigQuery 中。
      • 1:在 BigQuery 中运行查询。
    • status:作业的当前状态,值为以下值之一:

      • 0:作业已创建,但尚未开始。
      • 1:作业正在运行。
      • 2:作业已成功完成其执行。
      • 3:出错了,作业未成功完成。

    作业对象还包含元数据属性,例如 BigQuery 数据集,BigQuery 的名称 表,如果是查询作业,则为要运行的查询字符串。

突出显示了作业状态的文件列表。

在 BigQuery 中查看数据

如需确认 ELT 作业是否成功,请验证数据是否显示在 表格。

  1. 在 Google Cloud 控制台中,转到 BigQuery 编辑器页面。

    前往编辑器

  2. 点击 serverless_elt_dataset.word_count 表。

  3. 点击预览标签页。

    在表格中显示数据的“预览”标签页。

安排工作流

如需定期按计划运行工作流,您可以使用 Cloud Scheduler

清理

避免产生费用的最简单方法是删除您为本教程创建的 Google Cloud 项目。或者,您也可以删除各个资源。

逐个删除资源

  1. 在 Cloud Shell 中,移除使用 Terraform 创建的所有资源:

    cd $HOME/bigquery-workflows-load
    terraform destroy \
    -var project_id=PROJECT_ID \
    -var region=REGION \
    -var zone=ZONE \
    --auto-approve
    
  2. 在 Google Cloud 控制台中,前往 Firestore 数据 页面。

    转到“数据”

  3. 作业旁边,点击 菜单,然后选择删除

    用于删除集合的菜单路径。

删除项目

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.

后续步骤