使用 Cloud Functions 将数据从 Cloud Storage 流式传输到 BigQuery

本教程演示了如何使用 Cloud Functions 将新对象从 Cloud Storage 存储分区流式插入到 BigQuery。 Cloud Functions 是一个 Google Cloud 事件驱动的无服务器计算平台,此平台提供自动扩缩、高可用性和容错能力,无需使用服务器来预配、管理、更新或修补。通过 Cloud Functions 流式传输数据,可让您连接和扩展其他 Google Cloud 服务,而且只需在应用运行时付费。

本文适用于需要对添加到 Cloud Storage 的文件进行近于实时的分析的数据分析师、开发人员或运维人员。本文假设您熟悉 Linux、Cloud Storage 和 BigQuery。

架构

以下架构图说明了本教程的流式传输流水线的所有组件和整个流程。不过,此流水线预设您会将 JSON 文件上传到 Cloud Storage 中,如果要支持其他文件格式,还需要对代码进行少量更改。本文未介绍其他文件格式的提取。

流水线的架构图

在上图中,流水线包含以下步骤:

  1. 将 JSON 文件上传到 FILES_SOURCE Cloud Storage 存储分区。
  2. 此事件会触发 streaming Cloud Functions 函数。
  3. 系统解析数据并将其插入到 BigQuery 中。
  4. 系统会将提取状态记录到 FirestoreCloud Logging 中。
  5. 消息会发布到以下 Pub/Sub 主题之一:
    • streaming_success_topic
    • streaming_error_topic
  6. 根据结果,Cloud Functions 将 JSON 文件从 FILES_SOURCE 存储分区移动到以下存储分区之一:
    • FILES_ERROR
    • FILES_SUCCESS

目标

  • 创建 Cloud Storage 存储分区以存储 JSON 文件。
  • 创建 BigQuery 数据集和表以将数据流式传输到其中。
  • 配置 Cloud Functions 函数,使其在有文件添加到存储分区时触发。
  • 设置 Pub/Sub 主题。
  • 配置其他函数来处理函数输出。
  • 测试您的流式传输流水线。
  • 配置 Cloud Monitoring 以针对任何意外行为发出提醒。

费用

本教程使用 Google Cloud 的以下收费组件:

  • Cloud Storage
  • Cloud Functions
  • Firestore
  • BigQuery
  • Logging
  • Monitoring
  • Container Registry
  • Cloud Build

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

准备工作

  1. 登录您的 Google 帐号。

    如果您还没有 Google 帐号,请注册新帐号

  2. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到项目选择器页面

  3. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  4. 启用 Cloud Functions and Cloud Build API。

    启用 API

  5. 在 Cloud Console 中,转到 Monitoring。

    转到 Cloud Monitoring

    如果您从未使用过 Cloud Monitoring,那么您在 Google Cloud Console 中首次访问 Monitoring 时,系统会自动创建一个工作区,并将您的项目与该工作区相关联。否则,如果您的项目未与工作区关联,则系统会显示一个对话框,您可以创建一个工作区,也可以将您的项目添加到现有工作区。我们建议您创建一个工作区。完成选择后,请点击添加

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

设置您的环境

在本教程中,您将使用 Cloud Shell 输入命令。Cloud Shell 让您能够使用 Cloud Console 中的命令行,并包含在 Google Cloud 中进行开发所需的 Cloud SDK 和其他工具。Cloud Shell 显示为 Cloud Console 底部的一个窗口。初始化可能需要几分钟,但窗口会立即显示。

如需使用 Cloud Shell 设置环境并克隆本教程中使用的 git 代码库,请执行以下操作:

  1. 在 Cloud Console 中,打开 Cloud Shell。

    打开 Cloud Shell

  2. 确保您正在使用刚刚创建的项目。将 [YOUR_PROJECT_ID] 替换为您刚创建的 Google Cloud 项目。

    gcloud config set project [YOUR_PROJECT_ID]
    
  3. 设置默认计算区域。在本教程中,计算区域为 us-east1。如果要部署到生产环境,请部署到您选择的地区

    REGION=us-east1
    
  4. 克隆包含本教程中使用的函数的代码库。

    git clone https://github.com/GoogleCloudPlatform/solutions-gcs-bq-streaming-functions-python
    cd solutions-gcs-bq-streaming-functions-python
    

