Modello da Datastream a MySQL o PostgreSQL (stream)

Il modello da Datastream a SQL è una pipeline di inserimento flussi che legge i dati di Datastream e li replica in qualsiasi database MySQL o PostgreSQL. Il modello legge i dati da Cloud Storage utilizzando le notifiche Pub/Sub e li replica in tabelle di replica SQL.

Il modello non supporta il linguaggio di definizione dei dati (DDL) e prevede che tutte le tabelle esistano già nel database. La replica utilizza le trasformazioni stateful di Dataflow per filtrare i dati obsoleti e garantire la coerenza nei dati non ordinati. Ad esempio, se è già stata eseguita una versione più recente di una riga, la versione in arrivo in ritardo della riga viene ignorata. Il Data Manipulation Language (DML) che esegue è il miglior tentativo di replicare perfettamente l'origine nei dati di destinazione. Le istruzioni DML eseguite seguono le regole seguenti:

  • Se esiste una chiave primaria, le operazioni di inserimento e aggiornamento utilizzano la sintassi upsert, ad esempio INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Se esistono chiavi primarie, le eliminazioni vengono replicate come un DML di eliminazione.
  • Se non esiste una chiave primaria, nella tabella vengono inserite sia le operazioni di inserimento che quelle di aggiornamento.
  • Se non esistono chiavi primarie, le eliminazioni vengono ignorate.

Se utilizzi le utilità da Oracle a Postgres, aggiungi ROWID in SQL come chiave primaria se non ne esiste nessuna.

Requisiti della pipeline

  • Un flusso Datastream pronto o già in fase di replica dei dati.
  • Le notifiche Pub/Sub di Cloud Storage sono abilitate per i dati Datastream.
  • È stato eseguito il seeding di un database PostgreSQL con lo schema richiesto.
  • L'accesso alla rete tra i worker Dataflow e PostgreSQL è configurato.

Parametri del modello

Parametri obbligatori

  • inputFilePattern : il percorso dei file Datastream in Cloud Storage da replicare. Questa posizione del file è in genere il percorso principale del flusso.
  • databaseHost : l'host SQL a cui connettersi.
  • databaseUser : l'utente SQL con tutte le autorizzazioni necessarie per scrivere in tutte le tabelle della replica.
  • databasePassword : la password dell'utente SQL.

Parametri facoltativi

  • gcsPubSubSubscription : la sottoscrizione Pub/Sub con notifiche sui file Datastream. Ad esempio, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat : il formato del file di output prodotto da Datastream. Ad esempio, avro o json. Il valore predefinito è avro.
  • streamName : il nome o il modello del flusso per il polling delle informazioni dello schema. Il valore predefinito è {_metadata_stream}.
  • rfcStartDateTime : la data/ora iniziale utilizzata per il recupero da Cloud Storage (https://tools.ietf.org/html/rfc3339). Il valore predefinito è 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl : URL principale dell'API Datastream. Il valore predefinito è https://datastream.googleapis.com/.
  • databaseType : il tipo di database in cui scrivere (ad esempio Postgres). Il valore predefinito è postgres.
  • databasePort : la porta del database SQL a cui connetterti. Il valore predefinito è 5432.
  • databaseName : il nome del database SQL a cui connettersi. Il valore predefinito è postgres.
  • schemaMap : una mappa di chiavi/valori utilizzati per dettare le modifiche dei nomi dello schema (ad es. vecchio_name:nuovo_nome,CaseError:case_error). Il valore predefinito è vuoto.
  • customConnectionString : stringa di connessione facoltativa che verrà utilizzata al posto della stringa di database predefinita.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Cloud Datastream to SQL template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: percorso Cloud Storage ai dati di Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP del tuo host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: percorso Cloud Storage ai dati di Datastream. Ad esempio: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: la sottoscrizione Pub/Sub da cui leggere i file modificati. Ad esempio: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: l'IP del tuo host SQL.
  • DATABASE_USER: il tuo utente SQL.
  • DATABASE_PASSWORD: la tua password SQL.

Passaggi successivi