Plantilla de Pub/Sub a MongoDB

La plantilla de Pub/Sub a MongoDB es un flujo de procesamiento en streaming que lee mensajes codificados en JSON de una suscripción de Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, este flujo de procesamiento admite más transformaciones que se pueden incluir mediante una función de JavaScript definida por el usuario (FDU).

Si se producen errores al procesar registros, la plantilla los escribe en una tabla de BigQuery junto con el mensaje de entrada. Por ejemplo, pueden producirse errores debido a esquemas que no coinciden, un JSON que no es válido o al ejecutar transformaciones. Especifica el nombre de la tabla en el parámetro deadletterTable. Si la tabla no existe, la canalización la crea automáticamente.

Requisitos del flujo de procesamiento

  • Se debe crear una suscripción de Pub/Sub y los mensajes deben estar codificados en un formato JSON válido.
  • Se debe crear el clúster de MongoDB y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.

Parámetros de plantilla

Parámetros obligatorios

  • inputSubscription nombre de la suscripción de Pub/Sub. Por ejemplo, projects/your-project-id/subscriptions/your-subscription-name.
  • mongoDBUri lista separada por comas de servidores MongoDB. Por ejemplo, host1:port,host2:port,host3:port.
  • database: base de datos de MongoDB en la que se almacenará la colección. Por ejemplo, my-db.
  • collection: nombre de la colección de la base de datos de MongoDB. Por ejemplo, my-collection.
  • deadletterTable la tabla de BigQuery que almacena los mensajes que han provocado errores, como esquemas que no coinciden, JSON con formato incorrecto, etc. Por ejemplo, your-project-id:your-dataset.your-table-name.

Parámetros opcionales

  • batchSize tamaño del lote usado para la inserción por lotes de documentos en MongoDB. El valor predeterminado es 1000.
  • batchSizeBytes tamaño del lote en bytes. El valor predeterminado es 5242880.
  • maxConnectionIdleTime tiempo máximo de inactividad permitido en segundos antes de que se agote el tiempo de espera de la conexión. El valor predeterminado es 60000.
  • sslEnabled valor booleano que indica si la conexión a MongoDB está habilitada para SSL. El valor predeterminado es true.
  • ignoreSSLCertificate valor booleano que indica si se debe ignorar el certificado SSL. El valor predeterminado es true.
  • withOrdered valor booleano que habilita las inserciones masivas ordenadas en MongoDB. El valor predeterminado es true.
  • withSSLInvalidHostNameAllowed valor booleano que indica si se permite un nombre de host no válido para la conexión SSL. El valor predeterminado es true.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es 0.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: una sola línea de un archivo CSV de entrada.
  • Salida: un documento JSON convertido en cadena para insertarlo en MongoDB.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub to MongoDB template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: la suscripción de Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: las direcciones del servidor MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: el nombre de la base de datos de MongoDB (por ejemplo, users)
  • COLLECTION: el nombre de la colección de MongoDB (por ejemplo, profiles)
  • UNPROCESSED_TABLE: el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name)

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB",
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: la suscripción de Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: las direcciones del servidor MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: el nombre de la base de datos de MongoDB (por ejemplo, users)
  • COLLECTION: el nombre de la colección de MongoDB (por ejemplo, profiles)
  • UNPROCESSED_TABLE: el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name)

Siguientes pasos