创建流式传输来源和目标接收器

如需将内容流式插入 BigQuery,您需要有一个 FILES_SOURCE Cloud Storage 存储分区和一个 BigQuery 中的目标表。

创建 Cloud Storage 存储分区

创建一个 Cloud Storage 存储分区,用作本教程中介绍的流式传输流水线的来源。此存储分区的主要目的是临时存储流式插入 BigQuery 的 JSON 文件。

  • 创建您的 FILES_SOURCE Cloud Storage 存储分区,其中 FILES_SOURCE 设为具有唯一名称的环境变量。

    FILES_SOURCE=${DEVSHELL_PROJECT_ID}-files-source-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_SOURCE}
    

创建 BigQuery 表

本部分创建用作文件的内容目的地的 BigQuery 表。BigQuery 允许您在将数据加载到表中或创建新表时指定表架构。 在本部分中,您将创建表并同时指定其架构。

  1. 创建 BigQuery 数据集和表。schema.json 文件中定义的架构必须与来自 FILES_SOURCE 存储分区的文件的架构匹配。

    bq mk mydataset
    bq mk mydataset.mytable schema.json
    
  2. 验证表是否已创建。

    bq ls --format=pretty mydataset
    

    输出为:

    +---------+-------+--------+-------------------+
    | tableId | Type  | Labels | Time Partitioning |
    +---------+-------+--------+-------------------+
    | mytable | TABLE |        |                   |
    +---------+-------+--------+-------------------+
    

将数据流式插入 BigQuery

现在您已创建源和目标接收器,您可以创建 Cloud Functions 函数以将数据从 Cloud Storage 流式插入到 BigQuery。

设置流式传输 Cloud Functions 函数

流式传输函数侦听添加到 FILES_SOURCE 存储分区的新文件,然后触发执行以下操作的过程:

  • 解析并验证文件。
  • 检查是否有重复。
  • 将文件的内容插入 BigQuery。
  • 在 Firestore 和 Logging 中记录提取状态。
  • 将消息发布到 Pub/Sub 中的错误或成功主题。

如需部署函数,请执行以下操作:

  1. 创建一个 Cloud Storage 存储分区以在部署期间暂存函数,其中 FUNCTIONS_BUCKET 设为具有唯一名称的环境变量。

    FUNCTIONS_BUCKET=${DEVSHELL_PROJECT_ID}-functions-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FUNCTIONS_BUCKET}
    
  2. 部署 streaming 函数。实现代码位于 ./functions/streaming 文件夹中。该过程可能需要几分钟时间才能完成。

    gcloud functions deploy streaming --region=${REGION} \
        --source=./functions/streaming --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-bucket=${FILES_SOURCE}
    

    此代码部署了一个用 Python 编写的 Cloud Functions 函数,该函数名为 streaming。只要将文件添加到 FILES_SOURCE 存储分区,就会触发该函数。

  3. 验证该函数已经部署。

    gcloud functions describe streaming  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌────────────────┬────────┬────────────────────────────────┐
    │  ENTRY_POINT   │ STATUS │           EVENT_TYPE           │
    ├────────────────┼────────┼────────────────────────────────┤
    │ streaming      │ ACTIVE │ google.storage.object.finalize │
    └────────────────┴────────┴────────────────────────────────┘
    
  4. 预配名为 streaming_error_topic 的 Pub/Sub 主题来处理错误路径。

    STREAMING_ERROR_TOPIC=streaming_error_topic
    gcloud pubsub topics create ${STREAMING_ERROR_TOPIC}
    
  5. 预配名为 streaming_success_topic 的 Pub/Sub 主题以处理成功路径。

    STREAMING_SUCCESS_TOPIC=streaming_success_topic
    gcloud pubsub topics create ${STREAMING_SUCCESS_TOPIC}
    

设置您的 Firestore 数据库

