Pub/Sub to Redis 模板

Pub/Sub to Redis 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将消息载荷写入 Redis。此模板的最常见应用场景是将日志导出到 Redis Enterprise,以便实时进行基于高级搜索的日志分析。

  • 在写入 Redis 之前,您可以将 JavaScript 用户定义的函数应用于消息载荷。
  • 任何未能成功处理的消息都会被转发到 Pub/Sub 未处理的主题,以便进一步进行问题排查并重新处理。
  • 为了提高安全性,请在设置数据库端点连接时启用 SSL 连接。 此模板不支持双向 TLS。

流水线要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Redis 数据库端点必须可从 Dataflow 工作器的子网访问。

模板参数

参数 说明
inputSubscription 如需从中读取输入的 Pub/Sub 订阅,格式为“projects/my-project-id/subscriptions/my-subscription-id”。例如:projects/my-project-id/subscriptions/my-subscription-id
redisHost Redis 数据库主机。例如 your.cloud.db.redislabs.com。默认值为 127.0.0.1
redisPort Redis 数据库端口。例如 12345。默认值为 6379
redisPassword Redis 数据库密码。默认值为空。
sslEnabled 可选:Redis 数据库 SSL 参数。默认值为 false
redisSinkType 可选:Redis 接收器。支持的值包括 STRING_SINKHASH_SINKSTREAMS_SINKLOGGING_SINK。默认值为 STRING_SINK
connectionTimeout 可选:Redis 连接超时(以毫秒为单位)。例如 2000。默认值为 2000
ttl 可选:密钥到期时间(以秒为单位)。HASH_SINKttl 默认值为 -1,表示永不过期。
javascriptTextTransformGcsPath 可选:包含用户定义的函数的 JavaScript 代码的 Cloud Storage 路径模式。例如:gs://your-bucket/your-function.js
javascriptTextTransformFunctionName 可选:要从 JavaScript 文件调用的函数的名称。只能使用字母、数字和下划线。例如: transformtransform_udf1
javascriptTextTransformReloadIntervalMinutes 可选:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件遭到修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:JSON 字符串
  • 输出:字符串或字符串化 JSON 对象

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Redis template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       inputSubscription=INPUT_SUBSCRIPTION,\
       redisHost=REDIS_HOST,\
       redisPort=REDIS_PORT,\
       redisPassword=REDIS_PASSWORD,\

请替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_SUBSCRIPTION:Pub/Sub 输入订阅
  • REDIS_HOST:Redis 数据库主机
  • REDIS_PORT:Redis 数据库端口
  • REDIS_PASSWORD:Redis 数据库密码

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "redisHost": "REDIS_HOST",
       "redisPort": "REDIS_PORT",
       "redisPassword": "REDIS_PASSWORD",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
     "environment": { "maxWorkers": "10" }
  }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_SUBSCRIPTION:Pub/Sub 输入订阅
  • REDIS_HOST:Redis 数据库主机
  • REDIS_PORT:Redis 数据库端口
  • REDIS_PASSWORD:Redis 数据库密码

后续步骤