使用 Cloud Functions 将数据从 Cloud Storage 流式传输到 BigQuery

本教程演示了如何使用 Cloud Functions 将新对象从 Cloud Storage 存储分区流式插入到 BigQuery。 Cloud Functions 是一个 Google Cloud 事件驱动的无服务器计算平台,此平台提供自动扩缩、高可用性和容错能力,无需使用服务器来预配、管理、更新或修补。通过 Cloud Functions 流式传输数据,可让您连接和扩展其他 Google Cloud 服务,而且只需在应用运行时付费。

本文适用于需要对添加到 Cloud Storage 的文件进行近于实时的分析的数据分析师、开发人员或运维人员。本文假设您熟悉 Linux、Cloud Storage 和 BigQuery。

架构

以下架构图说明了本教程的流式传输流水线的所有组件和整个流程。不过,此流水线预设您会将 JSON 文件上传到 Cloud Storage 中,如果要支持其他文件格式,还需要对代码进行少量更改。本文未介绍其他文件格式的提取。

流水线的架构图

在上图中,流水线包含以下步骤:

  1. 将 JSON 文件上传到 FILES_SOURCE Cloud Storage 存储分区。
  2. 此事件会触发 streaming Cloud Functions 函数。
  3. 系统解析数据并将其插入到 BigQuery 中。
  4. 系统会将提取状态记录到 FirestoreCloud Logging 中。
  5. 消息会发布到以下 Pub/Sub 主题之一:
    • streaming_success_topic
    • streaming_error_topic
  6. 根据结果,Cloud Functions 将 JSON 文件从 FILES_SOURCE 存储分区移动到以下存储分区之一:
    • FILES_ERROR
    • FILES_SUCCESS

目标

  • 创建 Cloud Storage 存储分区以存储 JSON 文件。
  • 创建 BigQuery 数据集和表以将数据流式传输到其中。
  • 配置 Cloud Functions 函数,使其在有文件添加到存储分区时触发。
  • 设置 Pub/Sub 主题。
  • 配置其他函数来处理函数输出。
  • 测试您的流式传输流水线。
  • 配置 Cloud Monitoring 以针对任何意外行为发出提醒。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Cloud Storage
  • Cloud Functions
  • Firestore
  • BigQuery
  • Logging
  • Monitoring
  • Container Registry
  • Cloud Build

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google Cloud 帐号。如果您是 Google Cloud 新手,请创建一个帐号来评估我们的产品在实际场景中的表现。新客户还可获享 $300 赠金,用于运行、测试和部署工作负载。
  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Functions and Cloud Build API。

    启用 API

  5. 在 Cloud Console 中,转到 Monitoring。

    转到 Cloud Monitoring

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

设置您的环境

在本教程中,您将使用 Cloud Shell 输入命令。Cloud Shell 让您能够使用 Cloud Console 中的命令行,并包含在 Google Cloud 中进行开发所需的 Cloud SDK 和其他工具。Cloud Shell 显示为 Cloud Console 底部的一个窗口。初始化可能需要几分钟,但窗口会立即显示。

如需使用 Cloud Shell 设置环境并克隆本教程中使用的 git 代码库,请执行以下操作:

  1. 在 Cloud Console 中,打开 Cloud Shell。

    打开 Cloud Shell

  2. 确保您正在使用刚刚创建的项目。将 [YOUR_PROJECT_ID] 替换为您刚创建的 Google Cloud 项目。

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. 设置默认计算区域。在本教程中,计算区域为 us-east1。如果要部署到生产环境,请部署到您选择的地区

    REGION=us-east1
    
  4. 克隆包含本教程中使用的函数的代码库。

    git clone https://github.com/GoogleCloudPlatform/solutions-gcs-bq-streaming-functions-python
    cd solutions-gcs-bq-streaming-functions-python
    

创建流式传输来源和目标接收器

如需将内容流式插入 BigQuery,您需要有一个 FILES_SOURCE Cloud Storage 存储分区和一个 BigQuery 中的目标表。

创建 Cloud Storage 存储分区

创建一个 Cloud Storage 存储分区,用作本教程中介绍的流式传输流水线的来源。此存储分区的主要目的是临时存储流式插入 BigQuery 的 JSON 文件。

  • 创建您的 FILES_SOURCE Cloud Storage 存储分区,其中 FILES_SOURCE 设为具有唯一名称的环境变量。

    FILES_SOURCE=${DEVSHELL_PROJECT_ID}-files-source-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_SOURCE}
    

创建 BigQuery 表

