Plantilla de Datastream a BigQuery (transmisión)

La plantilla de Datastream a BigQuery es una canalización de transmisión que lee datos de Datastream y los replica en BigQuery. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de etapa de pruebas de BigQuery particionada por tiempo. Después de la replicación, la plantilla ejecuta una MERGE en BigQuery para actualizar todos los cambios de captura de datos modificados (CDC) en una réplica de la tabla de origen.

La plantilla controla la creación y la actualización de las tablas de BigQuery que administra la replicación. Cuando se requiere un lenguaje de definición de datos (DDL), una devolución de llamada a Datastream extrae el esquema de la tabla de origen y lo traduce a los tipos de datos de BigQuery. Las operaciones admitidas incluyen las siguientes:

  • Las tablas nuevas se crean a medida que se insertan los datos.
  • Se agregan columnas nuevas a las tablas de BigQuery con valores iniciales nulos.
  • Las columnas descartadas se ignoran en BigQuery y los valores futuros son nulos.
  • Las columnas cuyos nombres se han cambiado se agregan a BigQuery como columnas nuevas.
  • Los cambios de tipo no se propagan a BigQuery.

Se recomienda ejecutar esta canalización con el modo de transmisión al menos una vez, ya que la plantilla realiza la anulación de duplicación cuando combina datos de una tabla de BigQuery temporal con la tabla principal de BigQuery. Este paso en la canalización significa que no hay beneficios adicionales en usar el modo de transmisión “exactamente una vez”.

Requisitos de la canalización

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se crean los conjuntos de datos de destino de BigQuery y se les otorgó acceso de administrador a la cuenta de servicio de Compute Engine.
  • En la tabla de origen, se necesita una clave primaria para crear la réplica de destino.
  • Una base de datos de origen de MySQL o de Oracle. Las bases de datos de PostgreSQL no son compatibles.

Parámetros de la plantilla

Parámetros obligatorios

  • inputFilePattern : La ubicación del archivo de salida del archivo de Datastream en Cloud Storage, en el formato: gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat: El formato de los archivos de salida que produce Datastream (opcional). El valor puede ser “avro” o “json”. La configuración predeterminada es avro.
  • gcsPubSubSubscription: Es la suscripción a Pub/Sub que usa Cloud Storage para notificar a Dataflow sobre los nuevos archivos disponibles para su procesamiento, en el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate : Es el nombre del conjunto de datos que contiene las tablas de etapa de pruebas. Este parámetro admite plantillas, por ejemplo, {_metadata_dataset}_log o my_dataset_log. Por lo general, este parámetro es un nombre de conjunto de datos. La configuración predeterminada es {_metadata_dataset}.
  • outputDatasetTemplate : nombre del conjunto de datos que contiene las tablas de réplica. Este parámetro admite plantillas, por ejemplo, {_metadata_dataset} o my_dataset. Por lo general, este parámetro es un nombre de conjunto de datos. La configuración predeterminada es {_metadata_dataset}.
  • deadLetterQueueDirectory : La ruta de acceso que usa Dataflow para escribir el resultado de la cola de mensajes no entregados. Esta ruta de acceso no debe estar en la misma ruta que el resultado del archivo de Datastream. La configuración predeterminada es vacía.

Parámetros opcionales

  • streamName: El nombre o la plantilla del flujo que se consultará para obtener la información del esquema. La configuración predeterminada es: {_metadata_stream}. Por lo general, el valor predeterminado es suficiente.
  • rfcStartDateTime : La fecha y hora de inicio que se usará para recuperar datos de Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency : La cantidad de archivos simultáneos de DataStream para leer. El valor predeterminado es 10.
  • outputProjectId : El ID del proyecto de Google Cloud que contiene los conjuntos de datos de BigQuery en los que se deben generar datos. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • outputStagingTableNameTemplate : Es la plantilla que se usará para nombrar las tablas de etapa de pruebas. Por ejemplo, {_metadata_table}). La configuración predeterminada es: {_metadata_table}_log.
  • outputTableNameTemplate : Es la plantilla que se usará para el nombre de las tablas de réplica, por ejemplo, {_metadata_table}. La configuración predeterminada es: {_metadata_table}.
  • ignoreFields : los campos separados por comas que se deben ignorar en BigQuery. La configuración predeterminada es: _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. (Ejemplo: _metadata_stream,_metadata_schema).
  • mergeFrequencyMinutes : La cantidad de minutos entre combinaciones para una tabla determinada. La configuración predeterminada es 5.
  • dlqRetryMinutes : La cantidad de minutos entre los reintentos de DLQ. La configuración predeterminada es 10.
  • dataStreamRootUrl : La URL raíz de la API de Datastream. La configuración predeterminada es https://datastream.googleapis.com/.
  • applyMerge : Indica si se deben inhabilitar las consultas MERGE para el trabajo. Configuración predeterminada: verdadero.
  • mergeConcurrency : Es la cantidad de consultas simultáneas de MERGE en BigQuery. Esta acción solo es válida cuando applyMerge se establece como true. La configuración predeterminada es 30.
  • partitionRetentionDays : Es la cantidad de días que se usa para la retención de particiones cuando se ejecutan combinaciones de BigQuery. La configuración predeterminada es 1.
  • useStorageWriteApiAtLeastOnce: Este parámetro solo se aplica si “Usar la API de BigQuery Storage Write” está habilitada. Si es verdadero, se usa la semántica al menos una vez para la API de Storage Write. De lo contrario, se usa la semántica de “exactamente una vez”. La configuración predeterminada es "false".
  • javascriptTextTransformGcsPath: el URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que se usará. (Ejemplo: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName: el nombre de la función definida por el usuario (UDF) de JavaScript que se usará. Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Especifica la frecuencia en minutos con la que se debe volver a cargar la UDF. Si el valor es mayor que 0, Dataflow comprueba de forma periódica el archivo de UDF en Cloud Storage y vuelve a cargar la UDF si el archivo se modifica. Este parámetro te permite actualizar la UDF mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la carga de UDF. El valor predeterminado es 0.
  • pythonTextTransformGcsPath : El patrón de ruta de acceso de Cloud Storage para el código de Python que contiene tus funciones definidas por el usuario. (Por ejemplo: gs://your-bucket/your-transforms/*.py).
  • pythonRuntimeVersion : la versión del entorno de ejecución que se usa para esta UDF de Python.
  • pythonTextTransformFunctionName : El nombre de la función a la que se llamará desde el archivo JavaScript. Usa solo letras, dígitos y guiones bajos. (Ejemplo: transform_udf1).
  • runtimeRetries : la cantidad de veces que se reintenta un entorno de ejecución antes de que falle. La configuración predeterminada es 5.
  • useStorageWriteApi : si es verdadero, la canalización usa la API de BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta Usa la API de Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro. La configuración predeterminada es 0.
  • storageWriteApiTriggeringFrequencySec: cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: los datos de CDC, serializados como una cadena JSON.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    Consola

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Datastream to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona At Least Once.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    ¿Qué sigue?