Plantilla de Datastream a MySQL o PostgreSQL (Stream)

La plantilla Datastream a SQL es una canalización de streaming que lee datos de Datastream y los replica en cualquier base de datos MySQL o PostgreSQL. La plantilla lee datos de Cloud Storage mediante notificaciones de Pub/Sub y replica estos datos en tablas de réplica de SQL. Especifica el parámetro gcsPubSubSubscription para leer datos de las notificaciones de Pub/Sub O proporciona el parámetro inputFilePattern para leer datos directamente de los archivos de Cloud Storage.

La plantilla no admite el lenguaje de definición de datos (DDL) y espera que todas las tablas ya estén en la base de datos. La replicación usa transformaciones con estado de Dataflow para filtrar los datos obsoletos y asegurar la coherencia de los datos desordenados. Por ejemplo, si ya ha pasado una versión más reciente de una fila, se ignora una versión de esa fila que llegue más tarde. El lenguaje de manipulación de datos (DML) que se ejecuta intenta replicar perfectamente los datos de origen en los de destino. Las instrucciones de DML ejecutadas siguen estas reglas:

  • Si existe una clave principal, las operaciones de inserción y actualización usan la sintaxis upsert. (es decir, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Si existen claves principales, las eliminaciones se replican como DML de eliminación.
  • Si no existe ninguna clave principal, las operaciones de inserción y actualización se insertan en la tabla.
  • Si no hay claves principales, las eliminaciones se ignoran.

Si usas las utilidades de Oracle a PostgreSQL, añade ROWID en SQL como clave principal cuando no haya ninguna.

Requisitos del flujo de procesamiento

  • Un flujo de Datastream que esté listo para replicar datos o que ya lo esté haciendo.
  • Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
  • Se ha inicializado una base de datos PostgreSQL con el esquema necesario.
  • Se ha configurado el acceso de red entre los trabajadores de Dataflow y PostgreSQL.

Parámetros de plantilla

Parámetros obligatorios

  • inputFilePattern la ubicación de los archivos de Datastream en Cloud Storage que se van a replicar. Normalmente, esta ubicación de archivo es la ruta raíz de la emisión.
  • databaseHost el host SQL al que conectarse.
  • databaseUser el usuario de SQL con todos los permisos necesarios para escribir en todas las tablas de la replicación.
  • databasePassword la contraseña del usuario de SQL.

Parámetros opcionales

  • gcsPubSubSubscription la suscripción de Pub/Sub con notificaciones de archivos de Datastream. Por ejemplo, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat formato del archivo de salida generado por Datastream. Por ejemplo, avro o json. El valor predeterminado es avro.
  • streamName el nombre o la plantilla de la emisión de la que se va a obtener información del esquema. El valor predeterminado es {_metadata_stream}.
  • rfcStartDateTime la fecha y hora de inicio que se usa para obtener datos de Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl URL raíz de la API Datastream. El valor predeterminado es https://datastream.googleapis.com/.
  • databaseType: el tipo de base de datos en el que se va a escribir (por ejemplo, Postgres). El valor predeterminado es postgres.
  • databasePort el puerto de la base de datos SQL al que conectarse. El valor predeterminado es 5432.
  • databaseName el nombre de la base de datos SQL a la que conectarse. El valor predeterminado es postgres.
  • schemaMap un mapa de claves y valores que se usa para indicar los cambios de nombre del esquema (por ejemplo, old_name:new_name, CaseError:case_error). El valor predeterminado es una cadena vacía.
  • customConnectionString cadena de conexión opcional que se usará en lugar de la cadena de base de datos predeterminada.
  • numThreads determina el paralelismo de claves del paso de formato a DML. En concreto, el valor se transfiere a Reshuffle.withNumBuckets. El valor predeterminado es 100.
  • databaseLoginTimeout tiempo de espera en segundos para los intentos de inicio de sesión en la base de datos. De esta forma, se evitan los bloqueos de conexión cuando varios trabajadores intentan conectarse simultáneamente.
  • orderByIncludesIsDeleted el orden de las configuraciones de los datos debe incluir la priorización de los datos que no se han eliminado. Valor predeterminado: false.

Ejecutar la plantilla

Consola

  1. Ve a la página Crear tarea a partir de plantilla de Dataflow.
  2. Ir a Crear tarea a partir de plantilla
  3. En el campo Nombre de la tarea, introduce un nombre único.
  4. Opcional: En Endpoint regional, seleccione un valor en el menú desplegable. La región predeterminada es us-central1.

    Para ver una lista de las regiones en las que puedes ejecutar una tarea de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de flujo de datos, seleccione the Cloud Datastream to SQL template.
  6. En los campos de parámetros proporcionados, introduzca los valores de los parámetros.
  7. Haz clic en Ejecutar trabajo.

gcloud

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • REGION_NAME: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: la IP de tu host SQL.
  • DATABASE_USER: tu usuario de SQL.
  • DATABASE_PASSWORD: tu contraseña de SQL.

API

Para ejecutar la plantilla mediante la API REST, envía una solicitud HTTP POST. Para obtener más información sobre la API y sus ámbitos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Haz los cambios siguientes:

  • PROJECT_ID: el ID del proyecto Google Cloud en el que quieres ejecutar la tarea de Dataflow
  • JOB_NAME: un nombre de trabajo único que elijas
  • LOCATION: la región en la que quieras desplegar tu trabajo de Dataflow. Por ejemplo, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: la ruta de Cloud Storage a los datos de Datastream. Por ejemplo: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la suscripción de Pub/Sub para leer los archivos modificados. Por ejemplo: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: la IP de tu host SQL.
  • DATABASE_USER: tu usuario de SQL.
  • DATABASE_PASSWORD: tu contraseña de SQL.

Siguientes pasos