本部分创建用作文件的内容目的地的 BigQuery 表。BigQuery 允许您在将数据加载到表中或创建新表时指定表架构。 在本部分中,您将创建表并同时指定其架构。

  1. 创建 BigQuery 数据集和表。schema.json 文件中定义的架构必须与来自 FILES_SOURCE 存储分区的文件的架构匹配。

    bq mk mydataset
    bq mk mydataset.mytable schema.json
    
  2. 验证表是否已创建。

    bq ls --format=pretty mydataset
    

    输出为:

    +---------+-------+--------+-------------------+
    | tableId | Type  | Labels | Time Partitioning |
    +---------+-------+--------+-------------------+
    | mytable | TABLE |        |                   |
    +---------+-------+--------+-------------------+
    

将数据流式插入 BigQuery

现在您已创建源和目标接收器,您可以创建 Cloud Functions 函数以将数据从 Cloud Storage 流式插入到 BigQuery。

设置流式传输 Cloud Functions 函数

流式传输函数侦听添加到 FILES_SOURCE 存储分区的新文件,然后触发执行以下操作的过程:

  • 解析并验证文件。
  • 检查是否有重复。
  • 将文件的内容插入 BigQuery。
  • 在 Firestore 和 Logging 中记录提取状态。
  • 将消息发布到 Pub/Sub 中的错误或成功主题。

如需部署函数,请执行以下操作:

  1. 创建一个 Cloud Storage 存储分区以在部署期间暂存函数,其中 FUNCTIONS_BUCKET 设为具有唯一名称的环境变量。

    FUNCTIONS_BUCKET=${DEVSHELL_PROJECT_ID}-functions-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FUNCTIONS_BUCKET}
    
  2. 部署 streaming 函数。实现代码位于 ./functions/streaming 文件夹中。该过程可能需要几分钟时间才能完成。

    gcloud functions deploy streaming --region=${REGION} \
        --source=./functions/streaming --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-bucket=${FILES_SOURCE}
    

    此代码部署了一个用 Python 编写的 Cloud Functions 函数,该函数名为 streaming。只要将文件添加到 FILES_SOURCE 存储分区,就会触发该函数。

  3. 验证该函数已经部署。

    gcloud functions describe streaming  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌────────────────┬────────┬────────────────────────────────┐
    │  ENTRY_POINT   │ STATUS │           EVENT_TYPE           │
    ├────────────────┼────────┼────────────────────────────────┤
    │ streaming      │ ACTIVE │ google.storage.object.finalize │
    └────────────────┴────────┴────────────────────────────────┘
    
  4. 预配名为 streaming_error_topic 的 Pub/Sub 主题来处理错误路径。

    STREAMING_ERROR_TOPIC=streaming_error_topic
    gcloud pubsub topics create ${STREAMING_ERROR_TOPIC}
    
  5. 预配名为 streaming_success_topic 的 Pub/Sub 主题以处理成功路径。

    STREAMING_SUCCESS_TOPIC=streaming_success_topic
    gcloud pubsub topics create ${STREAMING_SUCCESS_TOPIC}
    

设置您的 Firestore 数据库

当数据流式插入到 BigQuery 时,务必要了解每次提取文件时发生的情况。例如,假设您有未正确导入的文件。在这种情况下,您需要找出问题的根本原因并加以解决,以免在流水线结束时生成损坏的数据和不准确的报告。上一部分中部署的 streaming 函数将文件提取状态存储在 Firestore 文档中,以便您可以查询最近的错误,排查任何存在的问题。

如需创建 Firestore 实例,请按照下列步骤操作:

  1. 在 Google Cloud Console 中,转到 Firestore。

    转到 Firestore

  2. 选择 Cloud Firestore 模式窗口中,点击选择原生模式

  3. 选择位置列表中,选择 nam5(美国),然后点击创建数据库。等待 Firestore 完成初始化。这通常需要几分钟时间。

处理流式传输错误

如需预配处理错误文件的路径,请部署另一个 Cloud Functions 函数,用于侦听发布到 streaming_error_topic 的消息。您的业务需求决定了您在生产环境中处理此类错误的方式。 在本教程中,有问题的文件将移动到另一个 Cloud Storage 存储分区,以方便排查。

  1. 创建 Cloud Storage 存储分区以存储有问题的文件。 FILES_ERROR 设为具有唯一名称的环境变量,用于存储错误文件的存储分区。

    FILES_ERROR=${DEVSHELL_PROJECT_ID}-files-error-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_ERROR}
    
  2. 部署 streaming_error 函数来处理错误。这可能需要几分钟时间。

    gcloud functions deploy streaming_error --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_ERROR_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_ERROR}
    

    此命令与部署 streaming 函数的命令类似。其主要区别在于,此命令中的函数由发布到主题的消息触发,它会接收两个环境变量:SOURCE_BUCKET 变量(从该变量复制文件)和 DESTINATION_BUCKET 变量(文件被复制到该变量)。

  3. 验证 streaming_error 函数已经创建。

    gcloud functions describe streaming_error --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

