Plantilla de Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es una canalización de transmisión que lee mensajes con codificación JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de JavaScript.

Si se producen errores durante el procesamiento de los registros, la plantilla los escribe en una tabla de BigQuery, junto con el mensaje de entrada. Por ejemplo, los errores pueden ocurrir debido a una falta de coincidencia del esquema, un JSON con formato incorrecto o mientras se ejecutan transformaciones. Especifica el nombre de la tabla en el parámetro deadletterTable. Si la tabla no existe, la canalización la crea automáticamente.

Requisitos de la canalización

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetros obligatorios

  • inputSubscription : es el nombre de la suscripción a Pub/Sub. (Ejemplo: projects/your-project-id/subscriptions/your-subscription-name).
  • mongoDBUri :Una lista separada por comas de los servidores de MongoDB. (por ejemplo, host1:puerto,host2:puerto,host3:puerto).
  • database: La base de datos en MongoDB en la que se debe almacenar la colección. (Ejemplo: my-db).
  • collection: el nombre de la colección en la base de datos de MongoDB. (Ejemplo: my-collection).
  • deadletterTable : La tabla de BigQuery que almacena mensajes causados por fallas, como esquemas no coincidentes, JSON con formato incorrecto, etcétera. (Ejemplo: your-project-id:your-dataset.your-table-name).

Parámetros opcionales

  • batchSize : El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
  • batchSizeBytes : Es el tamaño del lote en bytes. La configuración predeterminada es 5242880.
  • maxConnectionIdleTime :El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). La configuración predeterminada es 60000.
  • sslEnabled : Es un valor booleano que indica si la conexión a MongoDB está habilitada con SSL. Configuración predeterminada: verdadero.
  • ignoreSSLCertificate : Es un valor booleano que indica si se debe ignorar el certificado SSL. Configuración predeterminada: verdadero.
  • withOrdered :El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Configuración predeterminada: verdadero.
  • withSSLInvalidHostNameAllowed : Es un valor booleano que indica si se permite un nombre de host no válido para la conexión SSL. Configuración predeterminada: verdadero.
  • javascriptTextTransformGcsPath: el URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. (Ejemplo: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName: el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la carga de UDF. El valor predeterminado es 0.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: una sola línea de un archivo CSV de entrada.
  • Resultado: Un documento JSON en cadena para insertar en MongoDB.

Ejecuta la plantilla

Console

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to MongoDB template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

¿Qué sigue?