当数据流式插入到 BigQuery 时,务必要了解每次提取文件时发生的情况。例如,假设您有未正确导入的文件。在这种情况下,您需要找出问题的根本原因并加以解决,以免在流水线结束时生成损坏的数据和不准确的报告。上一部分中部署的 streaming 函数将文件提取状态存储在 Firestore 文档中,以便您可以查询最近的错误,排查任何存在的问题。

如需创建 Firestore 实例,请按照下列步骤操作:

  1. 在 Google Cloud Console 中,转到 Firestore。

    转到 Firestore

  2. 选择 Cloud Firestore 模式窗口中,点击选择原生模式

  3. 选择位置列表中,选择 nam5(美国),然后点击创建数据库。等待 Firestore 完成初始化。这通常需要几分钟时间。

处理流式传输错误

如需预配处理错误文件的路径,请部署另一个 Cloud Functions 函数,用于侦听发布到 streaming_error_topic 的消息。您的业务需求决定了您在生产环境中处理此类错误的方式。 在本教程中,有问题的文件将移动到另一个 Cloud Storage 存储分区,以方便排查。

  1. 创建 Cloud Storage 存储分区以存储有问题的文件。 FILES_ERROR 设为具有唯一名称的环境变量,用于存储错误文件的存储分区。

    FILES_ERROR=${DEVSHELL_PROJECT_ID}-files-error-$(date +%s)
    gsutil mb -c regional -l ${REGION} gs://${FILES_ERROR}
    
  2. 部署 streaming_error 函数来处理错误。这可能需要几分钟时间。

    gcloud functions deploy streaming_error --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_ERROR_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_ERROR}
    

    此命令与部署 streaming 函数的命令类似。其主要区别在于,此命令中的函数由发布到主题的消息触发,它会接收两个环境变量:SOURCE_BUCKET 变量(从该变量复制文件)和 DESTINATION_BUCKET 变量(文件被复制到该变量)。

  3. 验证 streaming_error 函数已经创建。

    gcloud functions describe streaming_error --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

处理成功的流式传输

如需预配处理成功文件的路径,请部署第三个 Cloud Functions 函数,用于侦听发布到 streaming_success_topic 的消息。在本教程中,成功提取的文件归档在 Coldline Cloud Storage 存储分区中。

  1. 创建 Coldline Cloud Storage 存储分区。FILES_SUCCESS 设为具有唯一名称的环境变量,用于存储错误文件的存储分区。

    FILES_SUCCESS=${DEVSHELL_PROJECT_ID}-files-success-$(date +%s)
    gsutil mb -c coldline -l ${REGION} gs://${FILES_SUCCESS}
    
  2. 部署 streaming_success 函数以处理成功。这可能需要几分钟时间。

    gcloud functions deploy streaming_success --region=${REGION} \
        --source=./functions/move_file \
        --entry-point=move_file --runtime=python37 \
        --stage-bucket=${FUNCTIONS_BUCKET} \
        --trigger-topic=${STREAMING_SUCCESS_TOPIC} \
        --set-env-vars SOURCE_BUCKET=${FILES_SOURCE},DESTINATION_BUCKET=${FILES_SUCCESS}
    
  3. 验证该函数已经部署。

    gcloud functions describe streaming_success  --region=${REGION} \
        --format="table[box](entryPoint, status, eventTrigger.eventType)"
    

    输出为:

    ┌─────────────┬────────┬─────────────────────────────┐
    │ ENTRY_POINT │ STATUS │          EVENT_TYPE         │
    ├─────────────┼────────┼─────────────────────────────┤
    │ move_file   │ ACTIVE │ google.pubsub.topic.publish │
    └─────────────┴────────┴─────────────────────────────┘
    

测试您的流式传输流水线

现在您已完成流式传输流水线的创建工作,可以测试不同的路径了。首先测试新文件的提取,然后测试重复文件的提取,最后测试有问题的文件的提取。

提取新文件

