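Pub/Sub to Redis template

The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Redis. The most common use case of this template is to export logs to Redis Enterprise for advanced search-based log analysis in real time.

  • Before writing to Redis, you can apply a JavaScript user-defined function to the message payload.
  • Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
  • For added security, enable an SSL connection when setting up your database endpoint connection. This template doesn't support mutual TLS.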

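Pipeline requirements

  • The source Pub/Sub subscription must exist prior to running the pipeline.
  • The Pub/Sub unprocessed topic must exist prior to running the pipeline.
  • The Redis database endpoint must be accessible from the Dataflow workers' subnetwork.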

Template parameters

Required parameters

  • inputSubscription: The Pub/Sub subscription to read the input from, in the format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>. (Example: projects/your-project-id/subscriptions/your-subscription-name).
  • redisHost: The Redis database host. (Example: your.cloud.db.redislabs.com). Defaults to 127.0.0.1.
  • redisPort: The Redis database port. (Example: 12345). Defaults to 6379.
  • redisPassword: The Redis database password. Defaults to empty.
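
The inputSubscription format above can be checked before a job is launched. The following is a minimal sketch, assuming only the shape projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>; the class and method names are hypothetical, and the pattern is a loose shape check rather than Pub/Sub's full resource-naming rules.

```java
import java.util.regex.Pattern;

public class SubscriptionPathCheck {
  // Loose shape check (an illustrative assumption): two fixed segments and
  // two non-empty IDs that contain no slashes or whitespace.
  private static final Pattern SHAPE =
      Pattern.compile("projects/[^/\\s]+/subscriptions/[^/\\s]+");

  // Returns true if the value has the expected subscription path shape.
  public static boolean looksLikeSubscriptionPath(String value) {
    return value != null && SHAPE.matcher(value).matches();
  }

  public static void main(String[] args) {
    // Full path, as in the parameter example.
    System.out.println(
        looksLikeSubscriptionPath(
            "projects/your-project-id/subscriptions/your-subscription-name"));
    // Bare subscription name, which does not match the documented format.
    System.out.println(looksLikeSubscriptionPath("your-subscription-name"));
  }
}
```

A check like this only catches malformed input early; it does not confirm that the subscription exists, which the pipeline requirements still demand.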

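Optional parameters

  • sslEnabled: The Redis database SSL parameter. Defaults to: false.
  • redisSinkType: The Redis sink. Supported values are STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK. (Example: STRING_SINK). Defaults to: STRING_SINK.
  • connectionTimeout: The Redis connection timeout in milliseconds. (Example: 2000). Defaults to: 2000.
  • ttl: The key expiration time in seconds, supported only for HASH_SINK and LOGGING_SINK. The ttl default for HASH_SINK is -1, which means it never expires.
  • javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Specifies how frequently to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage, and reloads the UDF if the file is modified. This parameter allows you to update the UDF while the pipeline is running, without needing to restart the job. If the value is 0, UDF reloading is disabled. The default value is 0.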

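User-defined function

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a JSON string
  • Output: a string or a stringified JSON object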

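Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Redis template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template: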

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters inputSubscription=INPUT_SUBSCRIPTION,redisHost=REDIS_HOST,redisPort=REDIS_PORT,redisPassword=REDIS_PASSWORD

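Replace the following:

  • JOB_NAME: a unique job name of your choice
  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • VERSION: the version of the template that you want to use
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription
  • REDIS_HOST: the Redis database host
  • REDIS_PORT: the Redis database port
  • REDIS_PASSWORD: the Redis database password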
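
The value of the --parameters flag is a comma-separated list of key=value pairs, so optional parameters such as redisSinkType or ttl can be appended in the same way as the required ones. The following is a minimal sketch of assembling that value, assuming a hypothetical helper class and a hypothetical one-hour ttl; it doesn't handle values that themselves contain commas.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

public class ParametersFlag {
  // Joins the entries as key=value pairs separated by commas, in insertion order.
  public static String join(Map<String, String> params) {
    return params.entrySet().stream()
        .map(e -> e.getKey() + "=" + e.getValue())
        .collect(Collectors.joining(","));
  }

  public static void main(String[] args) {
    Map<String, String> params = new LinkedHashMap<>();
    // Required parameters, using the example values from the parameter list.
    params.put(
        "inputSubscription", "projects/your-project-id/subscriptions/your-subscription-name");
    params.put("redisHost", "your.cloud.db.redislabs.com");
    params.put("redisPort", "12345");
    // Optional parameters; the ttl value is a hypothetical one-hour expiry.
    params.put("redisSinkType", "HASH_SINK");
    params.put("ttl", "3600");
    System.out.println("--parameters " + join(params));
  }
}
```

The example leaves out redisPassword so that no secret is printed; in a real launch it is passed as one more key=value pair.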

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launchParameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputSubscription": "INPUT_SUBSCRIPTION",
      "redisHost": "REDIS_HOST",
      "redisPort": "REDIS_PORT",
      "redisPassword": "REDIS_PASSWORD"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
    "environment": { "maxWorkers": "10" }
  }
}

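Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription
  • REDIS_HOST: the Redis database host
  • REDIS_PORT: the Redis database port
  • REDIS_PASSWORD: the Redis database password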
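
The POST request above can also be assembled programmatically. The following is a minimal sketch that uses the JDK's java.net.http API (Java 11 or later) to build, but not send, the request. The class and method names are hypothetical. It assumes that the caller already holds an OAuth 2.0 access token, for example one printed by gcloud auth print-access-token, and that the parameter values need no JSON escaping.

```java
import java.net.URI;
import java.net.http.HttpRequest;

public class LaunchRequestSketch {
  // Builds the flexTemplates:launch URL for a project and location.
  public static String launchUrl(String projectId, String location) {
    return "https://dataflow.googleapis.com/v1b3/projects/" + projectId
        + "/locations/" + location + "/flexTemplates:launch";
  }

  // Builds the JSON body by concatenation; assumes values contain no quotes or backslashes.
  public static String launchBody(
      String jobName, String location, String version,
      String subscription, String host, String port, String password) {
    return "{\"launchParameter\":{"
        + "\"jobName\":\"" + jobName + "\","
        + "\"parameters\":{"
        + "\"inputSubscription\":\"" + subscription + "\","
        + "\"redisHost\":\"" + host + "\","
        + "\"redisPort\":\"" + port + "\","
        + "\"redisPassword\":\"" + password + "\"},"
        + "\"containerSpecGcsPath\":\"gs://dataflow-templates-" + location + "/" + version
        + "/flex/Cloud_PubSub_to_Redis\","
        + "\"environment\":{\"maxWorkers\":\"10\"}}}";
  }

  // Wraps the URL and body in a POST request that carries the bearer token.
  public static HttpRequest build(
      String accessToken, String projectId, String location, String body) {
    return HttpRequest.newBuilder(URI.create(launchUrl(projectId, location)))
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    // Placeholder values only; replace them as described in the list above.
    String body = launchBody(
        "JOB_NAME", "LOCATION", "VERSION",
        "INPUT_SUBSCRIPTION", "REDIS_HOST", "REDIS_PORT", "REDIS_PASSWORD");
    HttpRequest request = build("ACCESS_TOKEN", "PROJECT_ID", "LOCATION", body);
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
  }
}
```

Sending the request would additionally need an HttpClient and real values for every placeholder. For values that may contain quotes or other special characters, a JSON library is a safer choice than string concatenation.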
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.HASH_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.LOGGING_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STREAMS_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STRING_SINK;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.io.RedisHashIO;
import com.google.cloud.teleport.v2.templates.transforms.MessageTransformation;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToRedis} pipeline is a streaming pipeline which ingests data in Bytes from
 * PubSub, and inserts resulting records as KV in Redis.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The PubSub topic and subscriptions exist
 *   <li>The Redis is up and running
 * </ul>
 *
 * <p><b>Example Usage</b>
 *
 * <pre>
 * # Set the pipeline vars
 * PROJECT_NAME=my-project
 * BUCKET_NAME=my-bucket
 * INPUT_SUBSCRIPTION=my-subscription
 * REDIS_HOST=my-host
 * REDIS_PORT=my-port
 * REDIS_PASSWORD=my-pwd
 *
 * mvn compile exec:java \
 *  -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToRedis \
 *  -Dexec.cleanupDaemonThreads=false \
 *  -Dexec.args=" \
 *  --project=${PROJECT_NAME} \
 *  --stagingLocation=gs://${BUCKET_NAME}/staging \
 *  --tempLocation=gs://${BUCKET_NAME}/temp \
 *  --runner=DataflowRunner \
 *  --inputSubscription=${INPUT_SUBSCRIPTION} \
 *  --redisHost=${REDIS_HOST} \
 *  --redisPort=${REDIS_PORT} \
 *  --redisPassword=${REDIS_PASSWORD}"
 * </pre>
 */
@Template(
    name = "Cloud_PubSub_to_Redis",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Redis",
    description = {
      "The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
          + "writes the message payload to Redis. The most common use case of this template is to export logs to Redis "
          + "Enterprise for advanced search-based log analysis in real time.",
      "Before writing to Redis, you can apply a JavaScript user-defined function to the message payload. Any "
          + "messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further "
          + "troubleshooting and reprocessing.",
      "For added security, enable an SSL connection when setting up your database endpoint connection."
    },
    optionsClass = PubSubToRedis.PubSubToRedisOptions.class,
    flexContainerName = "pubsub-to-redis",
    contactInformation = "https://github.com/GoogleCloudPlatform/DataflowTemplates/issues",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-redis",
    requirements = {
      "The source Pub/Sub subscription must exist prior to running the pipeline.",
      "The Pub/Sub unprocessed topic must exist prior to running the pipeline.",
      "The Redis database endpoint must be accessible from the Dataflow workers' subnetwork.",
    },
    preview = true,
    streaming = true,
    supportsAtLeastOnce = true)
public class PubSubToRedis {
  /*
   * Options supported by {@link PubSubToRedis}
   *
   * <p>Inherits standard configuration options.
   */

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToRedis.class);

  /**
   * The {@link PubSubToRedisOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   *
   * <p>Inherits standard configuration options, options from {@link
   * JavascriptTextTransformer.JavascriptTextTransformerOptions}.
   */
  public interface PubSubToRedisOptions
      extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText =
            "The Pub/Sub subscription to read the input from, in the format"
                + " projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.",
        example = "projects/your-project-id/subscriptions/your-subscription-name")
    String getInputSubscription();

    void setInputSubscription(String value);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        description = "Redis DB Host",
        helpText = "The Redis database host.",
        example = "your.cloud.db.redislabs.com")
    @Default.String("127.0.0.1")
    @Validation.Required
    String getRedisHost();

    void setRedisHost(String redisHost);

    @TemplateParameter.Integer(
        order = 3,
        groupName = "Target",
        description = "Redis DB Port",
        helpText = "The Redis database port.",
        example = "12345")
    @Default.Integer(6379)
    @Validation.Required
    int getRedisPort();

    void setRedisPort(int redisPort);

    @TemplateParameter.Password(
        order = 4,
        groupName = "Target",
        description = "Redis DB Password",
        helpText = "The Redis database password. Defaults to empty.")
    @Default.String("")
    @Validation.Required
    String getRedisPassword();

    void setRedisPassword(String redisPassword);

    @TemplateParameter.Boolean(
        order = 5,
        optional = true,
        description = "Redis ssl enabled",
        helpText = "The Redis database SSL parameter.")
    @Default.Boolean(false)
    @UnknownKeyFor
    @NonNull
    @Initialized
    ValueProvider<@UnknownKeyFor @NonNull @Initialized Boolean> getSslEnabled();

    void setSslEnabled(ValueProvider<Boolean> sslEnabled);

    @TemplateParameter.Enum(
        order = 6,
        optional = true,
        enumOptions = {
          @TemplateEnumOption("STRING_SINK"),
          @TemplateEnumOption("HASH_SINK"),
          @TemplateEnumOption("STREAMS_SINK"),
          @TemplateEnumOption("LOGGING_SINK")
        },
        description = "Redis sink to write",
        helpText =
            "The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`.",
        example = "STRING_SINK")
    @Default.Enum("STRING_SINK")
    RedisSinkType getRedisSinkType();

    void setRedisSinkType(RedisSinkType redisSinkType);

    @TemplateParameter.Integer(
        order = 7,
        optional = true,
        description = "Redis connection timeout in milliseconds",
        helpText = "The Redis connection timeout in milliseconds. ",
        example = "2000")
    @Default.Integer(2000)
    int getConnectionTimeout();

    void setConnectionTimeout(int timeout);

    @TemplateParameter.Long(
        order = 8,
        optional = true,
        parentName = "redisSinkType",
        parentTriggerValues = {"HASH_SINK", "LOGGING_SINK"},
        description =
            "Hash key expiration time in sec (ttl), supported only for HASH_SINK and LOGGING_SINK",
        helpText =
            "The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.")
    @Default.Long(-1L)
    Long getTtl();

    void setTtl(Long ttl);
  }

  /** Allowed list of sink types. */
  public enum RedisSinkType {
    HASH_SINK,
    LOGGING_SINK,
    STREAMS_SINK,
    STRING_SINK
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    PubSubToRedisOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToRedisOptions.class);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(PubSubToRedisOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> input;

    RedisConnectionConfiguration redisConnectionConfiguration =
        RedisConnectionConfiguration.create()
            .withHost(options.getRedisHost())
            .withPort(options.getRedisPort())
            .withAuth(options.getRedisPassword())
            .withTimeout(options.getConnectionTimeout())
            .withSSL(options.getSslEnabled());

    /*
     * Steps: 1) Read PubSubMessage with attributes and messageId from input PubSub subscription.
     *        2) Extract PubSubMessage message to PCollection<String>.
     *        3) Transform PCollection<String> to PCollection<KV<String, String>> so it can be consumed by RedisIO
     *        4) Write to Redis using SET
     *
     */

    LOG.info(
        "Starting PubSub-To-Redis Pipeline. Reading from subscription: {}",
        options.getInputSubscription());

    input =
        pipeline.apply(
            "Read PubSub Events",
            MessageTransformation.readFromPubSub(options.getInputSubscription()));

    if (options.getRedisSinkType().equals(STRING_SINK)) {
      PCollection<String> pCollectionString =
          input.apply(
              "Map to Redis String", ParDo.of(new MessageTransformation.MessageToRedisString()));

      PCollection<KV<String, String>> kvStringCollection =
          pCollectionString.apply(
              "Transform to String KV",
              MapElements.into(
                      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                  .via(record -> KV.of(MessageTransformation.key, record)));

      kvStringCollection.apply(
          "Write to " + STRING_SINK.name(),
          RedisIO.write()
              .withMethod(RedisIO.Write.Method.SET)
              .withConnectionConfiguration(redisConnectionConfiguration));
    }
    if (options.getRedisSinkType().equals(HASH_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
          input.apply(
              "Map to Redis Hash", ParDo.of(new MessageTransformation.MessageToRedisHash()));

      pCollectionHash.apply(
          "Write to " + HASH_SINK.name(),
          RedisHashIO.write()
              .withConnectionConfiguration(redisConnectionConfiguration)
              .withTtl(options.getTtl()));
    }
    if (options.getRedisSinkType().equals(LOGGING_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
          input.apply(
              "Map to Redis Logs", ParDo.of(new MessageTransformation.MessageToRedisLogs()));

      pCollectionHash.apply(
          "Write to " + LOGGING_SINK.name(),
          RedisHashIO.write()
              .withConnectionConfiguration(redisConnectionConfiguration)
              .withTtl(options.getTtl()));
    }
    if (options.getRedisSinkType().equals(STREAMS_SINK)) {
      PCollection<KV<String, Map<String, String>>> pCollectionStreams =
          input.apply(
              "Map to Redis Streams", ParDo.of(new MessageTransformation.MessageToRedisStreams()));

      pCollectionStreams.apply(
          "Write to " + STREAMS_SINK.name(),
          RedisIO.writeStreams().withConnectionConfiguration(redisConnectionConfiguration));
    }
    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
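
In the run method above, only the HASH_SINK and LOGGING_SINK branches pass the ttl option to their write step. This matches the ttl parameter description, which limits it to those two sinks. The following standalone sketch restates that relationship without any Beam dependencies; the class, the nested enum, and the helper methods are hypothetical and are not part of the template.

```java
import java.util.EnumSet;
import java.util.Set;

public class SinkTtlSketch {
  // Mirrors the sink names of the template's RedisSinkType enum.
  public enum SinkType { HASH_SINK, LOGGING_SINK, STREAMS_SINK, STRING_SINK }

  // Sinks whose write step receives the ttl option in the template's run method.
  private static final Set<SinkType> TTL_SINKS =
      EnumSet.of(SinkType.HASH_SINK, SinkType.LOGGING_SINK);

  // Returns true if the ttl parameter is passed to the write step of this sink.
  public static boolean usesTtl(SinkType type) {
    return TTL_SINKS.contains(type);
  }

  // Parses a sink name; Enum.valueOf throws IllegalArgumentException for unknown names.
  public static SinkType parse(String name) {
    return SinkType.valueOf(name);
  }

  public static void main(String[] args) {
    for (SinkType type : SinkType.values()) {
      System.out.println(type + " uses ttl: " + usesTtl(type));
    }
  }
}
```

Setting ttl together with STRING_SINK or STREAMS_SINK therefore has no effect in the code shown, because those branches never read the option.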

What's next