Plantilla de Datastream a BigQuery (Stream)

La plantilla de Datastream a BigQuery es una canalización de streaming que lee datos de Datastream y los replica en BigQuery. La plantilla lee datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en una tabla de almacenamiento provisional de BigQuery con particiones por tiempo. Después de la replicación, la plantilla ejecuta un MERGE en BigQuery para insertar o actualizar todos los cambios de la captura de datos de cambios (CDC) en una réplica de la tabla de origen. Especifica el parámetro gcsPubSubSubscription para leer datos de las notificaciones de Pub/Sub O proporciona el parámetro inputFilePattern para leer datos directamente de los archivos de Cloud Storage.

La plantilla se encarga de crear y actualizar las tablas de BigQuery gestionadas por la replicación. Cuando se requiere el lenguaje de definición de datos (DDL), una retrollamada a Datastream extrae el esquema de la tabla de origen y lo traduce a tipos de datos de BigQuery. Entre las operaciones admitidas se incluyen las siguientes:

  • Las tablas se crean a medida que se insertan los datos.
  • Se añaden nuevas columnas a las tablas de BigQuery con valores iniciales nulos.
  • Las columnas eliminadas se ignoran en BigQuery y los valores futuros son nulos.
  • Las columnas renombradas se añaden a BigQuery como columnas nuevas.
  • Los cambios de tipo no se propagan a BigQuery.

Se recomienda ejecutar este flujo de procesamiento con el modo de streaming al menos una vez, ya que la plantilla elimina los duplicados cuando combina los datos de una tabla temporal de BigQuery con la tabla principal de BigQuery. Este paso de la canalización significa que no hay ninguna ventaja adicional en usar el modo de streaming exactamente una vez.

Requisitos del flujo de procesamiento

  • Un flujo de Datastream que esté listo para replicar datos o que ya lo esté haciendo.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se crean conjuntos de datos de destino de BigQuery y se concede acceso de administrador a la cuenta de servicio de Compute Engine.
  • Es necesario que la tabla de origen tenga una clave principal para que se cree la tabla de réplica de destino.
  • Una base de datos de origen MySQL u Oracle. Las bases de datos PostgreSQL y SQL Server no son compatibles.

Parámetros de plantilla

Parámetros obligatorios

  • inputFilePattern la ubicación del archivo de salida de Datastream en Cloud Storage, con el formato gs://<BUCKET_NAME>/<ROOT_PATH>/.
  • inputFileFormat formato de los archivos de salida que genera Datastream. Los valores permitidos son avro y json. El valor predeterminado es avro.
  • gcsPubSubSubscription la suscripción de Pub/Sub que usa Cloud Storage para notificar a Dataflow que hay archivos nuevos disponibles para procesar, con el formato projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • outputStagingDatasetTemplate nombre del conjunto de datos que contiene las tablas de almacenamiento provisional. Este parámetro admite plantillas, como {_metadata_dataset}_log o my_dataset_log. Normalmente, este parámetro es el nombre de un conjunto de datos. El valor predeterminado es {_metadata_dataset}. Nota: En el caso de las fuentes de MySQL, el nombre de la base de datos se asigna a {_metadata_schema} en lugar de a {_metadata_dataset}.
  • outputDatasetTemplate nombre del conjunto de datos que contiene las tablas de réplica. Este parámetro admite plantillas, como {_metadata_dataset} o my_dataset. Normalmente, este parámetro es el nombre de un conjunto de datos. El valor predeterminado es {_metadata_dataset}. Nota: En el caso de las fuentes de MySQL, el nombre de la base de datos se asigna a {_metadata_schema} en lugar de a {_metadata_dataset}.
  • deadLetterQueueDirectory la ruta que usa Dataflow para escribir la salida de la cola de mensajes fallidos. Esta ruta no debe ser la misma que la de la salida del archivo de flujo de datos. El valor predeterminado es empty.

