Il modello Datastream to SQL è una pipeline di streaming che legge i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica nelle tabelle di replica SQL.
Il modello non supporta il linguaggio di definizione dei dati (DDL) e presuppone che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni con stato di Dataflow per filtrare i dati obsoleti e garantire la coerenza dei dati non in ordine. Ad esempio, se è già passata una versione più recente di una riga, una versione in ritardo della riga viene ignorata. Il data manipulation language (DML) che viene eseguito è il miglior tentativo di replicare perfettamente i dati di origine nei dati di destinazione. Le istruzioni DML eseguite seguono le seguenti regole:
- Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert
(ad es.
INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE
). - Se esistono chiavi principali, le eliminazioni vengono replicate come DML di eliminazione.
- Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento sia quelle di aggiornamento.
- Se non esistono chiavi principali, le eliminazioni vengono ignorate.
Se utilizzi le utilità Oracle to Postgres, aggiungi ROWID
in SQL come chiave primaria se non esiste.
Requisiti della pipeline
- Uno stream DataStream pronto per la replica dei dati o che la sta già eseguendo.
- Le notifiche Pub/Sub di Cloud Storage sono attivate per i dati di Datastream.
- Un database PostgreSQL è stato inizializzato con lo schema richiesto.
- L'accesso alla rete tra i worker di Dataflow e PostgreSQL è configurato.
Parametri del modello
Parametri obbligatori
- inputFilePattern: la posizione del file per i file Datastream in Cloud Storage da replicare. Questa posizione del file è in genere il percorso principale dello stream.
- databaseHost: l'host SQL a cui connettersi.
- databaseUser: l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle in replica.
- databasePassword: la password per l'utente SQL.
Parametri facoltativi
- gcsPubSubSubscription: l'abbonamento Pub/Sub con notifiche dei file Datastream. Ad esempio,
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>
. - inputFileFormat: il formato del file di output prodotto da Datastream. Ad esempio,
avro
ojson
. Il valore predefinito èavro
. - streamName: il nome o il modello dello stream da sottoporre a polling per le informazioni sullo schema. Il valore predefinito è
{_metadata_stream}
. - rfcStartDateTime: la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è 1970-01-01T00:00:00.00Z.
- dataStreamRootUrl: URL principale dell'API Datastream. Valore predefinito: https://datastream.googleapis.com/.
- databaseType: il tipo di database in cui scrivere (ad esempio Postgres). Valore predefinito: postgres.
- databasePort: la porta del database SQL a cui connettersi. Il valore predefinito è
5432
. - databaseName: il nome del database SQL a cui connettersi. Il valore predefinito è
postgres
. - schemaMap: una mappa di chiavi/valori utilizzata per specificare le modifiche ai nomi degli schemi (ad es. old_name:new_name,CaseError:case_error). Il valore predefinito è vuoto.
- customConnectionString: stringa di connessione facoltativa che verrà utilizzata al posto della stringa del database predefinita.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Datastream to SQL template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --enable-streaming-engine \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \ --parameters \ inputFilePattern=GCS_FILE_PATH,\ gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\ databaseHost=DATABASE_HOST,\ databaseUser=DATABASE_USER,\ databasePassword=DATABASE_PASSWORD
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaREGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage per i dati di Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: l'IP host SQL.DATABASE_USER
: il tuo utente SQL.DATABASE_PASSWORD
: la tua password SQL.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFilePattern": "GCS_FILE_PATH", "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME", "databaseHost": "DATABASE_HOST", "databaseUser": "DATABASE_USER", "databasePassword": "DATABASE_PASSWORD" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaLOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
VERSION: the version of the template that you want to use
You can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
GCS_FILE_PATH
: il percorso Cloud Storage per i dati di Datastream. Ad esempio:gs://bucket/path/to/data/
GCS_SUBSCRIPTION_NAME
: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio:projects/my-project-id/subscriptions/my-subscription-id
.DATABASE_HOST
: l'IP host SQL.DATABASE_USER
: il tuo utente SQL.DATABASE_PASSWORD
: la tua password SQL.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.