Pub/Sub to Redis 模板

Pub/Sub to Redis 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并将消息载荷写入 Redis。此模板的最常见应用场景是将日志导出到 Redis Enterprise,以便实时进行基于高级搜索的日志分析。

  • 在写入 Redis 之前,您可以将 JavaScript 用户定义的函数应用于消息载荷。
  • 任何未能成功处理的消息都会被转发到 Pub/Sub 未处理的主题,以便进一步进行问题排查并重新处理。
  • 为了提高安全性,请在设置数据库端点连接时启用 SSL 连接。 此模板不支持双向 TLS。

流水线要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Redis 数据库端点必须可从 Dataflow 工作器的子网访问。

模板参数

必需参数

  • inputSubscription:要从中读取输入的 Pub/Sub 订阅,采用 projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID> 格式。(示例:projects/your-project-id/subscriptions/your-subscription-name)。
  • redisHost:Redis 数据库主机。(示例:your.cloud.db.redislabs.com)。默认值为 127.0.0.1。
  • redisPort:Redis 数据库端口。(示例:12345)。默认值为 6379。
  • redisPassword:Redis 数据库密码。默认值为空。

可选参数

  • sslEnabled:Redis 数据库 SSL 参数。默认值为:false。
  • redisSinkType:Redis 接收器。支持的值为 STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK。(示例:STRING_SINK)。默认值为 STRING_SINK。
  • connectionTimeout:Redis 连接超时(以毫秒为单位)。(示例:2000)。默认值为 2000。
  • ttl:密钥到期时间(以秒为单位)。HASH_SINKttl 默认值为 -1,表示永不过期。
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。(示例:gs://my-bucket/my-udfs/my_file.js)。
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:指定重新加载 UDF 的频率(以分钟为单位)。如果值大于 0,则 Dataflow 会定期检查 Cloud Storage 中的 UDF 文件,并在文件修改时重新加载 UDF。此参数可让您在流水线运行时更新 UDF,而无需重启作业。如果值为 0,则停用 UDF 重新加载。默认值为 0。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:JSON 字符串
  • 输出:字符串或字符串化 JSON 对象

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Redis template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       inputSubscription=INPUT_SUBSCRIPTION,\
       redisHost=REDIS_HOST,\
       redisPort=REDIS_PORT,\
       redisPassword=REDIS_PASSWORD,\

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_SUBSCRIPTION:Pub/Sub 输入订阅
  • REDIS_HOST:Redis 数据库主机
  • REDIS_PORT:Redis 数据库端口
  • REDIS_PASSWORD:Redis 数据库密码

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "redisHost": "REDIS_HOST",
       "redisPort": "REDIS_PORT",
       "redisPassword": "REDIS_PASSWORD",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
     "environment": { "maxWorkers": "10" }
  }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_SUBSCRIPTION:Pub/Sub 输入订阅
  • REDIS_HOST:Redis 数据库主机
  • REDIS_PORT:Redis 数据库端口
  • REDIS_PASSWORD:Redis 数据库密码

后续步骤