处理成功的流式传输

如需预配处理成功文件的路径,请部署第三个 Cloud Functions 函数,用于侦听发布到 streaming_success_topic 的消息。在本教程中,成功提取的文件归档在 Coldline Cloud Storage 存储分区中。

  1. 创建 Coldline Cloud Storage 存储分区。FILES_SUCCESS 设为具有唯一名称的环境变量,用于存储错误文件的存储分区。

    FILES_SUCCESS=${DEVSHELL_PROJECT_ID}-files-success-$(date +%s)
    gsutil mb -c coldline -l ${REGION} gs://${FILES_SUCCESS}
    
  2. 部署 streaming_success 函数以处理成功。这可能需要几分钟时间。

    gcloud functions deploy streaming_success --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_SUCCESS_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_SUCCESS}
    
  3. 验证该函数已经部署。

    gcloud functions describe streaming_success  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

测试您的流式传输流水线

现在您已完成流式传输流水线的创建工作,可以测试不同的路径了。首先测试新文件的提取,然后测试重复文件的提取,最后测试有问题的文件的提取。

提取新文件

如需测试新文件的提取,请上传一个必须成功通过整个流水线的文件。为了确保一切正常,您需要检查所有存储部分:BigQuery、Firestore 和 Cloud Storage 存储分区。

  1. data.json 文件上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    输出:

    Operation completed over 1 objects/312.0 B.
    
  2. 在 BigQuery 中查询您的数据。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    此命令会输出 data.json 文件的内容:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. 转到 / > streaming_files > data.json 文档,以验证是否存在 success: true 字段。streaming 函数将文件的状态存储在名为 streaming_files 的集合中,并将文件名用作文档 ID。

    验证“streaming”函数是否存储了文件的成功状态

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证 streaming_success 函数是否已从 FILES_SOURCE 存储分区中移除已提取的文件。

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    输出为 CommandException,因为文件不再存在于 FILES_SOURCE 存储分区中。

  7. 验证所提取的文件现在是否位于 FILES_SUCCESS 存储分区中。

    gsutil ls -l gs://${FILES_SUCCESS}/data.json
    

    输出为:

    TOTAL: 1 objects, 312 bytes.
    

提取已处理的文件

文件名在 Firestore 中用作文档 ID。这使得 streaming 函数可以轻松查询给定文件是否已处理。如果先前已成功提取文件,系统会忽略添加该文件的任何新尝试,因为它会在 BigQuery 中生成重复的信息,导致报告不准确。

在本部分中,您将验证在将重复文件上传到 FILES_SOURCE 存储分区时流水线是否按预期工作。

  1. 将同一个 data.json 文件再次上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    输出为:

    Operation completed over 1 objects/312.0 B.
    
  2. 查询 BigQuery 会返回与以前相同的结果。 这意味着流水线处理了该文件,但它没有将文件内容插入到 BigQuery 中,因为该文件之前已提取。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    输出为:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. / > streaming_files > data.json 文档中,验证是否已添加新的 **duplication_attempts** 字段。

    验证“streaming”函数是否存储了 duplication_attempts

    每次将文件添加到与之前成功处理的具有相同名称的 FILES_SOURCE 存储分区时,系统都会忽略该文件的内容,并在 Cloud Firestore 中的 **duplication_attempts** 字段附加新的重复尝试。

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证重复文件是否仍在 FILES_SOURCE 存储分区中。

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    输出为:

    TOTAL: 1 objects, 312 bytes.
    

    在重复场景中,streaming 函数会在 Logging 中记录意外行为,忽略提取操作,并将文件留在 FILES_SOURCE 存储分区中以供以后分析。

提取存在错误的文件

现在您已确认流式传输流水线正在工作,并且未将重复内容提取到 BigQuery 中,下面可以检查错误路径。

  1. data_error.json 上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data_error.json gs://${FILES_SOURCE}
    

    输出为:

    Operation completed over 1 objects/311.0 B.
    
  2. 查询 BigQuery 会返回与以前相同的结果。 这意味着流水线处理了该文件,但它没有将文件内容插入到 BigQuery 中,因为该文件不符合预期的架构。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    输出为:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. / > streaming_files > data_error.json 文档中,验证已添加 success: false 字段。

    验证“streaming”函数是否存储了文件的错误状态

    对于存在错误的文件,streaming 函数还会存储 error_message 字段,该字段为您提供有关文件未被提取的详细信息。

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证 streaming_error 函数是否已从 FILES_SOURCE 存储分区中移除该文件。

    gsutil ls -l gs://${FILES_SOURCE}/data_error.json
    

    输出为 CommandException,因为文件不再存在于 FILES_SOURCE 存储分区中。

  7. 验证文件现在按预期位于 FILES_ERROR 存储分区中。

    gsutil ls -l gs://${FILES_ERROR}/data_error.json
    

    输出为:

    TOTAL: 1 objects, 311 bytes.
    

