Plantilla de Datastream a BigQuery (transmisión)

La plantilla de Datastream a BigQuery es una canalización de transmisión que lee datos de Datastream y los replica en BigQuery. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de etapa de pruebas de BigQuery particionada por tiempo. Después de la replicación, la plantilla ejecuta una MERGE en BigQuery para actualizar todos los cambios de captura de datos modificados (CDC) en una réplica de la tabla de origen.

La plantilla controla la creación y la actualización de las tablas de BigQuery que administra la replicación. Cuando se requiere un lenguaje de definición de datos (DDL), una devolución de llamada a Datastream extrae el esquema de la tabla de origen y lo traduce a los tipos de datos de BigQuery. Las operaciones admitidas incluyen las siguientes:

  • Las tablas nuevas se crean a medida que se insertan los datos.
  • Se agregan columnas nuevas a las tablas de BigQuery con valores iniciales nulos.
  • Las columnas descartadas se ignoran en BigQuery y los valores futuros son nulos.
  • Las columnas cuyos nombres se han cambiado se agregan a BigQuery como columnas nuevas.
  • Los cambios de tipo no se propagan a BigQuery.

Se recomienda ejecutar esta canalización con el modo de transmisión al menos una vez, ya que la plantilla realiza la anulación de duplicación cuando combina datos de una tabla de BigQuery temporal con la principal Tabla de BigQuery. Este paso en la canalización significa que no hay beneficios adicionales en usar el modo de transmisión “exactamente una vez”.

Requisitos de la canalización

  • Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se crean los conjuntos de datos de destino de BigQuery y se les otorgó acceso de administrador a la cuenta de servicio de Compute Engine.
  • En la tabla de origen, se necesita una clave primaria para crear la réplica de destino.
  • Una base de datos de origen de MySQL o de Oracle. Las bases de datos de PostgreSQL no son compatibles.

Parámetros de la plantilla

Parámetro Descripción
inputFilePattern La ubicación de los archivos de Datastream en Cloud Storage para replicar. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
gcsPubSubSubscription La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo, projects/my-project-id/subscriptions/my-subscription-id.
inputFileFormat El formato del archivo de salida que produce Datastream. Por ejemplo: avro,json. Valor predeterminado, avro.
outputStagingDatasetTemplate El nombre de un conjunto de datos existente para contener tablas de etapa de pruebas. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset}_log).
outputDatasetTemplate El nombre de un conjunto de datos existente que contiene tablas de réplica. Puedes incluir la plantilla {_metadata_dataset} como marcador de posición que se reemplazará por el nombre del conjunto de datos o esquema de origen (p. ej., {_metadata_dataset}).
deadLetterQueueDirectory La ruta de acceso del archivo para almacenar los mensajes no procesados con el motivo por el que no se pudieron procesar. El valor predeterminado es un directorio en la ubicación temporal del trabajo de Dataflow. El valor predeterminado es suficiente en la mayoría de las condiciones.
outputStagingTableNameTemplate Opcional: La plantilla para el nombre de las tablas de etapa de pruebas. El valor predeterminado es {_metadata_table}_log. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}_log.
outputTableNameTemplate Opcional: La plantilla para el nombre de las tablas de réplica. Valor predeterminado, {_metadata_table}. Si replicas varios esquemas, la sugerencia es {_metadata_schema}_{_metadata_table}.
outputProjectId Opcional: Proyecto para los conjuntos de datos de BigQuery en los que se deben generar datos. El valor predeterminado para este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
streamName Opcional: El nombre o la plantilla del flujo que se consultará para obtener la información del esquema. Valor predeterminado, {_metadata_stream}.
mergeFrequencyMinutes Opcional: La cantidad de minutos entre combinaciones para una tabla determinada. Valor predeterminado, 5.
dlqRetryMinutes Opcional: La cantidad de minutos entre reintentos de la cola de mensajes no entregados (DLQ). Valor predeterminado, 10.
javascriptTextTransformGcsPath El URI de Cloud Storage del archivo .js que define la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName El nombre de la función definida por el usuario (UDF) de JavaScript que deseas usar (opcional). Por ejemplo, si el código de tu función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDF de JavaScript, consulta Ejemplos de UDF.
useStorageWriteApi Opcional: Si es true, la canalización usa la API de BigQuery Storage Write. El valor predeterminado es false. Para obtener más información, consulta Usa la API de BigQuery Storage Write.
useStorageWriteApiAtLeastOnce Opcional: Cuando usas la API de Storage Write, se especifica la semántica de escritura (opcional). Para usar una semántica de al menos una vez, establece este parámetro en true. Para usar una semántica de una y solo una vez, configura el parámetro en false. Este parámetro se aplica solo cuando useStorageWriteApi es true. El valor predeterminado es false.
numStorageWriteApiStreams Opcional: Cuando usas la API de Storage Write, se especifica la cantidad de transmisiones de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.
storageWriteApiTriggeringFrequencySec Opcional: Cuando se usa la API de Storage Write, se especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debes configurar este parámetro.
applyMerge Opcional: Especifica si la plantilla ejecuta una sentencia MERGE en BigQuery después de replicar los datos en la tabla de etapa de pruebas. Predeterminada: true.
fileReadConcurrency Opcional: La cantidad de archivos de Datastream que se leerán de forma simultánea. Cantidad predeterminada: 10.
mergeConcurrency Opcional: La cantidad de sentencias MERGE simultáneas de BigQuery. Cantidad predeterminada: 30.
partitionRetentionDays Opcional: La cantidad de días que se usarán para la retención de particiones cuando se ejecuten sentencias MERGE de BigQuery. Cantidad predeterminada: 1
rfcStartDateTime Opcional: La hora de inicio para leer archivos desde Cloud Storage como un valor de fecha y hora RFC 3339. Valor predeterminado: 1970-01-01T00:00:00.00Z.

Función definida por el usuario

Para extender esta plantilla, puedes escribir una función definida por el usuario (UDF). La plantilla llama a la UDF para cada elemento de entrada. Las cargas útiles de elementos se serializan como cadenas JSON. Para obtener más información, consulta Crea funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La UDF tiene la siguiente especificación:

  • Entrada: los datos de CDC, serializados como una cadena JSON.
  • Resultado: Una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecuta la plantilla

    Consola

    1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
    2. Ir a Crear un trabajo a partir de una plantilla
    3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
    4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

      Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de Dataflow, selecciona the Datastream to BigQuery template.
    6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
    7. Opcional: Para cambiar del procesamiento “exactamente una vez” al modo de transmisión al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para ejecutar la plantilla con la API de REST, envía una solicitud HTTP POST. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Reemplaza lo siguiente:

    • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
    • JOB_NAME: Es el nombre del trabajo que elijas
    • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: es el nombre de tu conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: es la plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    ¿Qué sigue?