Plantilla de UDF de Python de Pub/Sub a MongoDB

La plantilla de UDF de Pub/Sub a MongoDB con UDF de Python es una canalización de transmisión que lee mensajes codificados en JSON de una suscripción a Pub/Sub y los escribe en MongoDB como documentos. Si es necesario, esta canalización admite transformaciones adicionales que se pueden incluir mediante una función definida por el usuario (UDF) de Python.

Si se producen errores mientras se procesan registros, la plantilla los escribe en una tabla de BigQuery, junto con el mensaje de entrada. Por ejemplo, los errores pueden ocurrir debido a una falta de coincidencia del esquema, JSON con formato incorrecto o mientras se ejecutan transformaciones. Especifica el nombre de la tabla en el parámetro deadletterTable. Si la tabla no existe, la canalización la crea de forma automática.

Requisitos de la canalización

  • La suscripción a Pub/Sub debe existir y los mensajes deben estar codificados en un formato JSON válido.
  • El clúster de MongoDB debe existir y debe ser accesible desde las máquinas de trabajador de Dataflow.

Parámetros de la plantilla

Parámetro Descripción
inputSubscription Nombre de la suscripción a Pub/Sub. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
mongoDBUri Una lista separada por comas de los servidores de MongoDB. Por ejemplo: 192.285.234.12:27017,192.287.123.11:27017.
database La base de datos en MongoDB en la que se debe almacenar la colección. Por ejemplo: my-db.
collection Nombre de la colección dentro de la base de datos de MongoDB. Por ejemplo: my-collection.
deadletterTable La tabla de BigQuery que almacena mensajes debido a fallas (esquemas no coincidentes, JSON con formato incorrecto, etcétera). Por ejemplo: project-id:dataset-name.table-name.
pythonExternalTextTransformGcsPath Opcional: El URI de Cloud Storage del archivo de código de Python que define la función definida por el usuario (UDF) que deseas usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.py.
pythonExternalTextTransformFunctionName Opcional: El nombre de la función definida por el usuario (UDF) de Python que deseas usar.
batchSize El tamaño de lote que se usa para la inserción por lotes de documentos en MongoDB (opcional). Valor predeterminado: 1000.
batchSizeBytes El tamaño del lote en bytes (opcional). Valor predeterminado: 5242880.
maxConnectionIdleTime El tiempo de inactividad máximo permitido en segundos antes de que se agote el tiempo de espera de la conexión (opcional). Valor predeterminado: 60000.
sslEnabled El valor booleano que indica si la conexión a MongoDB está habilitada con SSL (opcional). Valor predeterminado: true.
ignoreSSLCertificate El valor booleano que indica si se debe ignorar el certificado SSL (opcional). Valor predeterminado: true.
withOrdered El valor booleano que habilita las inserciones masivas ordenadas en MongoDB (opcional). Valor predeterminado: true.
withSSLInvalidHostNameAllowed El valor booleano que indica si se permite un nombre de host no válido para la conexión SSL (opcional). Valor predeterminado: true.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: una sola línea de un archivo CSV de entrada.
  • Resultado: Un documento JSON en cadena para insertar en MongoDB.

Ejecuta la plantilla

Consola

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Pub/Sub to MongoDB with Python UDFs template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang \
    --parameters \
inputSubscription=INPUT_SUBSCRIPTION,\
mongoDBUri=MONGODB_URI,\
database=DATABASE,
collection=COLLECTION,
deadletterTable=UNPROCESSED_TABLE
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

API

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "INPUT_SUBSCRIPTION",
          "mongoDBUri": "MONGODB_URI",
          "database": "DATABASE",
          "collection": "COLLECTION",
          "deadletterTable": "UNPROCESSED_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_PubSub_to_MongoDB_Xlang",
   }
}
  

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • INPUT_SUBSCRIPTION: Es la suscripción a Pub/Sub (por ejemplo, projects/my-project-id/subscriptions/my-subscription-id).
  • MONGODB_URI: Son las direcciones del servidor de MongoDB (por ejemplo, 192.285.234.12:27017,192.287.123.11:27017).
  • DATABASE: Es el nombre de la base de datos de MongoDB (por ejemplo, users).
  • COLLECTION: Es el nombre de la colección de MongoDB (por ejemplo, profiles).
  • UNPROCESSED_TABLE: Es el nombre de la tabla de BigQuery (por ejemplo, your-project:your-dataset.your-table-name).

¿Qué sigue?