Datastream to MySQL or PostgreSQL (Stream) template

The Datastream to SQL template is a streaming pipeline that reads Datastream data and replicates it into any MySQL or PostgreSQL database. The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.

The template does not support data definition language (DDL) and expects that all tables already exist in the database. Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out-of-order data. For example, if a more recent version of a row has already passed through, a late-arriving version of that row is ignored. The data manipulation language (DML) that executes is a best-effort attempt to replicate source data to the target exactly. The DML statements follow these rules:

  • If a primary key exists, insert and update operations use upsert syntax (that is, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • If primary keys exist, deletes are replicated as a delete DML statement.
  • If no primary key exists, both insert and update operations are inserted into the table.
  • If no primary keys exist, deletes are ignored.
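
The four rules above can be read as a small decision function. The following is a minimal sketch, not the template's own code: the class DmlRuleSketch, the ChangeType enum, and the chooseDml method are hypothetical names introduced for illustration, and the generated SQL uses PostgreSQL-style ON CONFLICT syntax with ? placeholders.

```java
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical helper that mirrors the DML rules; not the template's own code. */
final class DmlRuleSketch {

  enum ChangeType { INSERT, UPDATE, DELETE }

  /** Returns the statement for one change event, or null when the event is ignored. */
  static String chooseDml(
      ChangeType type, String table, List<String> columns, List<String> primaryKeys) {
    boolean hasPrimaryKey = !primaryKeys.isEmpty();

    if (type == ChangeType.DELETE) {
      if (!hasPrimaryKey) {
        return null; // No key identifies the row, so the delete is ignored.
      }
      // With a primary key, the delete is replicated as a DELETE on that key.
      String where =
          primaryKeys.stream().map(k -> k + " = ?").collect(Collectors.joining(" AND "));
      return "DELETE FROM " + table + " WHERE " + where;
    }

    String placeholders = columns.stream().map(c -> "?").collect(Collectors.joining(", "));
    String insert =
        "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES ("
            + placeholders + ")";
    if (!hasPrimaryKey) {
      return insert; // Without a key, inserts and updates are both plain inserts.
    }

    // With a key, inserts and updates become an upsert.
    // Assumes at least one non-key column exists to update.
    String updates =
        columns.stream()
            .filter(c -> !primaryKeys.contains(c))
            .map(c -> c + " = EXCLUDED." + c)
            .collect(Collectors.joining(", "));
    return insert + " ON CONFLICT (" + String.join(", ", primaryKeys) + ") DO UPDATE SET "
        + updates;
  }

  public static void main(String[] args) {
    System.out.println(
        chooseDml(ChangeType.UPDATE, "users", List.of("id", "name"), List.of("id")));
  }
}
```

Returning null for an ignored delete keeps the sketch short; a caller would need to skip such events before executing anything.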

If you are using the Oracle to Postgres utilities, add ROWID in PostgreSQL as the primary key when none exists.
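
The stale-data filtering described earlier (a late-arriving version of a row is dropped once a newer version has passed through) can be illustrated with a per-key "latest seen" check. This is a sketch under assumptions: an in-memory map stands in for Dataflow per-key state, the numeric sourceTimestamp ordering field is hypothetical, and the class StaleFilterSketch is not part of the template.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical illustration of filtering stale row versions per primary key. */
final class StaleFilterSketch {

  // Stand-in for per-key pipeline state: newest version seen for each row key.
  private final Map<String, Long> latestSeen = new HashMap<>();

  /**
   * Returns true when the change should be applied, and false when a newer version of the same
   * row has already passed through. Equal timestamps are treated as not stale.
   */
  boolean shouldApply(String primaryKey, long sourceTimestamp) {
    Long previous = latestSeen.get(primaryKey);
    if (previous != null && sourceTimestamp < previous) {
      return false; // Late-arriving, older version: ignore it.
    }
    latestSeen.put(primaryKey, sourceTimestamp);
    return true;
  }

  public static void main(String[] args) {
    StaleFilterSketch filter = new StaleFilterSketch();
    System.out.println(filter.shouldApply("row-1", 200L)); // first version seen
    System.out.println(filter.shouldApply("row-1", 100L)); // older version arriving late
  }
}
```

Because state is tracked per key, a late event for one row never affects the decision for another row.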

Pipeline requirements

  • A Datastream stream that is ready to replicate data or is already replicating data.
  • Cloud Storage Pub/Sub notifications are enabled for the Datastream data.
  • A PostgreSQL database was seeded with the required schema.
  • Network access between Dataflow workers and PostgreSQL is set up.

Template parameters

Required parameters

  • inputFilePattern: the file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.
  • databaseHost: the SQL host to connect to.
  • databaseUser: the SQL user with all required permissions to write to all tables in replication.
  • databasePassword: the password for the SQL user.

Optional parameters

  • gcsPubSubSubscription: the Pub/Sub subscription with Datastream file notifications. For example, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: the format of the output file produced by Datastream. For example, avro or json. Defaults to avro.
  • streamName: the name or template for the stream to poll for schema information. The default value is {_metadata_stream}.
  • rfcStartDateTime: the starting DateTime used to fetch from Cloud Storage, in RFC 3339 format (https://tools.ietf.org/html/rfc3339). Defaults to 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: the Datastream API root URL (only required for testing). Defaults to https://datastream.googleapis.com/.
  • databaseType: the database type to write to, either postgres or mysql. Defaults to postgres.
  • databasePort: the SQL database port to connect to. The default value is 5432.
  • databaseName: the name of the SQL database to connect to. The default value is postgres.
  • schemaMap: a map of key/value pairs used to dictate schema name changes (for example, old_name:new_name,CaseError:case_error). Defaults to empty.
  • customConnectionString: optional connection string that is used instead of the default database string.
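
The connection parameters above combine into a JDBC URL. The sketch below mirrors the logic visible in the template source on this page (the database type selects the URL scheme, and a non-empty customConnectionString replaces the default); the class JdbcUrlSketch and the method jdbcUrl are hypothetical names used only for illustration.

```java
/** Hypothetical illustration of how the connection parameters form a JDBC URL. */
final class JdbcUrlSketch {

  static String jdbcUrl(
      String databaseType,
      String databaseHost,
      String databasePort,
      String databaseName,
      String customConnectionString) {
    String url;
    switch (databaseType) {
      case "postgres":
        url = String.format("jdbc:postgresql://%s:%s/%s", databaseHost, databasePort, databaseName);
        break;
      case "mysql":
        url = String.format("jdbc:mysql://%s:%s/%s", databaseHost, databasePort, databaseName);
        break;
      default:
        // The type is checked first, so an unsupported type fails even with a custom string.
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", databaseType));
    }
    // A non-empty custom connection string replaces the default URL.
    if (customConnectionString != null && !customConnectionString.isEmpty()) {
      url = customConnectionString;
    }
    return url;
  }

  public static void main(String[] args) {
    System.out.println(jdbcUrl("postgres", "10.0.0.5", "5432", "postgres", ""));
    System.out.println(jdbcUrl("mysql", "10.0.0.5", "3306", "inventory", ""));
  }
}
```

Note that the databasePort default is 5432 regardless of databaseType, so a MySQL target listening on a different port needs databasePort set explicitly.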

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to SQL template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example, gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL"
   }
}
  

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example, gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example, projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.
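
For callers who prefer to issue the REST request from Java, the following sketch assembles the launch URL and JSON body shown above and wraps them in a java.net.http.HttpRequest. It is an illustration under assumptions: the class LaunchRequestSketch and its methods are hypothetical, the access token is a placeholder that you must obtain separately, values are inserted without JSON escaping, and the request is built but not sent.

```java
import java.net.URI;
import java.net.http.HttpRequest;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper that builds (but does not send) the flexTemplates:launch request. */
final class LaunchRequestSketch {

  static String launchUrl(String projectId, String location) {
    return String.format(
        "https://dataflow.googleapis.com/v1b3/projects/%s/locations/%s/flexTemplates:launch",
        projectId, location);
  }

  /** Builds the JSON body. Assumes no value contains characters that need JSON escaping. */
  static String launchBody(
      String jobName, String containerSpecGcsPath, Map<String, String> parameters) {
    String params =
        parameters.entrySet().stream()
            .map(e -> "\"" + e.getKey() + "\": \"" + e.getValue() + "\"")
            .collect(Collectors.joining(", "));
    return "{\"launch_parameter\": {\"jobName\": \"" + jobName + "\", \"parameters\": {"
        + params + "}, \"containerSpecGcsPath\": \"" + containerSpecGcsPath + "\"}}";
  }

  static HttpRequest buildRequest(String url, String body, String accessToken) {
    return HttpRequest.newBuilder(URI.create(url))
        .header("Authorization", "Bearer " + accessToken) // placeholder token, obtained elsewhere
        .header("Content-Type", "application/json")
        .POST(HttpRequest.BodyPublishers.ofString(body))
        .build();
  }

  public static void main(String[] args) {
    Map<String, String> parameters = new LinkedHashMap<>();
    parameters.put("inputFilePattern", "GCS_FILE_PATH");
    parameters.put("gcsPubSubSubscription", "GCS_SUBSCRIPTION_NAME");
    parameters.put("databaseHost", "DATABASE_HOST");
    parameters.put("databaseUser", "DATABASE_USER");
    parameters.put("databasePassword", "DATABASE_PASSWORD");

    String body =
        launchBody(
            "JOB_NAME",
            "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
            parameters);
    HttpRequest request = buildRequest(launchUrl("PROJECT_ID", "LOCATION"), body, "ACCESS_TOKEN");
    System.out.println(request.method() + " " + request.uri());
    System.out.println(body);
  }
}
```

In real use, a JSON library would be a safer way to build the body, since passwords and paths may contain characters that need escaping.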

Java
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.values.DmlInfo;
import com.google.cloud.teleport.v2.templates.DataStreamToSQL.Options;
import com.google.cloud.teleport.v2.transforms.CreateDml;
import com.google.cloud.teleport.v2.transforms.ProcessDml;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests DataStream data from GCS. The data is then cleaned and converted from JSON
 * objects into DML statements. The DML is applied to the desired target database, which can be one
 * of MySQL or PostgreSQL. Replication maintains a 1:1 match between source and target by default.
 * No DDL is supported in the current version of this pipeline.
 *
 * <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-sql/README_Cloud_Datastream_to_SQL.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Datastream_to_SQL",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to SQL",
    description = {
      "The Datastream to SQL template is a streaming pipeline that reads <a href=\"https://cloud.google.com/datastream/docs\">Datastream</a> data and replicates it into any MySQL or PostgreSQL database. "
          + "The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.\n",
      "The template does not support data definition language (DDL) and expects that all tables already exist in the database. "
          + "Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out of order data. "
          + "For example, if a more recent version of a row has already passed through, a late arriving version of that row is ignored. "
          + "The data manipulation language (DML) that executes is a best attempt to perfectly replicate source to target data. The DML statements executed follow the following rules:\n",
      "If a primary key exists, insert and update operations use upsert syntax (ie. <code>INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE</code>).\n"
          + "If primary keys exist, deletes are replicated as a delete DML.\n"
          + "If no primary key exists, both insert and update operations are inserted into the table.\n"
          + "If no primary keys exist, deletes are ignored.\n"
          + "If you are using the Oracle to Postgres utilities, add <code>ROWID</code> in SQL as the primary key when none exists."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-sql",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/datastream-to-sql",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "A Datastream stream that is ready to or already replicating data.",
      "<a href=\"https://cloud.google.com/storage/docs/reporting-changes\">Cloud Storage Pub/Sub notifications</a> are enabled for the Datastream data.",
      "A PostgreSQL database was seeded with the required schema.",
      "Network access between Dataflow workers and PostgreSQL is set up."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSQL {

  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSQL.class);
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "File location for Datastream file input in Cloud Storage.",
        helpText =
            "The file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription with Datastream file notifications."
                + " For example, `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>`.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Enum(
        order = 3,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example, `avro` or `json`. Defaults to `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Source",
        optional = true,
        description = "Name or template for the stream to poll for schema information.",
        helpText =
            "The name or template for the stream to poll for schema information. The default value is `{_metadata_stream}`.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.DateTime(
        order = 5,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 6,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    // SQL Connection Parameters
    @TemplateParameter.Enum(
        order = 7,
        optional = true,
        enumOptions = {@TemplateEnumOption("postgres"), @TemplateEnumOption("mysql")},
        description = "SQL Database Type (postgres or mysql).",
        helpText = "The database type to write to (for example, Postgres).")
    @Default.String("postgres")
    String getDatabaseType();

    void setDatabaseType(String value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Target",
        description = "Database Host to connect on.",
        helpText = "The SQL host to connect on.")
    String getDatabaseHost();

    void setDatabaseHost(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Database Port to connect on.",
        helpText = "The SQL database port to connect to. The default value is `5432`.")
    @Default.String("5432")
    String getDatabasePort();

    void setDatabasePort(String value);

    @TemplateParameter.Text(
        order = 10,
        description = "Database User to connect with.",
        helpText =
            "The SQL user with all required permissions to write to all tables in replication.")
    String getDatabaseUser();

    void setDatabaseUser(String value);

    @TemplateParameter.Password(
        order = 11,
        description = "Database Password for given user.",
        helpText = "The password for the SQL user.")
    String getDatabasePassword();

    void setDatabasePassword(String value);

    @TemplateParameter.Text(
        order = 12,
        groupName = "Target",
        optional = true,
        description = "SQL Database Name.",
        helpText = "The name of the SQL database to connect to. The default value is `postgres`.")
    @Default.String("postgres")
    String getDatabaseName();

    void setDatabaseName(String value);

    @TemplateParameter.Text(
        order = 13,
        optional = true,
        description = "A map of key/values used to dictate schema name changes",
        helpText =
            "A map of key/values used to dictate schema name changes (ie."
                + " old_name:new_name,CaseError:case_error)")
    @Default.String("")
    String getSchemaMap();

    void setSchemaMap(String value);

    @TemplateParameter.Text(
        order = 14,
        groupName = "Target",
        optional = true,
        description = "Custom connection string.",
        helpText =
            "Optional connection string which will be used instead of the default database string.")
    @Default.String("")
    String getCustomConnectionString();

    void setCustomConnectionString(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Datastream to SQL");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Build the DataSourceConfiguration for the target SQL database. Using the pipeline options,
   * determine the database type and create the correct jdbc connection for the requested DB.
   *
   * @param options The execution parameters to the pipeline.
   */
  public static CdcJdbcIO.DataSourceConfiguration getDataSourceConfiguration(Options options) {
    String jdbcDriverName;
    String jdbcDriverConnectionString;

    switch (options.getDatabaseType()) {
      case "postgres":
        jdbcDriverName = "org.postgresql.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:postgresql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      case "mysql":
        jdbcDriverName = "com.mysql.cj.jdbc.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:mysql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", options.getDatabaseType()));
    }
    if (!options.getCustomConnectionString().isEmpty()) {
      jdbcDriverConnectionString = options.getCustomConnectionString();
    }

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration =
        CdcJdbcIO.DataSourceConfiguration.create(jdbcDriverName, jdbcDriverConnectionString)
            .withUsername(options.getDatabaseUser())
            .withPassword(options.getDatabasePassword())
            .withMaxIdleConnections(Integer.valueOf(0));

    return dataSourceConfiguration;
  }

  /**
   * Validate the options supplied match expected values. We will also validate that connectivity is
   * working correctly for the target SQL database.
   *
   * @param options The execution parameters to the pipeline.
   * @param dataSourceConfiguration The JDBC datasource configuration.
   */
  public static void validateOptions(
      Options options, CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration) {
    try {
      if (options.getDatabaseHost() != null) {
        dataSourceConfiguration.buildDatasource().getConnection().close();
      }
    } catch (SQLException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /**
   * Parse the SchemaMap config, a comma-separated list of key:value pairs that dictate schema
   * name changes (for example, old_name:new_name).
   */
  public static Map<String, String> parseSchemaMap(String schemaMapString) {
    if (schemaMapString == null || schemaMapString.isEmpty()) {
      return new HashMap<>();
    }

    return Splitter.on(",").withKeyValueSeparator(":").split(schemaMapString);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to SQL DML Objects
     *   3) Filter stale rows using stateful PK transform
     *   4) Write DML statements to SQL Database via jdbc
     */

    Pipeline pipeline = Pipeline.create(options);

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(options);
    validateOptions(options, dataSourceConfiguration);
    Map<String, String> schemaMap = parseSchemaMap(options.getSchemaMap());
    LOG.info("Parsed schema map: {}", schemaMap);

    /*
     * Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
     */
    PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
        pipeline.apply(
            new DataStreamIO(
                    options.getStreamName(),
                    options.getInputFilePattern(),
                    options.getInputFileFormat(),
                    options.getGcsPubSubSubscription(),
                    options.getRfcStartDateTime())
                .withLowercaseSourceColumns()
                .withRenameColumnValue("_metadata_row_id", "rowid")
                .withHashRowId());

    /*
     * Stage 2: Write JSON Strings to SQL DML Objects
     *   a) Convert JSON String FailsafeElements to KV<String, DmlInfo> records (dmlStatements)
     * Stage 3: Filter stale rows using stateful PK transform
     */
    PCollection<KV<String, DmlInfo>> dmlStatements =
        datastreamJsonRecords
            .apply("Format to DML", CreateDml.of(dataSourceConfiguration).withSchemaMap(schemaMap))
            .apply("DML Stateful Processing", ProcessDml.statefulOrderByPK());

    /*
     * Stage 4: Write DML statements to SQL Database via jdbc
     */
    dmlStatements.apply(
        "Write to SQL",
        CdcJdbcIO.<KV<String, DmlInfo>>write()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withStatementFormatter(
                new CdcJdbcIO.StatementFormatter<KV<String, DmlInfo>>() {
                  @Override
                  public String formatStatement(KV<String, DmlInfo> element) {
                    LOG.debug("Executing SQL: {}", element.getValue().getDmlSql());
                    return element.getValue().getDmlSql();
                  }
                }));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}
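
To make the schemaMap format concrete, here is a standalone sketch of parsing a value such as old_name:new_name,CaseError:case_error. The template source above uses Guava's Splitter for this; the sketch swaps in plain String.split so that it runs without dependencies, and the class SchemaMapSketch is a hypothetical name. Rejecting entries that lack a colon is a choice made for this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical, dependency-free illustration of parsing the schemaMap parameter. */
final class SchemaMapSketch {

  static Map<String, String> parseSchemaMap(String schemaMapString) {
    Map<String, String> result = new LinkedHashMap<>();
    if (schemaMapString == null || schemaMapString.isEmpty()) {
      return result; // An empty parameter means no schema renames.
    }
    for (String pair : schemaMapString.split(",")) {
      // Split on the first colon only: the left side is the source name, the right the target.
      String[] keyValue = pair.split(":", 2);
      if (keyValue.length != 2) {
        throw new IllegalArgumentException("Expected key:value but got: " + pair);
      }
      result.put(keyValue[0], keyValue[1]);
    }
    return result;
  }

  public static void main(String[] args) {
    System.out.println(parseSchemaMap("old_name:new_name,CaseError:case_error"));
  }
}
```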

What's next