Il modello Pub/Sub to Redis è una pipeline di flusso che legge i messaggi da una sottoscrizione Pub/Sub e scrive il payload dei messaggi in Redis. Il caso d'uso più comune di questo modello è esportare i log in Redis Enterprise per l'analisi avanzata dei log in tempo reale basata sulla ricerca.

  • Prima di scrivere in Redis, puoi applicare una funzione definita dall'utente JavaScript al payload del messaggio.
  • I messaggi che presentano errori di elaborazione vengono inoltrati a un argomento Pub/Sub non elaborato per un'ulteriore risoluzione dei problemi e rielaborazione.
  • Per una maggiore sicurezza, attiva una connessione SSL durante la configurazione della connessione all'endpoint del database. Questo modello non supporta TLS mutuale.

Requisiti della pipeline

  • La sottoscrizione Pub/Sub di origine deve esistere prima dell'esecuzione della pipeline.
  • L'argomento Pub/Sub non elaborato deve esistere prima dell'esecuzione della pipeline.
  • L'endpoint del database Redis deve essere accessibile dalla sottorete dei worker Dataflow.

Parametri del modello

Parametri obbligatori

  • inputSubscription: la sottoscrizione Pub/Sub da cui leggere l'input. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • redisHost: l'host del database Redis. Ad esempio, Il valore predefinito è
  • redisPort: la porta del database Redis. Ad esempio, 12345. Valore predefinito: 6379.
  • redisPassword: la password del database Redis. Il valore predefinito è empty.

