Plantilla de Pub/Sub a Redis

La plantilla de Pub/Sub a Redis es un flujo de procesamiento en streaming que lee mensajes de una suscripción de Pub/Sub y escribe la carga útil de los mensajes en Redis. El caso práctico más habitual de esta plantilla es exportar registros a Redis Enterprise para realizar análisis de registros avanzados basados en búsquedas en tiempo real.

  • Antes de escribir en Redis, puedes aplicar una función definida por el usuario de JavaScript a la carga útil del mensaje.
  • Si surge algún problema a la hora de procesar un mensaje, se enviará ese mensaje a un tema de Pub/Sub sin procesar para ponerle solución y volver a procesar el mensaje.
  • Para aumentar la seguridad, habilita una conexión SSL al configurar la conexión del endpoint de la base de datos. Esta plantilla no admite TLS mutuo.

Requisitos del flujo de procesamiento

  • La suscripción de origen de Pub/Sub debe existir antes de ejecutar el flujo de procesamiento.
  • El tema de Pub/Sub sin procesar debe existir antes de ejecutar el flujo de procesamiento.
  • Se debe poder acceder al endpoint de la base de datos Redis desde la subred de los trabajadores de Dataflow.

Parámetros de plantilla

Parámetros obligatorios

  • inputSubscription la suscripción de Pub/Sub de la que se leerán los datos de entrada. Por ejemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • redisHost el host de la base de datos Redis. Por ejemplo, your.cloud.db.redislabs.com. El valor predeterminado es 127.0.0.1.
  • redisPort el puerto de la base de datos Redis. Por ejemplo, 12345. El valor predeterminado es 6379.
  • redisPassword la contraseña de la base de datos Redis. El valor predeterminado es empty.

Parámetros opcionales

  • sslEnabled el parámetro SSL de la base de datos Redis. Valor predeterminado: false.
  • redisSinkType el receptor de Redis. Los valores posibles son STRING_SINK, HASH_SINK, STREAMS_SINK, and LOGGING_SINK. Por ejemplo, STRING_SINK. El valor predeterminado es STRING_SINK.
  • connectionTimeout tiempo de espera de la conexión de Redis en milisegundos. Por ejemplo, 2000. El valor predeterminado es 2000.
  • ttl tiempo de caducidad de la clave en segundos. El ttl valor predeterminado de HASH_SINK es -1, lo que significa que nunca caduca.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es 0.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: cadena JSON
  • Resultado: una cadena o un objeto JSON convertido en cadena

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub to Redis template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_Redis \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       inputSubscription=INPUT_SUBSCRIPTION,\
       redisHost=REDIS_HOST,\
       redisPort=REDIS_PORT,\
       redisPassword=REDIS_PASSWORD,\

Haz los cambios siguientes:

  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • INPUT_SUBSCRIPTION: la suscripción de entrada de Pub/Sub
  • REDIS_HOST: el host de la base de datos Redis
  • REDIS_PORT: el puerto de la base de datos Redis
  • REDIS_PASSWORD: la contraseña de la base de datos Redis

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launchParameter": {
     "jobName": "JOB_NAME",
     "parameters": {
       "inputSubscription": "INPUT_SUBSCRIPTION",
       "redisHost": "REDIS_HOST",
       "redisPort": "REDIS_PORT",
       "redisPassword": "REDIS_PASSWORD",
     },
     "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_Redis",
     "environment": { "maxWorkers": "10" }
  }
}

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • INPUT_SUBSCRIPTION: la suscripción de entrada de Pub/Sub
  • REDIS_HOST: el host de la base de datos Redis
  • REDIS_PORT: el puerto de la base de datos Redis
  • REDIS_PASSWORD: la contraseña de la base de datos Redis

Siguientes pasos