如需测试新文件的提取,请上传一个必须成功通过整个流水线的文件。为了确保一切正常,您需要检查所有存储部分:BigQuery、Firestore 和 Cloud Storage 存储分区。

  1. data.json 文件上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    输出:

    Operation completed over 1 objects/312.0 B.
    
  2. 在 BigQuery 中查询您的数据。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    此命令会输出 data.json 文件的内容:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. 转到 / > streaming_files > data.json 文档,以验证是否存在 success: true 字段。streaming 函数将文件的状态存储在名为 streaming_files 的集合中,并将文件名用作文档 ID。

    验证“streaming”函数是否存储了文件的成功状态

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证 streaming_success 函数是否已从 FILES_SOURCE 存储分区中移除已提取的文件。

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    输出为 CommandException,因为文件不再存在于 FILES_SOURCE 存储分区中。

  7. 验证所提取的文件现在是否位于 FILES_SUCCESS 存储分区中。

    gsutil ls -l gs://${FILES_SUCCESS}/data.json
    

    输出为:

    TOTAL: 1 objects, 312 bytes.
    

提取已处理的文件

文件名在 Firestore 中用作文档 ID。这使得 streaming 函数可以轻松查询给定文件是否已处理。如果先前已成功提取文件,系统会忽略添加该文件的任何新尝试,因为它会在 BigQuery 中生成重复的信息,导致报告不准确。

在本部分中,您将验证在将重复文件上传到 FILES_SOURCE 存储分区时流水线是否按预期工作。

  1. 将同一个 data.json 文件再次上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data.json gs://${FILES_SOURCE}
    

    输出为:

    Operation completed over 1 objects/312.0 B.
    
  2. 查询 BigQuery 会返回与以前相同的结果。 这意味着流水线处理了该文件,但它没有将文件内容插入到 BigQuery 中,因为该文件之前已提取。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    输出为:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. / > streaming_files > data.json 文档中,验证是否已添加新的 **duplication_attempts** 字段。

    验证“streaming”函数是否存储了 duplication_attempts

    每次将文件添加到与之前成功处理的具有相同名称的 FILES_SOURCE 存储分区时,系统都会忽略该文件的内容,并在 Cloud Firestore 中的 **duplication_attempts** 字段附加新的重复尝试。

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证重复文件是否仍在 FILES_SOURCE 存储分区中。

    gsutil ls -l gs://${FILES_SOURCE}/data.json
    

    输出为:

    TOTAL: 1 objects, 312 bytes.
    

    在重复场景中,streaming 函数会在 Logging 中记录意外行为,忽略提取操作,并将文件留在 FILES_SOURCE 存储分区中以供以后分析。

提取存在错误的文件

现在您已确认流式传输流水线正在工作,并且未将重复内容提取到 BigQuery 中,下面可以检查错误路径。

  1. data_error.json 上传到 FILES_SOURCE 存储分区。

    gsutil cp test_files/data_error.json gs://${FILES_SOURCE}
    

    输出为:

    Operation completed over 1 objects/311.0 B.
    
  2. 查询 BigQuery 会返回与以前相同的结果。 这意味着流水线处理了该文件,但它没有将文件内容插入到 BigQuery 中,因为该文件不符合预期的架构。

    bq query 'select first_name, last_name, dob from mydataset.mytable'
    

    输出为:

    +------------+-----------+------------+
    | first_name | last_name |    dob     |
    +------------+-----------+------------+
    | John       | Doe       | 1968-01-22 |
    +------------+-----------+------------+
    
  3. 在 Cloud Console 中,转到 Firestore 页面。

    转到 Firestore

  4. / > streaming_files > data_error.json 文档中,验证已添加 success: false 字段。

    验证“streaming”函数是否存储了文件的错误状态

    对于存在错误的文件,streaming 函数还会存储 error_message 字段,该字段为您提供有关文件未被提取的详细信息。

  5. 返回到 Cloud Shell。

    转到 Cloud Shell

  6. 验证 streaming_error 函数是否已从 FILES_SOURCE 存储分区中移除该文件。

    gsutil ls -l gs://${FILES_SOURCE}/data_error.json
    

    输出为 CommandException,因为文件不再存在于 FILES_SOURCE 存储分区中。

  7. 验证文件现在按预期位于 FILES_ERROR 存储分区中。

    gsutil ls -l gs://${FILES_ERROR}/data_error.json
    

    输出为:

    TOTAL: 1 objects, 311 bytes.
    

找到并解决数据提取问题

通过在 Firestore 中对 streaming_files 集合运行查询,可以快速诊断和修复问题。在本部分中,您将使用适用于 Firestore 的标准 Python API 过滤所有错误文件。

