Datastream to MySQL or PostgreSQL (Stream) template

The Datastream to SQL template is a streaming pipeline that reads Datastream data and replicates it into any MySQL or PostgreSQL database. The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.

The template does not support data definition language (DDL) and expects that all tables already exist in the database. Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out-of-order data. For example, if a more recent version of a row has already passed through, a late-arriving version of that row is ignored. The data manipulation language (DML) that executes is a best attempt to replicate source data to the target exactly. The DML statements follow these rules:

  • If a primary key exists, insert and update operations use upsert syntax (that is, INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • If a primary key exists, deletes are replicated as a delete DML.
  • If no primary key exists, both insert and update operations are inserted into the table.
  • If no primary key exists, deletes are ignored.
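
The rules above can be sketched as a small decision function. This is only an illustration under stated assumptions, not the template's code: the DmlRules class, the Op enum, and the exact SQL text are hypothetical, values are rendered as JDBC placeholders, and the upsert uses the PostgreSQL ON CONFLICT form.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch of the DML rules; not part of the template. */
final class DmlRules {

  enum Op { INSERT, UPDATE, DELETE }

  /** Returns the statement for one change, or empty when the change is ignored. */
  static Optional<String> toDml(
      Op op, String table, List<String> columns, List<String> primaryKey) {
    boolean hasPrimaryKey = !primaryKey.isEmpty();

    if (op == Op.DELETE) {
      if (!hasPrimaryKey) {
        return Optional.empty(); // No primary key: deletes are ignored.
      }
      List<String> predicates = new ArrayList<>();
      for (String key : primaryKey) {
        predicates.add(key + " = ?");
      }
      return Optional.of(
          "DELETE FROM " + table + " WHERE " + String.join(" AND ", predicates));
    }

    String insert =
        "INSERT INTO " + table + " (" + String.join(", ", columns) + ") VALUES ("
            + String.join(", ", Collections.nCopies(columns.size(), "?")) + ")";
    if (!hasPrimaryKey) {
      return Optional.of(insert); // No primary key: inserts and updates are plain inserts.
    }

    // Primary key present: inserts and updates both become an upsert.
    List<String> assignments = new ArrayList<>();
    for (String column : columns) {
      if (!primaryKey.contains(column)) {
        assignments.add(column + " = EXCLUDED." + column);
      }
    }
    String action =
        assignments.isEmpty()
            ? "DO NOTHING"
            : "DO UPDATE SET " + String.join(", ", assignments);
    return Optional.of(
        insert + " ON CONFLICT (" + String.join(", ", primaryKey) + ") " + action);
  }
}
```

For a table with columns id and name and primary key id, an UPDATE change yields an upsert on id, while the same change against a table without a primary key yields a plain INSERT.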

If you are using the Oracle to Postgres utilities, add ROWID in SQL as the primary key when none exists.
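
The stale-data filtering described earlier can be pictured with a small in-memory stand-in. This is a simplified sketch, not the template's stateful transform: the StaleRowFilter class is hypothetical, it keeps state in a plain map instead of Dataflow state, and it assumes each change carries a single comparable source timestamp.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for per-key state; not the template's transform. */
final class StaleRowFilter {

  // Latest source timestamp seen so far for each primary key.
  private final Map<String, Long> latestByKey = new HashMap<>();

  /** Returns true if the change should be applied, false if a newer version was already seen. */
  boolean accept(String primaryKey, long sourceTimestampMillis) {
    Long latest = latestByKey.get(primaryKey);
    if (latest != null && sourceTimestampMillis < latest) {
      return false; // A more recent version of this row has already passed through.
    }
    latestByKey.put(primaryKey, sourceTimestampMillis);
    return true;
  }
}
```

With this sketch, a change for key users:1 stamped 100 is dropped if a change stamped 200 for the same key was accepted first, while changes for other keys are unaffected.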

Pipeline requirements

  • A Datastream stream that is ready to replicate data or is already replicating data.
  • Cloud Storage Pub/Sub notifications are enabled for the Datastream data.
  • A PostgreSQL database was seeded with the required schema.
  • Network access between Dataflow workers and PostgreSQL is set up.

Template parameters

Required parameters

  • inputFilePattern: The file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.
  • databaseHost: The SQL host to connect on.
  • databaseUser: The SQL user with all required permissions to write to all tables in replication.
  • databasePassword: The password for the SQL user.

Optional parameters

  • gcsPubSubSubscription: The Pub/Sub subscription with Datastream file notifications. For example, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat: The format of the output file produced by Datastream. For example, avro or json. Defaults to avro.
  • streamName: The name or template for the stream to poll for schema information. The default value is {_metadata_stream}.
  • rfcStartDateTime: The starting DateTime used to fetch from Cloud Storage (https://tools.ietf.org/html/rfc3339). Defaults to: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl: Datastream API Root URL. Defaults to: https://datastream.googleapis.com/.
  • databaseType: The database type to write to (for example, Postgres). Defaults to: postgres.
  • databasePort: The SQL database port to connect to. The default value is 5432.
  • databaseName: The name of the SQL database to connect to. The default value is postgres.
  • schemaMap: A map of key/values used to dictate schema name changes (that is, old_name:new_name,CaseError:case_error). Defaults to empty.
  • customConnectionString: Optional connection string which will be used instead of the default database string.
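
The schemaMap format can be illustrated with a short parser. This is a sketch using only the standard library, whereas the template source further down uses Guava's Splitter. The SchemaMapSketch class and its rename helper are hypothetical, and trimming whitespace around names is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of parsing a schemaMap value such as "old_name:new_name,CaseError:case_error". */
final class SchemaMapSketch {

  /** Splits comma-separated old:new pairs into a map; null or empty input gives an empty map. */
  static Map<String, String> parse(String schemaMap) {
    Map<String, String> result = new LinkedHashMap<>();
    if (schemaMap == null || schemaMap.isEmpty()) {
      return result;
    }
    for (String entry : schemaMap.split(",")) {
      int separator = entry.indexOf(':');
      if (separator < 0) {
        throw new IllegalArgumentException("Expected old:new but got: " + entry);
      }
      // Trimming is a choice made for this sketch only.
      result.put(entry.substring(0, separator).trim(), entry.substring(separator + 1).trim());
    }
    return result;
  }

  /** Returns the mapped schema name, or the original name when no mapping exists. */
  static String rename(Map<String, String> schemaMap, String schema) {
    return schemaMap.getOrDefault(schema, schema);
  }
}
```

Parsing the example value from the parameter description maps old_name to new_name and CaseError to case_error; any schema name not listed is left unchanged by rename.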

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Datastream to SQL template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • LOCATION: the region where you want to deploy your Dataflow job, for example, us-central1
  • VERSION: the version of the template that you want to use
  • GCS_FILE_PATH: the Cloud Storage path to the Datastream data. For example: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: the Pub/Sub subscription to read changed files from. For example: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: your SQL host IP.
  • DATABASE_USER: your SQL user.
  • DATABASE_PASSWORD: your SQL password.
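
If you assemble the REST request in code, the URL and JSON body can be built as in the following sketch. It is a minimal illustration under stated assumptions: the LaunchRequestSketch class and its methods are hypothetical, the escaping only handles quotes and backslashes, and sending the request with an OAuth access token is not shown.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that builds the flexTemplates:launch URL and request body. */
final class LaunchRequestSketch {

  /** Builds the launch endpoint for a project and region. */
  static String url(String projectId, String location) {
    return "https://dataflow.googleapis.com/v1b3/projects/" + projectId
        + "/locations/" + location + "/flexTemplates:launch";
  }

  /** Builds the JSON body with the job name, template parameters, and container spec path. */
  static String body(
      String jobName, String containerSpecGcsPath, Map<String, String> parameters) {
    StringJoiner params = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, String> entry : parameters.entrySet()) {
      params.add(quote(entry.getKey()) + ":" + quote(entry.getValue()));
    }
    return "{\"launch_parameter\":{"
        + "\"jobName\":" + quote(jobName) + ","
        + "\"parameters\":" + params + ","
        + "\"containerSpecGcsPath\":" + quote(containerSpecGcsPath)
        + "}}";
  }

  // Minimal JSON string quoting: escapes backslashes and double quotes only.
  private static String quote(String value) {
    return "\"" + value.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
  }
}
```

Pass the parameters in a LinkedHashMap if you want them to appear in a stable order in the body.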
/*
 * Copyright (C) 2020 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.datastream.io.CdcJdbcIO;
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.values.DmlInfo;
import com.google.cloud.teleport.v2.templates.DataStreamToSQL.Options;
import com.google.cloud.teleport.v2.transforms.CreateDml;
import com.google.cloud.teleport.v2.transforms.ProcessDml;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Splitter;
import java.sql.SQLException;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * This pipeline ingests DataStream data from GCS. The data is then cleaned and converted from JSON
 * objects into DML statements. The DML is applied to the desired target database, which can be one
 * of MySQL or PostgreSQL. Replication maintains a 1:1 match between source and target by default.
 * No DDL is supported in the current version of this pipeline.
 *
 * <p>NOTE: Future versions will support: Pub/Sub, GCS, or Kafka as per DataStream
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-sql/README_Cloud_Datastream_to_SQL.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Datastream_to_SQL",
    category = TemplateCategory.STREAMING,
    displayName = "Datastream to SQL",
    description = {
      "The Datastream to SQL template is a streaming pipeline that reads <a href=\"https://cloud.google.com/datastream/docs\">Datastream</a> data and replicates it into any MySQL or PostgreSQL database. "
          + "The template reads data from Cloud Storage using Pub/Sub notifications and replicates this data into SQL replica tables.\n",
      "The template does not support data definition language (DDL) and expects that all tables already exist in the database. "
          + "Replication uses Dataflow stateful transforms to filter stale data and ensure consistency in out of order data. "
          + "For example, if a more recent version of a row has already passed through, a late arriving version of that row is ignored. "
          + "The data manipulation language (DML) that executes is a best attempt to perfectly replicate source to target data. The DML statements executed follow the following rules:\n",
      "If a primary key exists, insert and update operations use upsert syntax (ie. <code>INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE</code>).\n"
          + "If primary keys exist, deletes are replicated as a delete DML.\n"
          + "If no primary key exists, both insert and update operations are inserted into the table.\n"
          + "If no primary keys exist, deletes are ignored.\n"
          + "If you are using the Oracle to Postgres utilities, add <code>ROWID</code> in SQL as the primary key when none exists."
    },
    optionsClass = Options.class,
    flexContainerName = "datastream-to-sql",
    documentation =
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "A Datastream stream that is ready to or already replicating data.",
      "<a href=\"https://cloud.google.com/storage/docs/reporting-changes\">Cloud Storage Pub/Sub notifications</a> are enabled for the Datastream data.",
      "A PostgreSQL database was seeded with the required schema.",
      "Network access between Dataflow workers and PostgreSQL is set up."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class DataStreamToSQL {

  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSQL.class);
  private static final String AVRO_SUFFIX = "avro";
  private static final String JSON_SUFFIX = "json";

  /**
   * Options supported by the pipeline.
   *
   * <p>Inherits standard configuration options.
   */
  public interface Options extends PipelineOptions, StreamingOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "File location for Datastream file input in Cloud Storage.",
        helpText =
            "The file location for the Datastream files in Cloud Storage to replicate. This file location is typically the root path for the stream.")
    String getInputFilePattern();

    void setInputFilePattern(String value);

    @TemplateParameter.PubsubSubscription(
        order = 2,
        optional = true,
        description = "The Pub/Sub subscription being used in a Cloud Storage notification policy.",
        helpText =
            "The Pub/Sub subscription with Datastream file notifications."
                + " For example, `projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>`.")
    String getGcsPubSubSubscription();

    void setGcsPubSubSubscription(String value);

    @TemplateParameter.Enum(
        order = 3,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("json")},
        optional = true,
        description = "Datastream output file format (avro/json).",
        helpText =
            "The format of the output file produced by Datastream. For example, `avro` or `json`. Defaults to `avro`.")
    @Default.String("avro")
    String getInputFileFormat();

    void setInputFileFormat(String value);

    @TemplateParameter.Text(
        order = 4,
        groupName = "Source",
        optional = true,
        description = "Name or template for the stream to poll for schema information.",
        helpText =
            "The name or template for the stream to poll for schema information. The default value is `{_metadata_stream}`.")
    String getStreamName();

    void setStreamName(String value);

    @TemplateParameter.DateTime(
        order = 5,
        optional = true,
        description =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).",
        helpText =
            "The starting DateTime used to fetch from Cloud Storage "
                + "(https://tools.ietf.org/html/rfc3339).")
    @Default.String("1970-01-01T00:00:00.00Z")
    String getRfcStartDateTime();

    void setRfcStartDateTime(String value);

    // DataStream API Root Url (only used for testing)
    @TemplateParameter.Text(
        order = 6,
        optional = true,
        description = "Datastream API Root URL (only required for testing)",
        helpText = "Datastream API Root URL")
    @Default.String("https://datastream.googleapis.com/")
    String getDataStreamRootUrl();

    void setDataStreamRootUrl(String value);

    // SQL Connection Parameters
    @TemplateParameter.Enum(
        order = 7,
        optional = true,
        enumOptions = {@TemplateEnumOption("postgres"), @TemplateEnumOption("mysql")},
        description = "SQL Database Type (postgres or mysql).",
        helpText = "The database type to write to (for example, Postgres).")
    @Default.String("postgres")
    String getDatabaseType();

    void setDatabaseType(String value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Target",
        description = "Database Host to connect on.",
        helpText = "The SQL host to connect on.")
    String getDatabaseHost();

    void setDatabaseHost(String value);

    @TemplateParameter.Text(
        order = 9,
        groupName = "Target",
        optional = true,
        description = "Database Port to connect on.",
        helpText = "The SQL database port to connect to. The default value is `5432`.")
    @Default.String("5432")
    String getDatabasePort();

    void setDatabasePort(String value);

    @TemplateParameter.Text(
        order = 10,
        description = "Database User to connect with.",
        helpText =
            "The SQL user with all required permissions to write to all tables in replication.")
    String getDatabaseUser();

    void setDatabaseUser(String value);

    @TemplateParameter.Password(
        order = 11,
        description = "Database Password for given user.",
        helpText = "The password for the SQL user.")
    String getDatabasePassword();

    void setDatabasePassword(String value);

    @TemplateParameter.Text(
        order = 12,
        groupName = "Target",
        optional = true,
        description = "SQL Database Name.",
        helpText = "The name of the SQL database to connect to. The default value is `postgres`.")
    @Default.String("postgres")
    String getDatabaseName();

    void setDatabaseName(String value);

    @TemplateParameter.Text(
        order = 13,
        optional = true,
        description = "A map of key/values used to dictate schema name changes",
        helpText =
            "A map of key/values used to dictate schema name changes (ie."
                + " old_name:new_name,CaseError:case_error)")
    @Default.String("")
    String getSchemaMap();

    void setSchemaMap(String value);

    @TemplateParameter.Text(
        order = 14,
        groupName = "Target",
        optional = true,
        description = "Custom connection string.",
        helpText =
            "Optional connection string which will be used instead of the default database string.")
    @Default.String("")
    String getCustomConnectionString();

    void setCustomConnectionString(String value);
  }

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    LOG.info("Starting Datastream to SQL");

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    options.setStreaming(true);
    run(options);
  }

  /**
   * Build the DataSourceConfiguration for the target SQL database. Using the pipeline options,
   * determine the database type and create the correct jdbc connection for the requested DB.
   *
   * @param options The execution parameters to the pipeline.
   */
  public static CdcJdbcIO.DataSourceConfiguration getDataSourceConfiguration(Options options) {
    String jdbcDriverName;
    String jdbcDriverConnectionString;

    switch (options.getDatabaseType()) {
      case "postgres":
        jdbcDriverName = "org.postgresql.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:postgresql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      case "mysql":
        jdbcDriverName = "com.mysql.cj.jdbc.Driver";
        jdbcDriverConnectionString =
            String.format(
                "jdbc:mysql://%s:%s/%s",
                options.getDatabaseHost(), options.getDatabasePort(), options.getDatabaseName());
        break;
      default:
        throw new IllegalArgumentException(
            String.format("Database Type %s is not supported.", options.getDatabaseType()));
    }
    if (!options.getCustomConnectionString().isEmpty()) {
      jdbcDriverConnectionString = options.getCustomConnectionString();
    }

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration =
        CdcJdbcIO.DataSourceConfiguration.create(jdbcDriverName, jdbcDriverConnectionString)
            .withUsername(options.getDatabaseUser())
            .withPassword(options.getDatabasePassword())
            .withMaxIdleConnections(Integer.valueOf(0));

    return dataSourceConfiguration;
  }

  /**
   * Validate the options supplied match expected values. We will also validate that connectivity is
   * working correctly for the target SQL database.
   *
   * @param options The execution parameters to the pipeline.
   * @param dataSourceConfiguration The JDBC datasource configuration.
   */
  public static void validateOptions(
      Options options, CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration) {
    try {
      if (options.getDatabaseHost() != null) {
        dataSourceConfiguration.buildDatasource().getConnection().close();
      }
    } catch (SQLException e) {
      throw new IllegalArgumentException(e);
    }
  }

  /** Parse the SchemaMap config which allows key:value pairs of column naming configs. */
  public static Map<String, String> parseSchemaMap(String schemaMapString) {
    if (schemaMapString == null || schemaMapString.equals("")) {
      return new HashMap<>();
    }

    return Splitter.on(",").withKeyValueSeparator(":").split(schemaMapString);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(Options options) {
    /*
     * Stages:
     *   1) Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   2) Write JSON Strings to SQL DML Objects
     *   3) Filter stale rows using stateful PK transform
     *   4) Write DML statements to SQL Database via jdbc
     */

    Pipeline pipeline = Pipeline.create(options);

    CdcJdbcIO.DataSourceConfiguration dataSourceConfiguration = getDataSourceConfiguration(options);
    validateOptions(options, dataSourceConfiguration);
    Map<String, String> schemaMap = parseSchemaMap(options.getSchemaMap());
    LOG.info("Parsed schema map: {}", schemaMap);

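    /*
     * Stage 1: Ingest and Normalize Data to FailsafeElement with JSON Strings
     *   a) Read DataStream data from GCS into JSON String FailsafeElements (datastreamJsonRecords)
     */
    PCollection<FailsafeElement<String, String>> datastreamJsonRecords =
        pipeline.apply(
            new DataStreamIO(
                    options.getStreamName(),
                    options.getInputFilePattern(),
                    options.getInputFileFormat(),
                    options.getGcsPubSubSubscription(),
                    options.getRfcStartDateTime())
                .withLowercaseSourceColumns()
                .withRenameColumnValue("_metadata_row_id", "rowid")
                .withHashRowId());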

    /*
     * Stage 2: Write JSON Strings to SQL Insert Strings
     *   a) Convert JSON String FailsafeElements to TableRow's (tableRowRecords)
     * Stage 3) Filter stale rows using stateful PK transform
     */
    PCollection<KV<String, DmlInfo>> dmlStatements =
        datastreamJsonRecords
            .apply("Format to DML", CreateDml.of(dataSourceConfiguration).withSchemaMap(schemaMap))
            .apply("DML Stateful Processing", ProcessDml.statefulOrderByPK());

    /*
     * Stage 4: Write Inserts to CloudSQL
     */
    dmlStatements.apply(
        "Write to SQL",
        CdcJdbcIO.<KV<String, DmlInfo>>write()
            .withDataSourceConfiguration(dataSourceConfiguration)
            .withStatementFormatter(
                new CdcJdbcIO.StatementFormatter<KV<String, DmlInfo>>() {
                  public String formatStatement(KV<String, DmlInfo> element) {
                    LOG.debug("Executing SQL: {}", element.getValue().getDmlSql());
                    return element.getValue().getDmlSql();
                  }
                }));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