Parámetros opcionales

  • streamName: el nombre o la plantilla de la emisión de la que se va a obtener la información del esquema. El valor predeterminado es {_metadata_stream}. El valor predeterminado suele ser suficiente.
  • rfcStartDateTime la fecha y hora de inicio que se usará para obtener datos de Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency número de archivos de DataStream simultáneos que se van a leer. El valor predeterminado es 10.
  • outputProjectId el ID del proyecto de Google Cloud que contiene los conjuntos de datos de BigQuery en los que se van a generar los datos. El valor predeterminado de este parámetro es el proyecto en el que se ejecuta la canalización de Dataflow.
  • outputStagingTableNameTemplate la plantilla que se va a usar para asignar nombres a las tablas de almacenamiento provisional. Por ejemplo, {_metadata_table}. El valor predeterminado es {_metadata_table}_log.
  • outputTableNameTemplate la plantilla que se va a usar para el nombre de las tablas de réplica. Por ejemplo, {_metadata_table}. El valor predeterminado es {_metadata_table}.
  • ignoreFields campos separados por comas que se deben ignorar en BigQuery. El valor predeterminado es _metadata_stream,_metadata_schema,_metadata_table,_metadata_source,_metadata_tx_id,_metadata_dlq_reconsumed,_metadata_primary_keys,_metadata_error,_metadata_retry_count. Por ejemplo, _metadata_stream,_metadata_schema.
  • mergeFrequencyMinutes número de minutos entre las combinaciones de una tabla determinada. El valor predeterminado es 5.
  • dlqRetryMinutes número de minutos entre reintentos de DLQ. El valor predeterminado es 10.
  • dataStreamRootUrl la URL raíz de la API DataStream. El valor predeterminado es https://datastream.googleapis.com/.
  • applyMerge indica si se deben inhabilitar las consultas MERGE en el trabajo. El valor predeterminado es true.
  • mergeConcurrency el número de consultas MERGE simultáneas de BigQuery. Solo tiene efecto cuando applyMerge se define como true. El valor predeterminado es 30.
  • partitionRetentionDays el número de días que se usará para la conservación de particiones al ejecutar combinaciones de BigQuery. El valor predeterminado es 1.
  • useStorageWriteApiAtLeastOnce este parámetro solo tiene efecto si Use BigQuery Storage Write API está habilitado. Si es true, se usa la semántica de al menos una vez para la API Storage Write. De lo contrario, se usa la semántica de entrega única. El valor predeterminado es false.
  • javascriptTextTransformGcsPath el URI de Cloud Storage del archivo .js que define la función de JavaScript definida por el usuario (UDF) que se va a usar. Por ejemplo, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName nombre de la función definida por el usuario (UDF) de JavaScript que se va a usar. Por ejemplo, si el código de la función de JavaScript es myTransform(inJson) { /*...do stuff...*/ }, el nombre de la función es myTransform. Para ver ejemplos de UDFs de JavaScript, consulta Ejemplos de UDFs (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes especifica la frecuencia con la que se vuelve a cargar la FDU, en minutos. Si el valor es superior a 0, Dataflow comprueba periódicamente el archivo de la función definida por el usuario en Cloud Storage y vuelve a cargar la función si se modifica el archivo. Este parámetro le permite actualizar la función definida por el usuario mientras se ejecuta la canalización, sin necesidad de reiniciar el trabajo. Si el valor es 0, se inhabilita la recarga de las funciones definidas por el usuario. El valor predeterminado es 0.
  • pythonTextTransformGcsPath patrón de ruta de Cloud Storage del código Python que contiene las funciones definidas por el usuario. Por ejemplo, gs://your-bucket/your-transforms/*.py.
  • pythonRuntimeVersion la versión del tiempo de ejecución que se va a usar en esta función definida por el usuario de Python.
  • pythonTextTransformFunctionName el nombre de la función a la que se llamará desde tu archivo JavaScript. Usa solo letras, dígitos y guiones bajos. Por ejemplo, transform_udf1.
  • runtimeRetries el número de veces que se volverá a intentar ejecutar un tiempo de ejecución antes de que falle. El valor predeterminado es 5.
  • useStorageWriteApi si es true, la canalización usa la API Storage Write de BigQuery (https://cloud.google.com/bigquery/docs/write-api). El valor predeterminado es false. Para obtener más información, consulta el artículo sobre cómo usar la API Storage Write (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams cuando se usa la API Storage Write, especifica el número de flujos de escritura. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro. El valor predeterminado es 0.
  • storageWriteApiTriggeringFrequencySec cuando se usa la API Storage Write, especifica la frecuencia de activación en segundos. Si useStorageWriteApi es true y useStorageWriteApiAtLeastOnce es false, debe definir este parámetro.

Función definida por el usuario

También puedes ampliar esta plantilla escribiendo una función definida por el usuario (UDF). La plantilla llama a la función definida por el usuario para cada elemento de entrada. Las cargas útiles de los elementos se serializan como cadenas JSON. Para obtener más información, consulta el artículo sobre cómo crear funciones definidas por el usuario para plantillas de Dataflow.

Especificación de la función

La función definida por el usuario tiene las siguientes especificaciones:

  • Entrada: los datos de CDC, serializados como una cadena JSON.
  • Salida: una cadena JSON que coincide con el esquema de la tabla de destino de BigQuery.
  • Ejecutar la plantilla

    Consola

    1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
    2. Ir a Crear tarea a partir de plantilla
    3. En el campo Nombre de la tarea, introduce un nombre único.
    4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

      Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

    5. En el menú desplegable Plantilla de flujo de datos, seleccione the Datastream to BigQuery template.
    6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
    7. Opcional: Para cambiar del procesamiento una sola vez al modo de streaming al menos una vez, selecciona Al menos una vez.
    8. Haz clic en Ejecutar trabajo.

    gcloud

    En tu shell o terminal, ejecuta la plantilla:

    gcloud dataflow flex-template run JOB_NAME \
        --project=PROJECT_ID \
        --region=REGION_NAME \
        --enable-streaming-engine \
        --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_BigQuery \
        --parameters \
    inputFilePattern=GCS_FILE_PATH,\
    gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
    outputStagingDatasetTemplate=BIGQUERY_DATASET,\
    outputDatasetTemplate=BIGQUERY_DATASET,\
    outputStagingTableNameTemplate=BIGQUERY_TABLE,\
    outputTableNameTemplate=BIGQUERY_TABLE_log
      

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: el nombre del conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: tu plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    API

    Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
    {
       "launch_parameter": {
          "jobName": "JOB_NAME",
          "parameters": {
    
              "inputFilePattern": "GCS_FILE_PATH",
              "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
              "outputStagingDatasetTemplate": "BIGQUERY_DATASET",
              "outputDatasetTemplate": "BIGQUERY_DATASET",
              "outputStagingTableNameTemplate": "BIGQUERY_TABLE",
              "outputTableNameTemplate": "BIGQUERY_TABLE_log"
          },
          "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_BigQuery",
       }
    }
      

    Haz los cambios siguientes:

    • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
    • JOB_NAME: un nombre de trabajo único que elijas
    • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
    • VERSION: the version of the template that you want to use

      You can use the following values:

    • GCS_FILE_PATH: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
    • GCS_SUBSCRIPTION_NAME: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
    • BIGQUERY_DATASET: el nombre del conjunto de datos de BigQuery.
    • BIGQUERY_TABLE: tu plantilla de tabla de BigQuery. Por ejemplo, {_metadata_schema}_{_metadata_table}_log

    Siguientes pasos