Plantilla de Pub/Sub a MongoDB con funciones definidas por el usuario de Python

La plantilla Pub/Sub a MongoDB con UDFs de Python es un flujo de procesamiento en streaming que lee mensajes codificados en JSON de una suscripción de Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, este flujo de procesamiento admite más transformaciones que se pueden incluir mediante una función de Python definida por el usuario (FDU).

Si se producen errores al procesar registros, la plantilla los escribe en una tabla de BigQuery junto con el mensaje de entrada. Por ejemplo, pueden producirse errores debido a esquemas que no coinciden, un JSON que no es válido o al ejecutar transformaciones. Especifica el nombre de la tabla en el parámetro deadletterTable. Si la tabla no existe, la canalización la crea automáticamente.

Requisitos del flujo de procesamiento

  • Se debe crear una suscripción de Pub/Sub y los mensajes deben estar codificados en un formato JSON válido.
  • Se debe crear el clúster de MongoDB y se debe poder acceder a él desde las máquinas de trabajador de Dataflow.

Parámetros de plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción de Pub/Sub. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id
mongoDBUri Lista de servidores MongoDB separados por comas. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017
database Base de datos de MongoDB para almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección en la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable Tabla de BigQuery que almacena mensajes debido a errores (esquema no coincidente, JSON no válido, etc.). Por ejemplo: project-id:dataset-name.table-name.
pythonExternalTextTransformGcsPath Opcional: URI de Cloud Storage del archivo de código Python que define la función definida por el usuario (UDF) que quieres usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opcional: Nombre de la función definida por el usuario (UDF) de Python que quieras usar.
batchSize Opcional: Tamaño del lote que se usa para la inserción por lotes de documentos en MongoDB. Valor predeterminado: 1000.
batchSizeBytes Opcional: tamaño del lote en bytes. Valor predeterminado: 5242880.
maxConnectionIdleTime Opcional: tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión. Valor predeterminado: 60000.
sslEnabled Opcional: valor booleano que indica si la conexión a MongoDB tiene habilitado SSL. Valor predeterminado: true.
ignoreSSLCertificate Opcional: valor booleano que indica si se debe ignorar el certificado SSL. Valor predeterminado: true.
withOrdered Opcional: valor booleano que habilita las inserciones en bloque ordenadas en MongoDB. Valor predeterminado: true.
withSSLInvalidHostNameAllowed Opcional: valor booleano que indica si se permite un nombre de host no válido para la conexión SSL. Valor predeterminado: true.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: una sola línea de un archivo CSV de entrada.
  • Salida: un documento JSON convertido en cadena para insertarlo en MongoDB.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Pub/Sub to MongoDB with Python UDFs template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: la suscripción de Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: las direcciones del servidor MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: el nombre de la base de datos de MongoDB (por ejemplo, users)
  • COLLECTION: el nombre de la colección de MongoDB (por ejemplo, profiles)
  • UNPROCESSED_TABLE: el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name)

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • JOB_NAME: un nombre de trabajo único que elijas
  • VERSION: la versión de la plantilla que quieres usar

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: la suscripción de Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id)
  • MONGODB_URI: las direcciones del servidor MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017)
  • DATABASE: el nombre de la base de datos de MongoDB (por ejemplo, users)
  • COLLECTION: el nombre de la colección de MongoDB (por ejemplo, profiles)
  • UNPROCESSED_TABLE: el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name)

Siguientes pasos