快速入门:构建一个正常运行的 Pub/Sub 系统

简介

本快速入门将引导您设置一组简单的应用,这些应用通过 Pub/Sub(而不是同步远程过程调用 (RPC))发送消息进行通信。通过分离应用,消息传递:

  • 使应用更稳健
  • 可以简化开发

例如,调用者(发布者)不需要接收者(订阅者)随时待命。它只是向 Pub/Sub 发送消息。发布者也不需要知道需要接收消息的订阅者应用具体是哪个以及应用的数量是多少。因此,您可以借助该服务在消息可用时将消息传递给一个或多个订阅者应用。

本快速入门适用于使用 macOS 的 Python 开发者。

要求:

  • Google 帐号
  • 安装有 Python 和 Git 的 macOS X 系统
  • 最多需要一个小时来完成

系统概览

在本快速入门中,您将启动一个发布者应用。该应用将“Hello, World!”消息发送给两个订阅者,如下图所示:

主题、其相关订阅以及向 Cloud Pub/Sub 发送消息的发布者和从 Cloud Pub/Sub 接收消息的订阅者应用的示意图

这两个订阅者应用使用相同的代码,但您可以在不同的时间启动它们。此过程演示了 Pub/Sub 启用异步通信的方式。要构建此系统,请执行以下操作:

  1. 创建所需的 Pub/Sub 主题和订阅。
  2. 创建应用用于进行身份验证的服务帐号。
  3. 设置 Cloud IAM 权限。
  4. 启动三个独立应用:一个发布者应用和两个订阅者应用

快速入门设置

设置您的 Google Cloud 项目以及 Pub/Sub 主题和订阅

  1. 登录到 Google Cloud Console。

    转到 Google Cloud Console

    如果您是 Cloud 新手,请点击激活,然后按照提示设置您的 Cloud 帐号。

    在编写本快速入门时,每月免费数据配额的第一部分不收费。如需了解详情,请参阅 Pub/Sub 价格页面。本快速入门还包括清理说明。

  2. 选择现有项目或创建新项目。首次使用 Google Cloud 时,系统会为您创建一个默认项目。

    在 Cloud Console 的首页部分中,记下项目 ID。您可以在 Cloud SDK 初始化过程中使用此值设置当前的 Cloud Storage 项目。启动发布者和订阅者应用时,您还会将此 ID 传递给 Python 脚本。

  3. 转到 Google Cloud Console 的“Pub/Sub”部分。

    转到“Pub/Sub”部分

    按照提示启用 API。

  4. 点击创建主题。发布应用向主题发送消息。 使用 hello_topic 作为名称

  5. 主题详情页面中,点击创建订阅

    1. 将订阅命名为 sub_one。请勿更改任何默认设置。您创建的是 StreamingPull 订阅,这是一种拉取订阅。

    2. 使用相同的过程创建另一个附加到 hello_topic 的订阅,将其命名为 sub_two

      您可以点击主题视图中的主题名称以查看新订阅,也可以切换为订阅视图。

此时,您的 Pub/Sub 环境已准备好管理快速入门的发布和订阅应用之间的消息流。

创建服务帐号凭据

  1. 转到控制台的服务帐号部分。

    转到 Cloud IAM 服务帐号

  2. 选择项目,然后点击创建服务帐号

  3. 输入服务帐号名称,例如 pubsub-quickstart

  4. 点击创建

  5. 对于快速入门,服务帐号需要发布和订阅权限。使用选择角色下拉列表添加 Pub/Sub Publisher 角色。

    “服务帐号权限”对话框,展示如何使用字符串“pub”过滤出 Pub/Sub 角色

  6. 点击添加其他角色,然后添加 Pub/Sub Subscriber

    点击“继续”按钮之前显示的“服务帐号权限”对话框,其中包含 Pub/Sub Publisher 和 Pub/Sub Subscriber

  7. 点击继续。您无需授予用户访问此服务帐号的权限。

  8. 点击创建密钥。客户端库使用该密钥访问 Pub/Sub API。

  9. 选择 JSON,然后点击创建

    密钥将发送到 Downloads 文件夹。对于本教程而言,您可以将其留在那里。

  10. 将密钥文件重命名为 ~/Downloads/key.json

安装 Cloud SDK

  1. 按照安装和初始化 Cloud SDK 的说明进行操作。

    • 在初始化 Cloud SDK 时,选择用于输入项目 ID 的选项,然后输入您在设置部分创建或选择的项目的 ID。

    • 安装并初始化 Cloud SDK 后,您可以返回到本快速入门。您无需安装其他组件或下载 Cloud 客户端库。

    安装了 Cloud SDK 后,您可以使用 gcloud 命令行工具在 Compute Engine 中执行 Pub/Sub 操作。

  2. 在使用以下 gcloud 命令之前启动一个新终端:

    gcloud pubsub topics list
    gcloud pubsub subscriptions list
    

    您还可以使用 gcloud config set project PROJECT_ID 更改您在初始化期间设置的项目。

