Plantilla de MongoDB a BigQuery

Esta plantilla crea un flujo de procesamiento por lotes que lee documentos de MongoDB y los escribe en BigQuery.

Si quieres capturar datos de flujo de cambios de MongoDB, puedes usar la plantilla de MongoDB a BigQuery (CDC).

Requisitos del flujo de procesamiento

  • El conjunto de datos de BigQuery de destino debe existir.
  • Se debe poder acceder a la instancia de MongoDB de origen desde las máquinas de trabajador de Dataflow.

Formato de salida

El formato de los registros de salida depende del valor del parámetro userOption. Si userOption es NONE, la salida tiene el siguiente esquema. El campo source_data contiene el documento en formato JSON.

  [
    {"name":"id","type":"STRING"},
    {"name":"source_data","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Si userOption es FLATTEN, la canalización aplana los documentos y escribe los campos de nivel superior como columnas de la tabla. Por ejemplo, supongamos que los documentos de la colección de MongoDB contienen los siguientes campos:

  • "_id" (string)
  • "title" (string)
  • "genre" (string)

Con FLATTEN, la salida tiene el siguiente esquema. La plantilla añade el campo timestamp.

  [
    {"name":"_id","type":"STRING"},
    {"name":"title","type":"STRING"},
    {"name":"genre","type":"STRING"},
    {"name":"timestamp","type":"TIMESTAMP"}
  ]
  

Si userOption es JSON, la canalización almacena el documento en formato JSON de BigQuery. BigQuery tiene compatibilidad integrada con datos JSON mediante el tipo de datos JSON. Para obtener más información, consulta el artículo Trabajar con datos JSON en GoogleSQL.

Parámetros de plantilla

Parámetros obligatorios

  • mongoDbUri el URI de conexión de MongoDB en el formato mongodb+srv://:@..
  • database: base de datos de MongoDB de la que se leerá la colección. Por ejemplo, my-db.
  • collection: nombre de la colección en la base de datos de MongoDB. Por ejemplo, my-collection.
  • userOption FLATTEN, JSON o NONE. FLATTEN combina los documentos en un solo nivel. JSON almacena el documento en formato JSON de BigQuery. NONE almacena todo el documento como una CADENA con formato JSON. El valor predeterminado es NONE.
  • outputTableSpec la tabla de BigQuery en la que se escribirán los datos. Por ejemplo, bigquery-project:dataset.output_table.

Parámetros opcionales

  • KMSEncryptionKey clave de cifrado de Cloud KMS para descifrar la cadena de conexión del URI de MongoDB. Si se proporciona la clave de Cloud KMS, toda la cadena de conexión del URI de MongoDB debe proporcionarse cifrada. Por ejemplo, projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key.
  • filter: filtro Bson en formato JSON. Por ejemplo, { "val": { $gt: 0, $lt: 9 }}.
  • useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • useStorageWriteApiAtLeastOnce cuando se usa la API Storage Write, especifica la semántica de escritura. Para usar la semántica de al menos una vez (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), asigna a este parámetro el valor true. Para usar la semántica de entrega única, asigna el valor false al parámetro. Este parámetro solo se aplica cuando useStorageWriteApi es true. El valor predeterminado es false.
  • bigQuerySchemaPath la ruta de Cloud Storage del esquema JSON de BigQuery. Por ejemplo, gs://your-bucket/your-schema.json.
  • javascriptDocumentTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://your-bucket/your-transforms/*.js.
  • javascriptDocumentTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Por ejemplo, transform.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF) en JavaScript. La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON.

Para usar una UDF, sube el archivo JavaScript a Cloud Storage y define los siguientes parámetros de plantilla:

ParámetroDescripción
javascriptDocumentTransformGcsPath Ubicación de Cloud Storage del archivo JavaScript.
javascriptDocumentTransformFunctionName Nombre de la función de JavaScript.

Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: un documento de MongoDB.
  • Salida: un objeto serializado como una cadena JSON. Si userOption es NONE, el objeto JSON debe incluir una propiedad llamada _id que contenga el ID del documento.
  • Ejecutar la plantilla

    Consola

    1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
    2. Ir a Crear tarea a partir de plantilla
    3. En el campo Nombre de la tarea, introduce un nombre único.
    4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

      Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de flujo de datos, seleccione the MongoDB to BigQuery template.
    6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
    7. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/MongoDB_to_BigQuery \
        --parameters \
    outputTableSpec=OUTPUT_TABLE_SPEC,\
    mongoDbUri=MONGO_DB_URI,\
    database=DATABASE,\
    collection=COLLECTION,\
    userOption=USER_OPTION

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • OUTPUT_TABLE_SPEC: nombre de la tabla de BigQuery de destino.
    • MONGO_DB_URI: tu URI de MongoDB.
    • DATABASE: tu base de datos de MongoDB.
    • COLLECTION: tu colección de MongoDB.
    • USER_OPTION: FLATTEN, JSON o NONE.

    API

    Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
              "inputTableSpec": "INPUT_TABLE_SPEC",
              "mongoDbUri": "MONGO_DB_URI",
              "database": "DATABASE",
              "collection": "COLLECTION",
              "userOption": "USER_OPTION"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/MongoDB_to_BigQuery",
       }
    }

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: la versión de la plantilla que quieres usar

      Puedes usar los siguientes valores:

      • latest para usar la última versión de la plantilla, que está disponible en la carpeta principal sin fecha del contenedor: gs://dataflow-templates-REGION_NAME/latest/
      • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se encuentra anidada en la carpeta principal correspondiente con la fecha en el bucket: gs://dataflow-templates-REGION_NAME/
    • OUTPUT_TABLE_SPEC: nombre de la tabla de BigQuery de destino.
    • MONGO_DB_URI: tu URI de MongoDB.
    • DATABASE: tu base de datos de MongoDB.
    • COLLECTION: tu colección de MongoDB.
    • USER_OPTION: FLATTEN, JSON o NONE.

    Siguientes pasos