Pub/Sub to Redis template

The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Redis. The most common use case for this template is exporting logs to Redis Enterprise for advanced search-based log analysis in real time.

  • Before writing to Redis, you can apply a JavaScript user-defined function (UDF) to the message payload.
  • Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.
  • For added security, enable an SSL connection when setting up your database endpoint connection. This template does not support mutual TLS.

Pipeline requirements

  • The source Pub/Sub subscription must exist before you run the pipeline.
  • The Pub/Sub unprocessed topic must exist before you run the pipeline.
  • The Redis database endpoint must be accessible from the Dataflow workers' subnetwork.

Template parameters

Required parameters

  • inputSubscription: The Pub/Sub subscription to read the input from. For example, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • redisHost: The Redis database host. For example, your.cloud.db.redislabs.com. Defaults to 127.0.0.1.
  • redisPort: The Redis database port. For example, 12345. Defaults to 6379.
  • redisPassword: The Redis database password. Defaults to an empty string.

Optional parameters

  • sslEnabled: Whether to connect to the Redis database over SSL. Defaults to false.
  • redisSinkType: The Redis sink. Supported values are STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK. For example, STRING_SINK. Defaults to STRING_SINK.
  • connectionTimeout: The Redis connection timeout in milliseconds. For example, 2000. Defaults to 2000.
  • ttl: The key expiration time in seconds, supported only for HASH_SINK and LOGGING_SINK. The default is -1, which means the key never expires.
  • javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is function myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Specifies how often to reload the UDF, in minutes. If the value is greater than 0, Dataflow periodically checks the UDF file in Cloud Storage and reloads the UDF if the file has changed. This parameter lets you update the UDF while the pipeline is running, without restarting the job. If the value is 0, UDF reloading is disabled. The default value is 0.
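
The documented defaults can be gathered in one place and checked before a job is launched. The following is a minimal sketch and not part of the template: the helper name withDefaults and the plain-object shape are assumptions made for illustration. Only the parameter names, the allowed sink values, and the default values come from the lists above.

```javascript
// Allowed values for redisSinkType, as listed in the parameter description.
var SINK_TYPES = ["STRING_SINK", "HASH_SINK", "STREAMS_SINK", "LOGGING_SINK"];

// Documented defaults for parameters that have one.
var DEFAULTS = {
  redisHost: "127.0.0.1",
  redisPort: 6379,
  redisPassword: "",
  sslEnabled: false,
  redisSinkType: "STRING_SINK",
  connectionTimeout: 2000,
  ttl: -1,
  javascriptTextTransformReloadIntervalMinutes: 0
};

// Hypothetical helper: merges user-supplied values over the defaults
// and rejects a missing subscription or an unknown sink type.
function withDefaults(params) {
  if (!params || !params.inputSubscription) {
    throw new Error("inputSubscription is required");
  }
  var result = {};
  var key;
  for (key in DEFAULTS) {
    if (Object.prototype.hasOwnProperty.call(DEFAULTS, key)) {
      result[key] = DEFAULTS[key];
    }
  }
  for (key in params) {
    if (Object.prototype.hasOwnProperty.call(params, key)) {
      result[key] = params[key];
    }
  }
  if (SINK_TYPES.indexOf(result.redisSinkType) === -1) {
    throw new Error("Unsupported redisSinkType: " + result.redisSinkType);
  }
  return result;
}
```

A caller would pass only the values that differ from the defaults, for example withDefaults({ inputSubscription: "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>", redisSinkType: "HASH_SINK" }).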

User-defined function (UDF)

Optionally, you can extend this template by writing a user-defined function (UDF). The template calls the UDF for each input element. Element payloads are serialized as JSON strings. For more information, see Create user-defined functions for Dataflow templates.

Function specification

The UDF has the following specification:

  • Input: a JSON string
  • Output: a string or a stringified JSON object
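
To illustrate this specification, the following is a minimal sketch of a UDF that takes the JSON string, adds one field, and returns a stringified JSON object. The function name myTransform mirrors the example in the parameter description. The field name processedBy and its value are hypothetical. The code uses ES5-style JavaScript as a conservative assumption about the UDF runtime.

```javascript
/**
 * Sketch of a UDF: receives the element payload as a JSON string
 * and returns a stringified JSON object.
 */
function myTransform(inJson) {
  // JSON.parse throws if the payload is not valid JSON.
  var obj = JSON.parse(inJson);

  // Hypothetical enrichment: tag each record before it is written to Redis.
  obj.processedBy = "pubsub-to-redis-udf";

  return JSON.stringify(obj);
}
```

To use a function like this, upload the file to Cloud Storage, set javascriptTextTransformGcsPath to its URI, and set javascriptTextTransformFunctionName to myTransform.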

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Pub/Sub to Redis template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters inputSubscription=INPUT_SUBSCRIPTION,redisHost=REDIS_HOST,redisPort=REDIS_PORT,redisPassword=REDIS_PASSWORD

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription
  • REDIS_HOST: the Redis database host
  • REDIS_PORT: the Redis database port
  • REDIS_PASSWORD: the Redis database password

To run the template using the REST API, send an HTTP POST request. For more information about the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "redisHost": "REDIS_HOST",
       "redisPort": "REDIS_PORT",
       "redisPassword": "REDIS_PASSWORD"
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
     "environment": { "maxWorkers": "10" }
  }
}

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • INPUT_SUBSCRIPTION: the Pub/Sub input subscription
  • REDIS_HOST: the Redis database host
  • REDIS_PORT: the Redis database port
  • REDIS_PASSWORD: the Redis database password
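
The REST request can also be assembled programmatically from the same placeholders. The following is a minimal sketch under stated assumptions: the helper name buildLaunchRequest and its argument order are hypothetical, it omits the optional environment block, and it only builds the URL and body. Authentication and actually sending the request are left out.

```javascript
// Hypothetical helper: builds the URL and JSON body for a flexTemplates:launch call.
function buildLaunchRequest(projectId, location, version, jobName, parameters) {
  var url =
    "https://dataflow.googleapis.com/v1b3/projects/" +
    encodeURIComponent(projectId) +
    "/locations/" +
    encodeURIComponent(location) +
    "/flexTemplates:launch";

  var body = {
    launchParameter: {
      jobName: jobName,
      parameters: {},
      containerSpecGcsPath:
        "gs://dataflow-templates-" + location + "/" + version +
        "/flex/Cloud_PubSub_to_Redis"
    }
  };

  // Send every template parameter value as a string, as in the request body above.
  for (var key in parameters) {
    if (Object.prototype.hasOwnProperty.call(parameters, key)) {
      body.launchParameter.parameters[key] = String(parameters[key]);
    }
  }

  return { url: url, body: JSON.stringify(body, null, 2) };
}
```

Building the body with JSON.stringify avoids hand-editing mistakes such as a trailing comma after the last parameter, which would make the request body invalid JSON.
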
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.HASH_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.LOGGING_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STREAMS_SINK;
import static com.google.cloud.teleport.v2.templates.PubSubToRedis.RedisSinkType.STRING_SINK;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.io.RedisHashIO;
import com.google.cloud.teleport.v2.templates.transforms.MessageTransformation;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.redis.RedisConnectionConfiguration;
import org.apache.beam.sdk.io.redis.RedisIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToRedis} pipeline is a streaming pipeline which ingests data in Bytes from
 * PubSub, and inserts resulting records as KV in Redis.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The PubSub topic and subscriptions exist
 *   <li>The Redis is up and running
 * </ul>
 *
 * <p><b>Example Usage</b>
 *
 * <pre>
 * # Set the pipeline vars
 * PROJECT_NAME=my-project
 * BUCKET_NAME=my-bucket
 * INPUT_SUBSCRIPTION=my-subscription
 * REDIS_HOST=my-host
 * REDIS_PORT=my-port
 * REDIS_PASSWORD=my-pwd
 *
 * mvn compile exec:java \
 *  -Dexec.mainClass=com.google.cloud.teleport.v2.templates.PubSubToRedis \
 *  -Dexec.cleanupDaemonThreads=false \
 *  -Dexec.args=" \
 *  --project=${PROJECT_NAME} \
 *  --stagingLocation=gs://${BUCKET_NAME}/staging \
 *  --tempLocation=gs://${BUCKET_NAME}/temp \
 *  --runner=DataflowRunner \
 *  --inputSubscription=${INPUT_SUBSCRIPTION} \
 *  --redisHost=${REDIS_HOST} \
 *  --redisPort=${REDIS_PORT} \
 *  --redisPassword=${REDIS_PASSWORD}"
 * </pre>
 */
@Template(
    name = "Cloud_PubSub_to_Redis",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Redis",
    description = {
      "The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
          + "writes the message payload to Redis. The most common use case of this template is to export logs to Redis "
          + "Enterprise for advanced search-based log analysis in real time.",
      "Before writing to Redis, you can apply a JavaScript user-defined function to the message payload. Any "
          + "messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further "
          + "troubleshooting and reprocessing.",
      "For added security, enable an SSL connection when setting up your database endpoint connection."
    },
    optionsClass = PubSubToRedis.PubSubToRedisOptions.class,
    flexContainerName = "pubsub-to-redis",
    contactInformation = "https://github.com/GoogleCloudPlatform/DataflowTemplates/issues",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-redis",
    requirements = {
      "The source Pub/Sub subscription must exist prior to running the pipeline.",
      "The Pub/Sub unprocessed topic must exist prior to running the pipeline.",
      "The Redis database endpoint must be accessible from the Dataflow workers' subnetwork.",
    },
    preview = true,
    streaming = true,
    supportsAtLeastOnce = true)
public class PubSubToRedis {
  /*
   * Options supported by {@link PubSubToRedis}
   *
   * <p>Inherits standard configuration options.
   */

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToRedis.class);

  /**
   * The {@link PubSubToRedisOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   *
   * <p>Inherits standard configuration options, options from {@link
   * JavascriptTextTransformer.JavascriptTextTransformerOptions}.
   */
  public interface PubSubToRedisOptions
      extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
    @TemplateParameter.PubsubSubscription(
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "The Pub/Sub subscription to read the input from.",
        example = "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
    String getInputSubscription();

    void setInputSubscription(String value);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        description = "Redis DB Host",
        helpText = "The Redis database host.",
        example = "your.cloud.db.redislabs.com")
    @Default.String("127.0.0.1")
    @Validation.Required
    String getRedisHost();

    void setRedisHost(String redisHost);

    @TemplateParameter.Integer(
        order = 3,
        groupName = "Target",
        description = "Redis DB Port",
        helpText = "The Redis database port.",
        example = "12345")
    @Default.Integer(6379)
    @Validation.Required
    int getRedisPort();

    void setRedisPort(int redisPort);

    @TemplateParameter.Password(
        order = 4,
        groupName = "Target",
        description = "Redis DB Password",
        helpText = "The Redis database password. Defaults to `empty`.")
    @Default.String("")
    @Validation.Required
    String getRedisPassword();

    void setRedisPassword(String redisPassword);

    @TemplateParameter.Boolean(
        order = 5,
        optional = true,
        description = "Redis ssl enabled",
        helpText = "The Redis database SSL parameter.")
    @Default.Boolean(false)
    @UnknownKeyFor
    @NonNull
    @Initialized
    ValueProvider<@UnknownKeyFor @NonNull @Initialized Boolean> getSslEnabled();

    void setSslEnabled(ValueProvider<Boolean> sslEnabled);

    @TemplateParameter.Enum(
        order = 6,
        optional = true,
        enumOptions = {
          @TemplateEnumOption("STRING_SINK"),
          @TemplateEnumOption("HASH_SINK"),
          @TemplateEnumOption("STREAMS_SINK"),
          @TemplateEnumOption("LOGGING_SINK")
        },
        description = "Redis sink to write",
        helpText =
            "The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`.",
        example = "STRING_SINK")
    @Default.Enum("STRING_SINK")
    RedisSinkType getRedisSinkType();

    void setRedisSinkType(RedisSinkType redisSinkType);

    @TemplateParameter.Integer(
        order = 7,
        optional = true,
        description = "Redis connection timeout in milliseconds",
        helpText = "The Redis connection timeout in milliseconds. ",
        example = "2000")
    @Default.Integer(2000)
    int getConnectionTimeout();

    void setConnectionTimeout(int timeout);

    @TemplateParameter.Long(
        order = 8,
        optional = true,
        parentName = "redisSinkType",
        parentTriggerValues = {"HASH_SINK", "LOGGING_SINK"},
        description =
            "Hash key expiration time in sec (ttl), supported only for HASH_SINK and LOGGING_SINK",
        helpText =
            "The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.")
    @Default.Long(-1L)
    Long getTtl();

    void setTtl(Long ttl);
  }

  /** Allowed list of sink types. */
  public enum RedisSinkType {
    HASH_SINK,
    LOGGING_SINK,
    STREAMS_SINK,
    STRING_SINK
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    PubSubToRedisOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToRedisOptions.class);
    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(PubSubToRedisOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> input;

    RedisConnectionConfiguration redisConnectionConfiguration =
        RedisConnectionConfiguration.create()
            .withHost(options.getRedisHost())
            .withPort(options.getRedisPort())
            .withAuth(options.getRedisPassword())
            .withTimeout(options.getConnectionTimeout())
            .withSSL(options.getSslEnabled());

    /*
     * Steps: 1) Read PubSubMessage with attributes and messageId from input PubSub subscription.
     *        2) Extract PubSubMessage message to PCollection<String>.
     *        3) Transform PCollection<String> to PCollection<KV<String, String>> so it can be consumed by RedisIO
     *        4) Write to Redis using SET
     *
     */

    LOG.info(
        "Starting PubSub-To-Redis Pipeline. Reading from subscription: {}",
        options.getInputSubscription());

    input =
        pipeline.apply(
            "Read PubSub Events",
            MessageTransformation.readFromPubSub(options.getInputSubscription()));

    if (options.getRedisSinkType().equals(STRING_SINK)) {
      PCollection<String> pCollectionString =
          input.apply(
              "Map to Redis String", ParDo.of(new MessageTransformation.MessageToRedisString()));

      PCollection<KV<String, String>> kvStringCollection =
          pCollectionString.apply(
              "Transform to String KV",
              MapElements.into(
                      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                  .via(record -> KV.of(MessageTransformation.key, record)));

      kvStringCollection.apply(
          "Write to " + STRING_SINK.name(),
          RedisIO.write()
              .withMethod(RedisIO.Write.Method.SET)
              .withConnectionConfiguration(redisConnectionConfiguration));
    }
    if (options.getRedisSinkType().equals(HASH_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
          input.apply(
              "Map to Redis Hash", ParDo.of(new MessageTransformation.MessageToRedisHash()));

      pCollectionHash.apply(
          "Write to " + HASH_SINK.name(),
          RedisHashIO.write()
              .withConnectionConfiguration(redisConnectionConfiguration)
              .withTtl(options.getTtl()));
    }
    if (options.getRedisSinkType().equals(LOGGING_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
          input.apply(
              "Map to Redis Logs", ParDo.of(new MessageTransformation.MessageToRedisLogs()));

      pCollectionHash.apply(
          "Write to " + LOGGING_SINK.name(),
          RedisHashIO.write()
              .withConnectionConfiguration(redisConnectionConfiguration)
              .withTtl(options.getTtl()));
    }
    if (options.getRedisSinkType().equals(STREAMS_SINK)) {
      PCollection<KV<String, Map<String, String>>> pCollectionStreams =
          input.apply(
              "Map to Redis Streams", ParDo.of(new MessageTransformation.MessageToRedisStreams()));

      pCollectionStreams.apply(
          "Write to " + STREAMS_SINK.name(),
          RedisIO.writeStreams().withConnectionConfiguration(redisConnectionConfiguration));
    }
    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

What's next