La plantilla de transmisión de datos a SQL es una canalización de transmisión que lee datos de Datastream y los replica en cualquier base de datos de MySQL o PostgreSQL. La plantilla lee los datos de Cloud Storage mediante notificaciones de Pub/Sub y los replica en tablas de réplica de PostgreSQL.
La plantilla no es compatible con el lenguaje de definición de datos (DDL) y espera que todas las tablas ya existan en la base de datos. La replicación usa transformaciones con estado de Dataflow para filtrar los datos inactivos y garantizar la coherencia dentro de los datos desordenados. Por ejemplo, si ya se pasó una versión más reciente de una fila, se ignorará una versión tardía de esa fila. El lenguaje de manipulación de datos (DML) que se ejecuta es el mejor intento de replicar perfectamente los datos de origen o destino. Las declaraciones DML ejecutadas siguen las siguientes reglas:
- Si existe una clave primaria, las operaciones de inserción y actualización usan una sintaxis de upsert (es decir,
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Si las claves primarias existen, las eliminaciones se replican como un DML borrado.
- Si no existe una clave primaria, se insertan las operaciones de inserción y actualización en la tabla.
- Si no existen claves primarias, se ignoran las eliminaciones.
Si usas las utilidades de Oracle para Postgres, agrega ROWID
en SQL como la clave primaria cuando no exista ninguna.
Requisitos de la canalización
- Una transmisión de Datastream que está lista para replicar los datos o ya los está replicando.
- Las notificaciones de Pub/Sub de Cloud Storage están habilitadas para los datos de Datastream.
- Se propagó una base de datos de PostgreSQL con el esquema requerido.
- Se configura el acceso a la red entre los trabajadores de Dataflow y PostgreSQL.
Parámetros de la plantilla
Parámetros obligatorios
- inputFilePattern: La ubicación del archivo para que los archivos Datastream se repliquen en Cloud Storage. Esta ubicación suele ser la ruta de acceso raíz de la transmisión.
- databaseHost: El host de SQL para conectarte.
- databaseUser: El usuario de SQL con todos los permisos necesarios para escribir en todas las tablas en la replicación.
- databasePassword: La contraseña para el usuario de SQL.
Parámetros opcionales
- gcsPubSubSubscription: La suscripción de Pub/Sub con las notificaciones de archivos de Datastream. Por ejemplo,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
- inputFileFormat: El formato del archivo de salida que produce Datastream (opcional). Por ejemplo,
avro
ojson
. La configuración predeterminada esavro
. - streamName: El nombre o la plantilla del flujo que se consultará para obtener la información del esquema. El valor predeterminado es
{_metadata_stream}
. - rfcStartDateTime: La fecha y hora de inicio que se usa para recuperar desde Cloud Storage (https://tools.ietf.org/html/rfc3339). El valor predeterminado es: 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: URL raíz de la API de Datastream. La configuración predeterminada es https://datastream.googleapis.com/.
- databaseType: El tipo de base de datos en la que se escribirá (por ejemplo, Postgres). La configuración predeterminada es: postgres.
- databasePort: El puerto de la base de datos de SQL al que se realizará la conexión. El valor predeterminado es
5432
. - databaseName: El nombre de la base de datos de SQL a la que se realizará la conexión. El valor predeterminado es
postgres
. - schemaMap: Un mapa de claves o valores usados para dictar los cambios de nombre del esquema (es decir, old_name:new_name,CaseError:case_error). La configuración predeterminada es vacía.
- customConnectionString: La string de conexión opcional que se usará en lugar de la cadena de la base de datos predeterminada.
Ejecuta la plantilla
Console
- Ve a la página Crear un trabajo a partir de una plantilla de Dataflow. Ir a Crear un trabajo a partir de una plantilla
- En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
- Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es
us-central1
.Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.
- En el menú desplegable Plantilla de Dataflow, selecciona the Cloud Datastream to SQL template.
- En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
- Haga clic en Ejecutar trabajo.
gcloud
En tu shell o terminal, ejecuta la plantilla:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasREGION_NAME
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: Es la IP del host de SQL.DATABASE_USER
: Es tu usuario de SQL.DATABASE_PASSWORD
: Es tu contraseña de SQL.
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Reemplaza lo siguiente:
PROJECT_ID
: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.JOB_NAME
: Es el nombre del trabajo que elijasLOCATION
: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo,us-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: es la ruta de acceso de Cloud Storage a los datos de Datastream. Por ejemplo:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: es la suscripción de Pub/Sub desde la que se leen los archivos modificados. Por ejemplo:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: Es la IP del host de SQL.DATABASE_USER
: Es tu usuario de SQL.DATABASE_PASSWORD
: Es tu contraseña de SQL.
¿Qué sigue?
- Obtén información sobre las plantillas de Dataflow.
- Consulta la lista de plantillas que proporciona Google.