Template Datastream ke MySQL atau PostgreSQL (Stream)

Template Datastream ke SQL adalah pipeline streaming yang membaca data Datastream dan mereplikasinya ke database MySQL atau PostgreSQL. Template ini membaca data dari Cloud Storage menggunakan notifikasi Pub/Sub dan mereplikasi data ini ke dalam tabel replika SQL.

Template ini tidak mendukung bahasa definisi data (DDL) dan mengharapkan semua tabel sudah ada di database. Replikasi menggunakan transformasi stateful Dataflow untuk memfilter data yang sudah tidak berlaku dan memastikan konsistensi dalam data yang tidak berurutan. Misalnya, jika versi baris yang lebih baru telah diproses, versi baris yang terlambat akan diabaikan. Bahasa manipulasi data (DML) yang dieksekusi adalah upaya terbaik untuk mereplikasi data sumber ke target dengan sempurna. Pernyataan DML yang dieksekusi mengikuti aturan berikut:

  • Jika kunci utama ada, operasi penyisipan dan pembaruan menggunakan sintaksis upsert (yaitu. INSERT INTO table VALUES (...) ON CONFLICT (...) DO UPDATE).
  • Jika kunci utama ada, penghapusan akan direplikasi sebagai DML penghapusan.
  • Jika tidak ada kunci utama, operasi penyisipan dan pembaruan akan disisipkan ke dalam tabel.
  • Jika tidak ada kunci utama, penghapusan akan diabaikan.

Jika Anda menggunakan utilitas Oracle ke Postgres, tambahkan ROWID di SQL sebagai kunci utama jika tidak ada.

Persyaratan pipeline

  • Aliran Datastream yang siap atau sudah mereplikasi data.
  • Notifikasi Pub/Sub Cloud Storage diaktifkan untuk data Datastream.
  • Database PostgreSQL diisi dengan skema yang diperlukan.
  • Akses jaringan antara pekerja Dataflow dan PostgreSQL disiapkan.

Parameter template

Parameter yang diperlukan

  • inputFilePattern : Lokasi file untuk file Datastream di Cloud Storage yang akan direplikasi. Lokasi file ini biasanya adalah jalur root untuk streaming.
  • databaseHost : Host SQL yang akan dihubungkan.
  • databaseUser : Pengguna SQL dengan semua izin yang diperlukan untuk menulis ke semua tabel dalam replikasi.
  • databasePassword : Sandi untuk pengguna SQL.

Parameter opsional

  • gcsPubSubSubscription : Langganan Pub/Sub dengan notifikasi file Datastream. Contoh, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_ID>.
  • inputFileFormat : Format file output yang dihasilkan oleh Datastream. Misalnya avro atau json. Setelan defaultnya adalah avro.
  • streamName : Nama atau template untuk aliran data yang akan melakukan polling untuk informasi skema. Nilai defaultnya adalah {_metadata_stream}.
  • rfcStartDateTime : DateTime awal yang digunakan untuk mengambil dari Cloud Storage (https://tools.ietf.org/html/rfc3339). Setelan defaultnya adalah: 1970-01-01T00:00:00.00Z.
  • dataStreamRootUrl : URL Root Datastream API. Secara default ditetapkan ke: https://datastream.googleapis.com/.
  • databaseType : Jenis database yang akan ditulis (misalnya, Postgres). Defaultnya adalah: postgres.
  • databasePort : Port database SQL yang akan dihubungkan. Nilai defaultnya adalah 5432.
  • databaseName : Nama database SQL yang akan dihubungkan. Nilai defaultnya adalah postgres.
  • schemaMap : Peta kunci/nilai yang digunakan untuk menentukan perubahan nama skema (yaitu old_name:new_name,CaseError:case_error). Default-nya adalah kosong.
  • customConnectionString : String koneksi opsional yang akan digunakan, bukan string database default.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Datastream to SQL template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --enable-streaming-engine \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_SQL \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
gcsPubSubSubscription=GCS_SUBSCRIPTION_NAME,\
databaseHost=DATABASE_HOST,\
databaseUser=DATABASE_USER,\
databasePassword=DATABASE_PASSWORD
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: IP host SQL Anda.
  • DATABASE_USER: pengguna SQL Anda.
  • DATABASE_PASSWORD: sandi SQL Anda.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {

          "inputFilePattern": "GCS_FILE_PATH",
          "gcsPubSubSubscription": "GCS_SUBSCRIPTION_NAME",
          "databaseHost": "DATABASE_HOST",
          "databaseUser": "DATABASE_USER",
          "databasePassword": "DATABASE_PASSWORD"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Cloud_Datastream_to_SQL",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: the version of the template that you want to use

    You can use the following values:

  • GCS_FILE_PATH: jalur Cloud Storage ke data Datastream. Contoh: gs://bucket/path/to/data/
  • GCS_SUBSCRIPTION_NAME: langganan Pub/Sub untuk membaca file yang diubah. Contoh: projects/my-project-id/subscriptions/my-subscription-id.
  • DATABASE_HOST: IP host SQL Anda.
  • DATABASE_USER: pengguna SQL Anda.
  • DATABASE_PASSWORD: sandi SQL Anda.

Langkah selanjutnya