Modello da Datastream a MySQL o PostgreSQL (stream)

Il modello Datastream to SQL è una pipeline di inserimento flussi che legge i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica nelle tabelle di replica SQL.

Il modello non supporta il Data Definition Language (DDL) e prevede che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni stateful di Dataflow per filtrare i dati inattivi e garantire la coerenza nei dati non in ordine. Ad esempio, se è già passata una versione più recente di una riga, viene ignorata una versione arrivata in ritardo di quella riga. Il Data Manipulation Language (DML) che viene eseguito è il tentativo migliore di replicare perfettamente l'origine in base ai dati di destinazione. Le istruzioni DML eseguite seguono le seguenti regole:

  • Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert (ad es. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se esistono chiavi primarie, le eliminazioni vengono replicate come DML di eliminazione.
  • Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento che quelle di aggiornamento.
  • Se non esistono chiavi primarie, le eliminazioni vengono ignorate.

Se utilizzi le utilità Oracle to Postgres, aggiungi ROWID in SQL come chiave primaria quando non ne esiste nessuna.

Requisiti della pipeline

  • Un flusso Datastream pronto per la replica dei dati o lo sta già.
  • Le notifiche Pub/Sub di Cloud Storage sono abilitate per i dati Datastream.
  • È stato eseguito il seeding di un database PostgreSQL con lo schema richiesto.
  • L'accesso alla rete tra i worker di Dataflow e PostgreSQL è configurato.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : la posizione dei file Datastream in Cloud Storage da replicare. Questa posizione del file è in genere il percorso principale dello stream.
  • databaseHost : l'host SQL su cui connettersi.
  • databaseUser : l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle nella replica.
  • databasePassword : la password dell'utente SQL.

Parametri facoltativi

  • gcsPubSubSubscription : l'abbonamento Pub/Sub con notifiche relative ai file Datastream. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat : il formato del file di output generato da Datastream. Ad esempio, avro o json. Il valore predefinito è avro.
  • streamName : il nome o il modello dello stream da sottoporre a polling per ottenere le informazioni sullo schema. Il valore predefinito è {_metadata_stream}.
  • rfcStartDateTime : il valore DateTime di inizio utilizzato per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl : URL principale dell'API Datastream. Il valore predefinito è: https://datastream.googleapis.com/.
  • databaseType : il tipo di database in cui scrivere, ad esempio Postgres. Il valore predefinito è: postgres.
  • databasePort : la porta del database SQL a cui connetterti. Il valore predefinito è 5432.
  • databaseName : il nome del database SQL a cui connetterti. Il valore predefinito è postgres.
  • schemaMap : una mappa di valori-chiave utilizzati per dettare le modifiche ai nomi dello schema (ad es. old_name:new_name,CaseError:case_error). Il campo predefinito è vuoto.
  • customConnectionString : stringa di connessione facoltativa che verrà utilizzata al posto della stringa del database predefinita.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco di regioni in cui è possibile eseguire un job Dataflow, consulta Località di Dataflow.

  5. Dal menu a discesa Modello Dataflow, seleziona the Cloud Datastream to SQL template.
  6. Inserisci i valori parametro negli appositi campi.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage dei dati Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP dell'host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: il percorso Cloud Storage dei dati Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP dell'host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

Passaggi successivi