Parametri facoltativi

  • sslEnabled: il parametro SSL del database Redis. Il valore predefinito è false.
  • redisSinkType: l'emissario Redis. I valori supportati sono STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK. Ad esempio: STRING_SINK. Il valore predefinito è: STRING_SINK.
  • connectionTimeout: il timeout della connessione Redis in millisecondi. Ad esempio, 2000. Il valore predefinito è 2000.
  • ttl: la data e l'ora di scadenza della chiave in secondi. Il valore predefinito di ttl per HASH_SINK è -1, il che significa che non scade mai.
  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /* stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (
  • javascriptTextTransformReloadIntervalMinutes: specifica la frequenza con cui ricaricare la UDF, in minuti. Se il valore è maggiore di 0, Dataflow controlla periodicamente il file UDF in Cloud Storage e lo ricarica se il file viene modificato. Questo parametro ti consente di aggiornare la UDF durante l'esecuzione della pipeline, senza dover riavviare il job. Se il valore è 0, il ricaricamento delle funzioni definite dall'utente è disattivato. Il valore predefinito è 0.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha la seguente specifica:

  • Input: stringa JSON
  • Output: una stringa o un oggetto JSON con formato di stringa

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Pub/Sub to Redis template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \

Sostituisci quanto segue:

  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_SUBSCRIPTION: la sottoscrizione di input Pub/Sub
  • REDIS_HOST: l'host del database Redis
  • REDIS_PORT: la porta del database Redis
  • REDIS_PASSWORD: la password del database Redis

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "redisHost": "REDIS_HOST",
       "redisPort": "REDIS_PORT",
       "redisPassword": "REDIS_PASSWORD",
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
     "environment": { "maxWorkers": "10" }

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_SUBSCRIPTION: la sottoscrizione di input Pub/Sub
  • REDIS_HOST: l'host del database Redis
  • REDIS_PORT: la porta del database Redis
  • REDIS_PASSWORD: la password del database Redis
import static;
import static;
import static;
import static;

import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.checkerframework.checker.initialization.qual.Initialized;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.UnknownKeyFor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

 * The {@link PubSubToRedis} pipeline is a streaming pipeline which ingests data in Bytes from
 * PubSub, and inserts resulting records as KV in Redis.
 * <p><b>Pipeline Requirements</b>
 * <ul>
 *   <li>The PubSub topic and subscriptions exist
 *   <li>The Redis is up and running
 * </ul>
 * <p><b>Example Usage</b>
 * <pre>
 * # Set the pipeline vars
 * PROJECT_NAME=my-project
 * BUCKET_NAME=my-bucket
 * INPUT_SUBSCRIPTION=my-subscription
 * REDIS_HOST=my-host
 * REDIS_PORT=my-port
 * mvn compile exec:java \
 * \
 *  -Dexec.cleanupDaemonThreads=false \
 *  -Dexec.args=" \
 *  --project=${PROJECT_NAME} \
 *  --stagingLocation=gs://${BUCKET_NAME}/staging \
 *  --tempLocation=gs://${BUCKET_NAME}/temp \
 *  --runner=DataflowRunner \
 *  --inputSubscription=${INPUT_SUBSCRIPTION} \
 *  --redisHost=${REDIS_HOST}
 *  --redisPort=${REDIS_PORT}
 *  --redisPassword=${REDIS_PASSWORD}"
 * </pre>
    name = "Cloud_PubSub_to_Redis",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Redis",
    description = {
      "The Pub/Sub to Redis template is a streaming pipeline that reads messages from a Pub/Sub subscription and "
          + "writes the message payload to Redis. The most common use case of this template is to export logs to Redis "
          + "Enterprise for advanced search-based log analysis in real time.",
      "Before writing to Redis, you can apply a JavaScript user-defined function to the message payload. Any "
          + "messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further "
          + "troubleshooting and reprocessing.",
      "For added security, enable an SSL connection when setting up your database endpoint connection."
    optionsClass = PubSubToRedis.PubSubToRedisOptions.class,
    flexContainerName = "pubsub-to-redis",
    contactInformation = "",
    documentation =
    requirements = {
      "The source Pub/Sub subscription must exist prior to running the pipeline.",
      "The Pub/Sub unprocessed topic must exist prior to running the pipeline.",
      "The Redis database endpoint must be accessible from the Dataflow workers' subnetwork.",
    preview = true,
    streaming = true,
    supportsAtLeastOnce = true)
public class PubSubToRedis {
   * Options supported by {@link PubSubToRedis}
   * <p>Inherits standard configuration options.

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToRedis.class);

   * The {@link PubSubToRedisOptions} class provides the custom execution options passed by the
   * executor at the command-line.
   * <p>Inherits standard configuration options, options from {@link
   * JavascriptTextTransformer.JavascriptTextTransformerOptions}.
  public interface PubSubToRedisOptions
      extends JavascriptTextTransformer.JavascriptTextTransformerOptions, PipelineOptions {
        order = 1,
        groupName = "Source",
        description = "Pub/Sub input subscription",
        helpText = "The Pub/Sub subscription to read the input from.",
        example = "projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>")
    String getInputSubscription();

    void setInputSubscription(String value);

        order = 2,
        groupName = "Target",
        description = "Redis DB Host",
        helpText = "The Redis database host.",
        example = "")
    String getRedisHost();

    void setRedisHost(String redisHost);

        order = 3,
        groupName = "Target",
        description = "Redis DB Port",
        helpText = "The Redis database port.",
        example = "12345")
    int getRedisPort();

    void setRedisPort(int redisPort);

        order = 4,
        groupName = "Target",
        description = "Redis DB Password",
        helpText = "The Redis database password. Defaults to `empty`.")
    String getRedisPassword();

    void setRedisPassword(String redisPassword);

        order = 5,
        optional = true,
        description = "Redis ssl enabled",
        helpText = "The Redis database SSL parameter.")
    ValueProvider<@UnknownKeyFor @NonNull @Initialized Boolean> getSslEnabled();

    void setSslEnabled(ValueProvider<Boolean> sslEnabled);

        order = 6,
        optional = true,
        enumOptions = {
        description = "Redis sink to write",
        helpText =
            "The Redis sink. Supported values are `STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK`.",
        example = "STRING_SINK")
    RedisSinkType getRedisSinkType();

    void setRedisSinkType(RedisSinkType redisSinkType);

        order = 7,
        optional = true,
        description = "Redis connection timeout in milliseconds",
        helpText = "The Redis connection timeout in milliseconds. ",
        example = "2000")
    int getConnectionTimeout();

    void setConnectionTimeout(int timeout);

        order = 8,
        optional = true,
        parentName = "redisSinkType",
        parentTriggerValues = {"HASH_SINK", "LOGGING_SINK"},
        description =
            "Hash key expiration time in sec (ttl), supported only for HASH_SINK and LOGGING_SINK",
        helpText =
            "The key expiration time in seconds. The `ttl` default for `HASH_SINK` is -1, which means it never expires.")
    Long getTtl();

    void setTtl(Long ttl);

  /** Allowed list of sink types. */
  public enum RedisSinkType {

   * Main entry point for executing the pipeline.
   * @param args The command-line arguments to the pipeline.
  public static void main(String[] args) {

    // Parse the user options passed from the command-line.
    PubSubToRedisOptions options =

   * Runs the pipeline with the supplied options.
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
  public static PipelineResult run(PubSubToRedisOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    PCollection<PubsubMessage> input;

    RedisConnectionConfiguration redisConnectionConfiguration =

     * Steps: 1) Read PubSubMessage with attributes and messageId from input PubSub subscription.
     *        2) Extract PubSubMessage message to PCollection<String>.
     *        3) Transform PCollection<String> to PCollection<KV<String, String>> so it can be consumed by RedisIO
     *        4) Write to Redis using SET
        "Starting PubSub-To-Redis Pipeline. Reading from subscription: {}",

    input =
            "Read PubSub Events",

    if (options.getRedisSinkType().equals(STRING_SINK)) {
      PCollection<String> pCollectionString =
              "Map to Redis String", ParDo.of(new MessageTransformation.MessageToRedisString()));

      PCollection<KV<String, String>> kvStringCollection =
              "Transform to String KV",
                      TypeDescriptors.kvs(TypeDescriptors.strings(), TypeDescriptors.strings()))
                  .via(record -> KV.of(MessageTransformation.key, record)));

          "Write to " +,
    if (options.getRedisSinkType().equals(HASH_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
              "Map to Redis Hash", ParDo.of(new MessageTransformation.MessageToRedisHash()));

          "Write to " +,
    if (options.getRedisSinkType().equals(LOGGING_SINK)) {
      PCollection<KV<String, KV<String, String>>> pCollectionHash =
              "Map to Redis Logs", ParDo.of(new MessageTransformation.MessageToRedisLogs()));

          "Write to " +,
    if (options.getRedisSinkType().equals(STREAMS_SINK)) {
      PCollection<KV<String, Map<String, String>>> pCollectionStreams =
              "Map to Redis Streams", ParDo.of(new MessageTransformation.MessageToRedisStreams()));

          "Write to " +,
    // Execute the pipeline and return the result.