找到并解决数据提取问题

通过在 Firestore 中对 streaming_files 集合运行查询,可以快速诊断和修复问题。在本部分中,您将使用适用于 Firestore 的标准 Python API 过滤所有错误文件。

db = firestore.Client()
docs = db.collection(u'streaming_files')\
    .where(u'success', u'==', False)\
    .get()

如需在您的环境中查看查询结果,请执行以下操作:

  1. firestore 文件夹中创建虚拟环境。

    pip install virtualenv
    virtualenv firestore
    source firestore/bin/activate
    
  2. 在虚拟环境中安装 Python Firestore 模块。

    pip install google-cloud-firestore
    
  3. 呈现现有的流水线问题。

    python firestore/show_streaming_errors.py
    

    show_streaming_errors.py 文件包含 Firestore 查询和其他样板,用于循环结果和格式化输出。运行上述命令后,输出类似于以下内容:

    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | File Name       | When                    | Error Message                                                                    |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | data_error.json | 2019-01-22 11:31:58 UTC | Error streaming file 'data_error.json'. Cause: Traceback (most recent call las.. |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    
  4. 完成分析后,停用虚拟环境。

    deactivate
    

    找到并解决有问题的文件后,再次使用相同的文件名将其上传到 FILES_SOURCE 存储分区。此过程会让这些文件通过整个流式传输流水线,以将其内容插入 BigQuery。

有关意外行为的提醒

在生产环境中,务必要监控发生的意外情况并发出提醒。自定义指标是众多 Logging 功能中的一个。通过自定义指标,您可以创建提醒政策,以便在指标满足指定条件时通知您和您的团队。

在本部分中,您将 Monitoring 配置为在文件提取失败时发送电子邮件提醒。为了识别失败的提取,以下配置使用默认的 Python logging.error(..) 消息。

  1. 在 Cloud Console 中,转到“基于日志的指标”页面。

    转到“基于日志的指标”页面

  2. 点击创建指标

  3. 过滤列表中,选择转换为高级过滤器

    高级过滤器菜单

  4. 在高级过滤器中,粘贴以下配置。

    resource.type="cloud_function"
    resource.labels.function_name="streaming"
    resource.labels.region="us-east1"
    "Error streaming file "
    

    要在高级过滤器中粘贴的配置

  5. 指标编辑器中,填写以下字段,然后点击创建指标

    • 名称字段中,输入 streaming-error
    • 标签部分的名称字段中输入 payload_error
    • 标签类型列表中,选择字符串
    • 字段名称列表中,选择 textPayload
    • 提取正则表达式字段中,输入 (Error streaming file '.*'.)
    • 类型列表中,选择计数器

      指标编辑器字段

  6. 在 Google Cloud Console 中,转到 Monitoring 或使用以下按钮:

    转到“监控”

  7. 在 Monitoring 导航窗格中,选择 提醒,然后选择创建政策

  8. 为此政策命名字段中,输入 streaming-error-alert

  9. 点击添加条件

    • 标题字段中,输入 streaming-error-condition
    • 指标字段中,输入 logging/user/streaming-error
    • 发生以下情况时触发条件列表中,选择任何违反时间序列的情况
    • 条件列表中,选择超过
    • 阈值字段中,输入 0
    • 时限列表中,选择 1 分钟
  10. 通知渠道类型列表中,选择电子邮件,输入您的电子邮件地址,然后点击添加通知渠道

  11. (可选)点击文档并添加您希望包含在通知消息中的任何信息。

  12. 点击保存

保存提醒政策后,Monitoring 会以每分钟一次的间隔监视 streaming 函数错误日志,并在出现流式传输错误时发送电子邮件提醒。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤

  • 查看事件和触发器,了解在 Google Cloud 中触发无服务器函数的其他方法。
  • 访问提醒页面,了解如何改进本教程中定义的提醒政策。
  • 访问 Firestore 文档,详细了解这种全球规模的 NoSQL 数据库。
  • 访问 BigQuery 配额和限制页面,了解在生产环境中实现此解决方案时的流式插入限制。
  • 访问 Cloud Functions 配额和限制页面,了解部署的函数能够处理的大小上限。
  • 探索有关 Google Cloud 的参考架构、图表、教程和最佳做法。查看我们的云架构中心