获取 Python 并设置虚拟环境

本快速入门提供了一个用法示例,因此您不必遵循虚拟环境设置部分所显示的示例。安装了虚拟环境后,您可以返回到此快速入门。

签出发布者和订阅者代码

  1. 创建一个项目文件夹以包含本快速入门所需的 Pub/Sub Python 文件。然后切换到该文件夹并下载代码:

    git clone https://github.com/GoogleCloudPlatform/python-docs-samples.git
    
  2. 关闭所有打开的终端,然后再继续操作。

设置三个终端

  1. 为每个快速入门应用(一个发布者应用和两个订阅者应用)启动一个终端。在每个终端中,执行本部分的所有操作。为方便起见,我们将这些终端称为:

    • 发布者终端
    • sub_one 终端
    • sub_two 终端
  2. 创建 Python 虚拟环境并将其激活。

    • 在第一个终端中,运行以下命令以创建并激活名为 pyenv-qs 的虚拟环境:

      python -m venv pyenv-qs && source pyenv-qs/bin/activate
    • 在其他两个终端中,运行以下命令即可:

      source pyenv-qs/bin/activate

    运行 activate 命令后,命令提示符应包含 (pyenv-qs) $

    您还可以将虚拟环境指向其他 Python 版本。

  3. 确保您使用设置指南中所述的虚拟环境:

    pip install --upgrade google-cloud-pubsub

    将 JSON 密钥与服务帐号相关联。您在创建服务帐号凭据时为密钥分配了 Pub/Sub 角色。 Pub/Sub 客户端库访问环境变量 GOOGLE_APPLICATION_CREDENTIALS,并被授予与服务帐号关联的角色和权限。

    export GOOGLE_APPLICATION_CREDENTIALS=~/Downloads/key.json
  4. 使用当前项目 ID 设置环境变量。此 gcloud 命令确定您当前选择的项目 ID 并将其设置为变量:

    export PROJECT=`gcloud config get-value project`

    要验证您当前的 GCP 是否已正确注册为此变量,请运行以下命令:

    echo $PROJECT
  5. 切换到项目文件夹,然后导航到快速入门示例文件夹:

    cd python-docs-samples/pubsub/cloud-client/quickstart/
    

启动应用并观察消息流

启动订阅者 1 应用

在 sub_one 终端中,启动订阅者 1

python sub.py $PROJECT sub_one

启动后,此应用将轮询 Pub/Sub sub_one 订阅。

订阅者 1 应用开始侦听有关 sub_one 订阅的消息。

启动发布者应用

在发布者终端中,启动发布者应用:

python pub.py $PROJECT hello_topic
  • 发布者应用将“Hello, World!”消息发送给 Pub/Sub,此时该应用还不知道现在是否有任何订阅。该应用还分配一个消息 ID。

  • 订阅者 1 应用接收“Hello World”消息,输出该消息,并向 Pub/Sub 发送确认。

  • 发布者应用输出确认。确认告知 Pub/Sub 消息已成功处理,无需重新发送给此订阅者或任何其他 sub_one 订阅者。

  • Pub/Sub 移除 sub_one 中的消息。

发布者应用发布消息并分配消息 ID。订阅者 1 应用接收“Hello World”消息并发送确认。

启动订阅者 2 应用

在 sub_two 终端中,启动订阅者 2

python sub.py $PROJECT sub_two

该订阅者接收传递给 sub_two 订阅的消息。 订阅者 2 重复使用 sub.py 脚本。其区别在于,订阅者 2 在发布者将消息发送到主题和订阅之后才会启动。如果发布者直接调用订阅者 2,则发布应用要么等到订阅者 2 启动,要么就会超时。Pub/Sub 通过有效地为订阅者 2 保存该消息来管理此过程。

订阅者 2 开始侦听并接收在 sub_two 中等待它的消息

现在,您可以使用 Pub/Sub 进行开发了!

结果怎么样?

Pub/Sub 支持页面提供了其他资源和链接。

清理

  1. 停止所有正在运行的应用。

  2. 从本地环境中删除 ~/pubsub-quickstart 目录。

  3. 在 Google Cloud Console 的 IAM 和管理部分关闭快速入门项目。

  4. 移除服务帐号凭据:rm ~/Downloads/key.json

后续步骤

以下是您可以尝试的一些事项:

  • 检查快速入门中的 pub.pysub.py 代码,并浏览 github 中的其他 Pub/Sub 示例。作为练习,请创建一个每秒发布一次本地时间的 pub.py 版本。

  • 了解如何批量处理消息

  • 使用推送订阅,接收触发 App Engine 端点Cloud Functions 函数的消息。

  • 使用重放检索以前确认的消息。 默认情况下,Pub/Sub 会从订阅中移除已确认的消息。例如,在本快速入门中,您将无法重新运行 sub.py 来再次接收“Hello, World!”消息。您可以使用重放功能设置订阅,以便在确认消息后接收这些消息。