Datastream to MySQL or PostgreSQL (Stream) template

The Datastream to SQL template is a streaming pipeline that reads Datastream data and replicates it into any MySQL or PostgreSQL database. The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.

The template does not support data definition language (DDL) and expects all tables to already exist in the database. Replication uses Dataflow stateful transforms to filter stale data and keep out-of-order data consistent. For example, if a more recent version of a row has already passed through, a late-arriving version of that row is ignored. The data manipulation language (DML) that the template executes is a best-effort attempt to replicate the source data exactly in the target. The DML statements follow these rules:

  • If a primary key exists, insert and update operations use upsert syntax (for example, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • If a primary key exists, deletes are replicated as a delete DML statement.
  • If no primary key exists, both insert and update operations are inserted into the table.
  • If no primary key exists, deletes are ignored.
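
The four rules above can be sketched as a small decision function. This is a minimal illustration that uses only the JDK and PostgreSQL-style syntax; DmlRulesSketch, ChangeType, and toDml are hypothetical names, and the code is not the template's own CreateDml transform.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Illustrative only: mirrors the four DML rules, not the template's CreateDml transform. */
public class DmlRulesSketch {

  enum ChangeType { INSERT, UPDATE, DELETE }

  /** Returns a PostgreSQL-style statement, or null when the change is ignored. */
  static String toDml(
      ChangeType type, String table, List<String> columns, List<String> primaryKeys) {
    String columnList = String.join(", ", columns);
    String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
    String insert =
        "INSERT INTO " + table + " (" + columnList + ") VALUES (" + placeholders + ")";

    if (primaryKeys.isEmpty()) {
      // No primary key: inserts and updates are both plain inserts; deletes are ignored.
      return type == ChangeType.DELETE ? null : insert;
    }
    if (type == ChangeType.DELETE) {
      // Primary key present: a delete is replicated as a DELETE on the key columns.
      String where =
          primaryKeys.stream().map(k -> k + " = ?").collect(Collectors.joining(" AND "));
      return "DELETE FROM " + table + " WHERE " + where;
    }
    // Primary key present: inserts and updates both become an upsert.
    String updates =
        columns.stream()
            .filter(c -> !primaryKeys.contains(c))
            .map(c -> c + " = EXCLUDED." + c)
            .collect(Collectors.joining(", "));
    String conflict = " ON CONFLICT (" + String.join(", ", primaryKeys) + ")";
    // If every column belongs to the key there is nothing to update.
    return updates.isEmpty()
        ? insert + conflict + " DO NOTHING"
        : insert + conflict + " DO UPDATE SET " + updates;
  }

  public static void main(String[] args) {
    List<String> cols = List.of("id", "name");
    System.out.println(toDml(ChangeType.UPDATE, "users", cols, List.of("id")));
    System.out.println(toDml(ChangeType.DELETE, "users", cols, List.of("id")));
    System.out.println(toDml(ChangeType.UPDATE, "users", cols, List.of()));
    System.out.println(toDml(ChangeType.DELETE, "users", cols, List.of()));
  }
}
```

The sketch emits parameter placeholders rather than literal values, and it assumes PostgreSQL; a MySQL target would need a different upsert form such as ON DUPLICATE KEY UPDATE.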

If you are using the Oracle to Postgres utilities, add ROWID in SQL as the primary key when none exists.
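
The stale-data filtering described earlier (a late-arriving version of a row is ignored once a newer one has passed through) can be pictured with a per-key "latest version seen" map. The sketch below is an in-memory stand-in, not the Dataflow stateful transform itself; StaleRowFilterSketch, shouldApply, and the use of a numeric version are assumptions made for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative only: an in-memory stand-in for per-key state in a stateful transform. */
public class StaleRowFilterSketch {

  // Latest version seen so far for each primary-key value (hypothetical state layout).
  private final Map<String, Long> latestVersionByKey = new HashMap<>();

  /** Returns true when the change should be applied, false when it is stale. */
  public boolean shouldApply(String primaryKey, long version) {
    Long latest = latestVersionByKey.get(primaryKey);
    if (latest != null && version < latest) {
      return false; // a newer version of this row already passed through
    }
    latestVersionByKey.put(primaryKey, version);
    return true;
  }

  public static void main(String[] args) {
    StaleRowFilterSketch filter = new StaleRowFilterSketch();
    System.out.println(filter.shouldApply("users:1", 100L));
    System.out.println(filter.shouldApply("users:1", 200L));
    System.out.println(filter.shouldApply("users:1", 150L));
  }
}
```

In this sketch a change with the same version as the latest one is still applied; only strictly older versions are dropped.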

Pipeline requirements

  • A Datastream stream that is ready to replicate data or is already replicating it.
  • Cloud Storage Pub/Sub notifications are enabled for the Datastream data.
  • A PostgreSQL database was seeded with the required schema.
  • Network access between Dataflow workers and PostgreSQL is set up.

Template parameters

Required parameters

  • inputFilePattern: the file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.
  • databaseHost: the SQL host to connect to.
  • databaseUser: the SQL user with all required permissions to write to all tables in replication.
  • databasePassword: the password for the SQL user.
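
The connection parameters feed a JDBC URL. The sketch below mirrors the getDataSourceConfiguration method in the template source further down this page, using only the JDK. JdbcUrlSketch and jdbcUrl are hypothetical names, and the host and database values are placeholders; databaseType, databasePort, databaseName, and customConnectionString are the optional parameters described in the next list.

```java
/** Illustrative only: reproduces the URL-building logic of the template with plain strings. */
public class JdbcUrlSketch {

  /** Builds the JDBC URL from the template parameters. */
  static String jdbcUrl(
      String databaseType,
      String databaseHost,
      String databasePort,
      String databaseName,
      String customConnectionString) {
    String url;
    switch (databaseType) {
      case "postgres":
        url = String.format("jdbc:postgresql://%s:%s/%s", databaseHost, databasePort, databaseName);
        break;
      case "mysql":
        url = String.format("jdbc:mysql://%s:%s/%s", databaseHost, databasePort, databaseName);
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", databaseType));
    }
    // A non-empty customConnectionString replaces the default URL entirely.
    if (customConnectionString != null && !customConnectionString.isEmpty()) {
      url = customConnectionString;
    }
    return url;
  }

  public static void main(String[] args) {
    System.out.println(jdbcUrl("postgres", "10.0.0.5", "5432", "postgres", ""));
    System.out.println(jdbcUrl("mysql", "10.0.0.5", "3306", "inventory", ""));
  }
}
```

Because databasePort defaults to 5432, a MySQL target would typically need databasePort set explicitly (MySQL conventionally listens on 3306).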

Optional parameters

  • gcsPubSubSubscription: the Pub/Sub subscription with Datastream file notifications. For example, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: the format of the output file produced by Datastream. For example, avro or json. Defaults to avro.
  • streamName: the name or template for the stream to poll for schema information. The default value is {_metadata_stream}.
  • rfcStartDateTime: the starting DateTime used to fetch from Cloud Storage, in RFC 3339 format (https://tools.ietf.org/html/rfc3339). Defaults to 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: the Datastream API root URL (only required for testing). Defaults to https://datastream.googleapis.com/.
  • databaseType: the database type to write to, either postgres or mysql. Defaults to postgres.
  • databasePort: the SQL database port to connect to. The default value is 5432.
  • databaseName: the name of the SQL database to connect to. The default value is postgres.
  • schemaMap: a map of key/value pairs used to dictate schema name changes (for example, old_name:new_name,CaseError:case_error). Defaults to empty.
  • customConnectionString: an optional connection string that is used instead of the default database string.
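
To make the schemaMap format concrete, here is a minimal JDK-only sketch that parses a value such as old_name:new_name,CaseError:case_error into a map. The template source on this page uses Guava's Splitter for this step; SchemaMapSketch below is a hypothetical stand-in and may differ from it in edge cases.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Illustrative only: a JDK-only parser for comma-separated old:new pairs. */
public class SchemaMapSketch {

  /** Parses "old:new,old2:new2" into a map; null or empty input gives an empty map. */
  static Map<String, String> parseSchemaMap(String schemaMap) {
    Map<String, String> result = new LinkedHashMap<>();
    if (schemaMap == null || schemaMap.isEmpty()) {
      return result;
    }
    for (String entry : schemaMap.split(",")) {
      // Each entry must contain exactly one ':' separating the old and new names.
      String[] parts = entry.split(":", -1);
      if (parts.length != 2) {
        throw new IllegalArgumentException("Expected old:new but got: " + entry);
      }
      result.put(parts[0], parts[1]);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(parseSchemaMap("old_name:new_name,CaseError:case_error"));
  }
}
```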

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to SQL template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL"
   }
}
  

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.
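
As one way to send the POST request above from Java, the following sketch builds it with the JDK's java.net.http client (Java 11 or later). LaunchRequestSketch and buildRequest are hypothetical names, and the sketch assumes you already hold an OAuth 2.0 access token (for example, one printed by gcloud auth print-access-token) and a JSON body shaped like the example above.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Illustrative only: builds and sends the flexTemplates:launch request shown above. */
public class LaunchRequestSketch {

  /** Builds the POST request; the caller supplies the token and the JSON body. */
  static HttpRequest buildRequest(
      String projectId, String location, String accessToken, String jsonBody) {
    URI uri =
        URI.create(
            String.format(
                "https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/flexTemplates:launch",
                projectId, location));
    return HttpRequest.newBuilder(uri)
        .header("Authorization", "Bearer " + accessToken)
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(jsonBody))
        .build();
  }

  public static void main(String[] args) throws Exception {
    if (args.length != 4) {
      System.err.println("Usage: LaunchRequestSketch PROJECT_ID LOCATION ACCESS_TOKEN JSON_BODY");
      return;
    }
    HttpRequest request = buildRequest(args[0], args[1], args[2], args[3]);
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode());
    System.out.println(response.body());
  }
}
```

Keeping request construction separate from sending makes it possible to inspect the URL and headers without making a network call.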

Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.values.DmlInfo;
import com.google.cloud.teleport.v2.templates.DataStreamToSQL.Options;
import com.google.cloud.teleport.v2.transforms.CreateDml;
import com.google.cloud.teleport.v2.transforms.ProcessDml;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests DataStream data from GCS. The data is then cleaned and converted from JSON
 * objects into DML statements. The DML is applied to the desired target database, which can be one
 * of MySQL or PostgreSQL. Replication maintains a 1:1 match between source and target by default.
 * No DDL is supported in the current version of this pipeline.
 *
 * <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-sql/README_Cloud_Datastream_to_SQL.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Datastream_to_SQL",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to SQL",
    description = {
      "The Datastream to SQL template is a streaming pipeline that reads <a href=\"https://cloud.google.com/datastream/docs\">Datastream</a> data and replicates it into any MySQL or PostgreSQL database. "
          + "The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.\n",
      "The template does not support data definition language (DDL) and expects that all tables already exist in the database. "
          + "Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out of order data. "
          + "For example, if a more recent version of a row has already passed through, a late arriving version of that row is ignored. "
          + "The data manipulation language (DML) that executes is a best attempt to perfectly replicate source to target data. The DML statements executed follow the following rules:\n",
      "If a primary key exists, insert and update operations use upsert syntax (ie. <code>INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE</code>).\n"
          + "If primary keys exist, deletes are replicated as a delete DML.\n"
          + "If no primary key exists, both insert and update operations are inserted into the table.\n"
          + "If no primary keys exist, deletes are ignored.\n"
          + "If you are using the Oracle to Postgres utilities, add <code>ROWID</code> in SQL as the primary key when none exists."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-sql",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-sql",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "A Datastream stream that is ready to or already replicating data.",
      "<a href=\"https://cloud.google.com/storage/docs/reporting-changes\">Cloud Storage Pub/Sub notifications</a> are enabled for the Datastream data.",
      "A PostgreSQL database was seeded with the required schema.",
      "Network access between Dataflow workers and PostgreSQL is set up."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSQL {

  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSQL.class);
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "File location for Datastream file input in Cloud Storage.",
        helpText =
            "The file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription with Datastream file notifications."
                + " For example, `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>`.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Enum(
        order = 3,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example, `avro` or `json`. Defaults to `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Source",
        optional = true,
        description = "Name or template for the stream to poll for schema information.",
        helpText =
            "The name or template for the stream to poll for schema information. The default value is `{_metadata_stream}`.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.DateTime(
        order = 5,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 6,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    // SQL Connection Parameters
    @TemplateParameter.Enum(
        order = 7,
        optional = true,
        enumOptions = {@TemplateEnumOption("postgres"), @TemplateEnumOption("mysql")},
        description = "SQL Database Type (postgres or mysql).",
        helpText = "The database type to write to (for example, Postgres).")
    @Default.String("postgres")
    String getDatabaseType();

    void setDatabaseType(String value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Target",
        description = "Database Host to connect on.",
        helpText = "The SQL host to connect on.")
    String getDatabaseHost();

    void setDatabaseHost(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Database Port to connect on.",
        helpText = "The SQL database port to connect to. The default value is `5432`.")
    @Default.String("5432")
    String getDatabasePort();

    void setDatabasePort(String value);

    @TemplateParameter.Text(
        order = 10,
        description = "Database User to connect with.",
        helpText =
            "The SQL user with all required permissions to write to all tables in replication.")
    String getDatabaseUser();

    void setDatabaseUser(String value);

    @TemplateParameter.Password(
        order = 11,
        description = "Database Password for given user.",
        helpText = "The password for the SQL user.")
    String getDatabasePassword();

    void setDatabasePassword(String value);

    @TemplateParameter.Text(
        order = 12,
        groupName = "Target",
        optional = true,
        description = "SQL Database Name.",
        helpText = "The name of the SQL database to connect to. The default value is `postgres`.")
    @Default.String("postgres")
    String getDatabaseName();

    void setDatabaseName(String value);

    @TemplateParameter.Text(
        order = 13,
        optional = true,
        description = "A map of key/values used to dictate schema name changes",
        helpText =
            "A map of key/values used to dictate schema name changes (ie."
                + " old_name:new_name,CaseError:case_error)")
    @Default.String("")
    String getSchemaMap();

    void setSchemaMap(String value);

    @TemplateParameter.Text(
        order = 14,
        groupName = "Target",
        optional = true,
        description = "Custom connection string.",
        helpText =
            "Optional connection string which will be used instead of the default database string.")
    @Default.String("")
    String getCustomConnectionString();

    void setCustomConnectionString(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Datastream to SQL");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Build the DataSourceConfiguration for the target SQL database. Using the pipeline options,
   * determine the database type and create the correct jdbc connection for the requested DB.
   *
   * @param options The execution parameters to the pipeline.
   */
  public static CdcJdbcIO.DataSourceConfiguration getDataSourceConfiguration(Options options) {
    String jdbcDriverName;
    String jdbcDriverConnectionString;

    switch (options.getDatabaseType()) {
      case "postgres":
        jdbcDriverName = "org.postgresql.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:postgresql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      case "mysql":
        jdbcDriverName = "com.mysql.cj.jdbc.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:mysql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", options.getDatabaseType()));
    }
    if (!options.getCustomConnectionString().isEmpty()) {
      jdbcDriverConnectionString = options.getCustomConnectionString();
    }

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration =
        CdcJdbcIO.DataSourceConfiguration.create(jdbcDriverName, jdbcDriverConnectionString)
            .withUsername(options.getDatabaseUser())
            .withPassword(options.getDatabasePassword())
            .withMaxIdleConnections(Integer.valueOf(0));

    return dataSourceConfiguration;
  }

  /**
   * Validate the options supplied match expected values. We will also validate that connectivity is
   * working correctly for the target SQL database.
   *
   * @param options The execution parameters to the pipeline.
   * @param dataSourceConfiguration The JDBC datasource configuration.
   */
  public static void validateOptions(
      Options options, CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration) {
    try {
      if (options.getDatabaseHost() != null) {
        dataSourceConfiguration.buildDatasource().getConnection().close();
      }
    } catch (SQLException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /** Parse the SchemaMap config, which holds comma-separated old:new schema name pairs. */
  public static Map<String, String> parseSchemaMap(String schemaMapString) {
    if (schemaMapString == null || schemaMapString.isEmpty()) {
      return new HashMap<>();
    }

    return Splitter.on(",").withKeyValueSeparator(":").split(schemaMapString);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to SQL DML Objects
     *   3) Filter stale rows using stateful PK transform
     *   4) Write DML statements to SQL Database via jdbc
     */

    Pipeline pipeline = Pipeline.create(options);

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(options);
    validateOptions(options, dataSourceConfiguration);
    Map<String, String> schemaMap = parseSchemaMap(options.getSchemaMap());
    LOG.info("Parsed schema map: {}", schemaMap);

    /*
     * Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
     */
    PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
        pipeline.apply(
            new DataStreamIO(
                    options.getStreamName(),
                    options.getInputFilePattern(),
                    options.getInputFileFormat(),
                    options.getGcsPubSubSubscription(),
                    options.getRfcStartDateTime())
                .withLowercaseSourceColumns()
                .withRenameColumnValue("_metadata_row_id", "rowid")
                .withHashRowId());

    /*
     * Stage 2: Write JSON Strings to SQL DML Objects
     *   a) Convert JSON String FailsafeElements to DmlInfo objects (dmlStatements)
     * Stage 3: Filter stale rows using stateful PK transform
     */
    PCollection<KV<String, DmlInfo>> dmlStatements =
        datastreamJsonRecords
            .apply("Format to DML", CreateDml.of(dataSourceConfiguration).withSchemaMap(schemaMap))
            .apply("DML Stateful Processing", ProcessDml.statefulOrderByPK());

    /*
     * Stage 4: Write DML statements to SQL Database via jdbc
     */
    dmlStatements.apply(
        "Write to SQL",
        CdcJdbcIO.<KV<String, DmlInfo>>write()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withStatementFormatter(
                new CdcJdbcIO.StatementFormatter<KV<String, DmlInfo>>() {
                  @Override
                  public String formatStatement(KV<String, DmlInfo> element) {
                    LOG.debug("Executing SQL: {}", element.getValue().getDmlSql());
                    return element.getValue().getDmlSql();
                  }
                }));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

What's next