db = firestore.Client()
docs = db.collection(u'streaming_files')\
    .where(u'success', u'==', False)\
    .get()

如需在您的环境中查看查询结果,请执行以下操作:

  1. firestore 文件夹中创建虚拟环境。

    pip install virtualenv
    virtualenv firestore
    source firestore/bin/activate
    
  2. 在虚拟环境中安装 Python Firestore 模块。

    pip install google-cloud-firestore
    
  3. 呈现现有的流水线问题。

    python firestore/show_streaming_errors.py
    

    show_streaming_errors.py 文件包含 Firestore 查询和其他样板,用于循环结果和格式化输出。运行上述命令后,输出类似于以下内容:

    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | File Name       | When                    | Error Message                                                                    |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    | data_error.json | 2019-01-22 11:31:58 UTC | Error streaming file 'data_error.json'. Cause: Traceback (most recent call las.. |
    +-----------------+-------------------------+----------------------------------------------------------------------------------+
    
  4. 完成分析后,停用虚拟环境。

    deactivate
    

    找到并解决有问题的文件后,再次使用相同的文件名将其上传到 FILES_SOURCE 存储分区。此过程会让这些文件通过整个流式传输流水线,以将其内容插入 BigQuery。

有关意外行为的提醒

在生产环境中,务必要监控发生的意外情况并发出提醒。自定义指标是众多 Logging 功能中的一个。通过自定义指标,您可以创建提醒政策,以便在指标满足指定条件时通知您和您的团队。

在本部分中,您将 Monitoring 配置为在文件提取失败时发送电子邮件提醒。为了识别失败的提取,以下配置使用默认的 Python logging.error(..) 消息。

  1. 在 Cloud Console 中,转到“基于日志的指标”页面。

    转到“基于日志的指标”页面

  2. 点击创建指标

  3. 过滤列表中,选择转换为高级过滤器

    高级过滤器菜单

  4. 在高级过滤器中,粘贴以下配置。

    resource.type="cloud_function"
    resource.labels.function_name="streaming"
    resource.labels.region="us-east1"
    "Error streaming file "
    

    要在高级过滤器中粘贴的配置

  5. 指标编辑器中,填写以下字段,然后点击创建指标

    • 名称字段中,输入 streaming-error
    • 标签部分的名称字段中输入 payload_error
    • 标签类型列表中,选择字符串
    • 字段名称列表中,选择 textPayload
    • 提取正则表达式字段中,输入 (Error streaming file '.*'.)
    • 类型列表中,选择计数器

      指标编辑器字段

  6. 在 Google Cloud Console 中,转到 Monitoring 或使用以下按钮:

    转到“监控”

  7. 在 Monitoring 导航窗格中,选择 提醒,然后选择创建政策

  8. 为此政策命名字段中,输入 streaming-error-alert

  9. 点击添加条件

    • 标题字段中,输入 streaming-error-condition
    • 指标字段中,输入 logging/user/streaming-error
    • 发生以下情况时触发条件列表中,选择任何违反时间序列的情况
    • 条件列表中,选择超过
    • 阈值字段中,输入 0
    • 时限列表中,选择 1 分钟
  10. 通知渠道类型列表中,选择电子邮件,输入您的电子邮件地址,然后点击添加通知渠道

  11. (可选)点击文档并添加您希望包含在通知消息中的任何信息。

  12. 点击保存

保存提醒政策后,Monitoring 会以每分钟一次的间隔监视 streaming 函数错误日志,并在出现流式传输错误时发送电子邮件提醒。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除项目

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤

  • 查看事件和触发器,了解在 Google Cloud 中触发无服务器函数的其他方法。
  • 访问提醒页面,了解如何改进本教程中定义的提醒政策。
  • 访问 Firestore 文档,详细了解这种全球规模的 NoSQL 数据库。
  • 访问 BigQuery 配额和限制页面,了解在生产环境中实现此解决方案时的流式插入限制。
  • 访问 Cloud Functions 配额和限制页面,了解部署的函数能够处理的大小上限。
  • 试用其他 Google Cloud 功能。查阅我们的教程