Modello Datastream per MySQL o PostgreSQL (stream)

Il modello Datastream to SQL è una pipeline di streaming che legge i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica nelle tabelle di replica SQL.

Il modello non supporta il linguaggio di definizione dei dati (DDL) e presuppone che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni con stato di Dataflow per filtrare i dati obsoleti e garantire la coerenza dei dati non in ordine. Ad esempio, se è già passata una versione più recente di una riga, una versione in ritardo della riga viene ignorata. Il data manipulation language (DML) che viene eseguito è il miglior tentativo di replicare perfettamente i dati di origine nei dati di destinazione. Le istruzioni DML eseguite seguono le seguenti regole:

  • Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert (ad es. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se esistono chiavi principali, le eliminazioni vengono replicate come DML di eliminazione.
  • Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento sia quelle di aggiornamento.
  • Se non esistono chiavi principali, le eliminazioni vengono ignorate.

Se utilizzi le utilità Oracle to Postgres, aggiungi ROWID in SQL come chiave primaria se non esiste.

Requisiti della pipeline

  • Uno stream DataStream pronto per la replica dei dati o che la sta già eseguendo.
  • Le notifiche Pub/Sub di Cloud Storage sono abilitate per i dati di Datastream.
  • Un database PostgreSQL è stato inizializzato con lo schema richiesto.
  • L'accesso alla rete tra i worker di Dataflow e PostgreSQL è configurato.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : la posizione del file per i file Datastream in Cloud Storage da replicare. Questa posizione del file è in genere il percorso principale dello stream.
  • databaseHost : l'host SQL a cui connettersi.
  • databaseUser : l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle in replica.
  • databasePassword : la password per l'utente SQL.

Parametri facoltativi

  • gcsPubSubSubscription : l'abbonamento Pub/Sub con notifiche dei file Datastream. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat : il formato del file di output prodotto da Datastream. Ad esempio, avro o json. Il valore predefinito è avro.
  • streamName : il nome o il modello dello stream da sottoporre a polling per le informazioni sullo schema. Il valore predefinito è {_metadata_stream}.
  • rfcStartDateTime : la data e l'ora di inizio utilizzate per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl : URL principale dell'API Datastream. Valore predefinito: https://datastream.googleapis.com/.
  • databaseType : il tipo di database in cui scrivere (ad esempio Postgres). Valore predefinito: postgres.
  • databasePort : la porta del database SQL a cui connettersi. Il valore predefinito è 5432.
  • databaseName : il nome del database SQL a cui connettersi. Il valore predefinito è postgres.
  • schemaMap : una mappa di chiavi/valori utilizzata per specificare le modifiche ai nomi degli schemi (ad es. old_name:new_name,CaseError:case_error). Il valore predefinito è vuoto.
  • customConnectionString : stringa di connessione facoltativa che verrà utilizzata al posto della stringa del database predefinita.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Datastream to SQL template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage per i dati di Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage per i dati di Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: l'abbonamento Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

Passaggi successivi