Mengelola streaming

Di halaman ini, Anda akan mempelajari cara menggunakan Datastream API untuk:

  • Membuat aliran data
  • Mendapatkan informasi tentang aliran data dan objek aliran data
  • Memperbarui streaming dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek streaming
  • Memulihkan streaming yang gagal secara permanen
  • Mengaktifkan streaming objek besar untuk streaming Oracle
  • Menghapus aliran data

Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).

Untuk informasi umum tentang penggunaan gcloud untuk mengelola aliran Datastream, lihat aliran Datastream gcloud.

Membuat stream

Di bagian ini, Anda akan mempelajari cara membuat streaming yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut tidak komprehensif, tetapi menyoroti fitur tertentu dari Datastream. Untuk mengatasi kasus penggunaan spesifik Anda, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.

Bagian ini membahas kasus penggunaan berikut:

Contoh 1: Menstreaming objek tertentu ke BigQuery

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari MySQL ke BigQuery
  • Menyertakan kumpulan objek dalam streaming
  • Menentukan mode tulis untuk aliran data sebagai append-only
  • Mengisi ulang semua objek yang disertakan dalam streaming

Berikut adalah permintaan untuk mengambil semua tabel dari schema1 dan dua tabel khusus dari schema2: tableA dan tableC. Peristiwa ditulis ke set data di BigQuery.

Permintaan tidak menyertakan parameter customerManagedEncryptionKey, sehingga Google Cloud sistem pengelolaan kunci internal digunakan untuk mengenkripsi data Anda, bukan CMEK.

Parameter backfillAll yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan ke kamus kosong ({}), yang berarti bahwa Datastream mengisi ulang data historis dari semua tabel yang disertakan dalam aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "dataFreshness": "900s"
    }
  },
  "backfillAll": {}
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk membuat streaming, lihat dokumentasi Google Cloud SDK.

Contoh 2: Mengecualikan objek tertentu dari aliran data dengan sumber PostgreSQL

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari PostgreSQL ke BigQuery
  • Mengecualikan objek dari streaming
  • Mengecualikan objek dari pengisian ulang

Kode berikut menunjukkan permintaan untuk membuat aliran yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran data dari database PostgreSQL sumber, Anda perlu menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan:

  • replicationSlot: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran data.
  • publication: publikasi adalah grup tabel yang perubahannya ingin Anda replikasi. Nama publikasi harus ada di database sebelum memulai streaming. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftar includeObjects aliran data.

Parameter backfillAll yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk membuat streaming, lihat dokumentasi Google Cloud SDK.

Contoh 3: Menentukan mode tulis hanya tambahkan untuk aliran data

Saat melakukan streaming ke BigQuery, Anda dapat menentukan mode tulis: merge atau appendOnly. Untuk informasi selengkapnya, lihat Mengonfigurasi mode tulis.

Jika Anda tidak menentukan mode tulis dalam permintaan untuk membuat streaming, mode merge default akan digunakan.

Permintaan berikut menunjukkan cara menentukan mode appendOnly saat Anda membuat streaming MySQL ke BigQuery.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream
{
  "displayName": "My append-only stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "myMySqlDb"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      },
      "appendOnly": {}
    }
  },
  "backfillAll": {}
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk membuat streaming, lihat dokumentasi Google Cloud SDK.

Contoh 4: Menstreaming ke tujuan Cloud Storage

Dalam contoh ini, Anda akan mempelajari cara:

  • Streaming dari Oracle ke Cloud Storage
  • Menentukan kumpulan objek yang akan disertakan dalam aliran data
  • Menentukan CMEK untuk mengenkripsi data dalam penyimpanan

Permintaan berikut menunjukkan cara membuat aliran yang menulis peristiwa ke bucket di Cloud Storage.

Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (mengganti nilai default 50 MB dan 60 detik).

Untuk format JSON, Anda dapat:

  • Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi .schema.

  • Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.

Dengan menggunakan parameter backfillNone, permintaan menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.

Permintaan menentukan parameter kunci enkripsi yang dikelola pelanggan yang memungkinkan Anda mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam project Google Cloud . Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. File ini juga menentukan key ring untuk CMEK Anda.

Untuk informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk informasi selengkapnya tentang cara melindungi data Anda menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk membuat streaming, lihat dokumentasi Google Cloud SDK.

Memvalidasi definisi aliran data

Sebelum membuat aliran data, Anda dapat memvalidasi definisinya. Dengan cara ini, Anda dapat memastikan bahwa semua pemeriksaan validasi lulus, dan streaming akan berhasil berjalan saat dibuat.

Memvalidasi streaming akan memeriksa:

  • Apakah sumber dikonfigurasi dengan benar untuk memungkinkan Datastream melakukan streaming data darinya.
  • Apakah streaming dapat terhubung ke sumber dan tujuan.
  • Konfigurasi menyeluruh streaming.

Untuk memvalidasi streaming, tambahkan &validate_only=true ke URL sebelum isi permintaan Anda:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi yang dijalankan Datastream untuk sumber dan tujuan, beserta apakah pemeriksaan tersebut lulus atau gagal. Untuk pemeriksaan validasi yang tidak lulus, informasi akan muncul tentang alasan kegagalan dan tindakan yang harus dilakukan untuk memperbaiki masalah.

Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang ingin digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari memvalidasi aliran data, Datastream akan memverifikasi bahwa kunci ada, dan bahwa Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi streaming, pesan error berikut akan ditampilkan:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get untuk kunci tersebut.

Setelah melakukan koreksi yang sesuai, buat permintaan lagi untuk memastikan semua pemeriksaan validasi lulus. Untuk contoh di atas, pemeriksaan CMEK_VALIDATE_PERMISSIONS tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED.

Mendapatkan informasi tentang streaming

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang streaming. Informasi ini mencakup:

  • Nama aliran data (ID unik)
  • Nama yang mudah digunakan untuk streaming (nama tampilan)
  • Stempel waktu saat streaming dibuat dan terakhir diperbarui
  • Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
  • Status aliran data

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Respons akan muncul, seperti berikut:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang streaming Anda, klik di sini.

Mencantumkan aliran data

Kode berikut menunjukkan permintaan untuk mengambil daftar semua aliran di project dan lokasi yang ditentukan.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang semua streaming Anda, klik di sini.

Mencantumkan objek streaming

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek streaming.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk mengambil informasi tentang semua objek streaming Anda, klik di sini.

Daftar objek yang ditampilkan mungkin terlihat seperti berikut:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk mencantumkan objek aliran data, klik di sini.

Memulai streaming

Kode berikut menunjukkan permintaan untuk memulai streaming.

Dengan menggunakan parameter updateMask dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai streaming, ubah nilai di kolom state dari CREATED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk memulai streaming, klik di sini.

Menjeda streaming

Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan menjeda streaming, Anda mengubah statusnya dari RUNNING menjadi PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk menjeda streaming, klik di sini.

Melanjutkan streaming

Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk melanjutkan streaming, klik di sini.

Memulihkan streaming

Anda dapat memulihkan streaming yang gagal secara permanen menggunakan metode RunStream. Setiap jenis database sumber memiliki definisinya sendiri tentang operasi pemulihan streaming yang memungkinkan. Untuk informasi selengkapnya, lihat Memulihkan streaming.

Memulihkan streaming untuk sumber MySQL atau Oracle

Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk sumber MySQL atau Oracle dari berbagai posisi file log:

REST

Memulihkan streaming dari posisi saat ini. Ini adalah opsi default:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Memulihkan streaming dari posisi berikutnya yang tersedia:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Memulihkan streaming dari posisi terbaru:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Memulihkan streaming dari posisi tertentu (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan streaming
  • POSITION: Posisi dalam file log tempat Anda ingin memulihkan streaming. Jika Anda tidak memberikan nilai, Datastream akan memulihkan streaming dari header file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Memulihkan streaming dari posisi tertentu (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan nomor perubahan sistem (SCN) dalam file log redo tempat Anda ingin memulihkan streaming. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Untuk informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan streaming.

gcloud

Memulihkan streaming menggunakan gcloud tidak didukung.

Memulihkan streaming untuk sumber PostgreSQL

Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk sumber PostgreSQL. Selama pemulihan, aliran data mulai membaca dari nomor urutan log (LSN) pertama di slot replikasi yang dikonfigurasi untuk aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Jika Anda ingin mengubah slot replikasi, perbarui streaming dengan nama slot replikasi baru terlebih dahulu:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Memulihkan streaming menggunakan gcloud tidak didukung.

Memulihkan streaming untuk sumber SQL Server

Contoh kode berikut menunjukkan contoh permintaan untuk memulihkan streaming untuk sumber SQL Server.

REST

Memulihkan streaming dari posisi pertama yang tersedia:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run

Memulihkan streaming dari nomor urutan log yang diinginkan:

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": lsn
      }
    }
  }
}

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/mySqlServerStreamId:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "sqlServerLsnPosition": {
        "lsn": 0000123C:0000BA78:0004
      }
    }
  }
}

gcloud

Memulihkan streaming menggunakan gcloud tidak didukung.

Memulai atau melanjutkan streaming dari posisi tertentu

Anda dapat memulai streaming atau melanjutkan streaming yang dijeda dari posisi tertentu untuk sumber MySQL dan Oracle. Hal ini mungkin berguna saat Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari posisi yang Anda tunjukkan. Untuk sumber MySQL, Anda perlu menunjukkan posisi binlog, untuk sumber Oracle, system change number (SCN) dalam file log redo.

Kode berikut menunjukkan permintaan untuk memulai atau melanjutkan streaming yang telah dibuat dari posisi tertentu.

Memulai atau melanjutkan streaming dari posisi binlog tertentu (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai streaming.
  • POSITION: Posisi dalam file log tempat Anda ingin memulai streaming. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari awal file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.

Mulai atau lanjutkan streaming dari nomor perubahan sistem tertentu dalam file log redo (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan nomor perubahan sistem (SCN) dalam file log redo tempat Anda ingin memulai streaming. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud untuk memulai streaming, lihat dokumentasi Cloud SDK.

Mengubah streaming

Kode berikut menunjukkan permintaan untuk memperbarui konfigurasi rotasi file streaming untuk memutar file setiap 75 MB atau 45 detik.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask mencakup kolom fileRotationMb dan fileRotationInterval, yang masing-masing diwakili oleh tanda destinationConfig.gcsDestinationConfig.fileRotationMb dan destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Kode berikut menunjukkan permintaan untuk menyertakan file skema Jenis Terpadu di jalur file yang ditulis Datastream ke Cloud Storage. Akibatnya, Datastream akan menulis dua file: file data JSON dan file skema Avro.

Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat, yang diwakili oleh flag destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }  
    }
  }
}

Kode berikut menunjukkan permintaan agar Datastream mereplikasi data yang ada, selain perubahan yang sedang berlangsung pada data, dari database sumber ke tujuan.

Bagian oracleExcludedObjects kode menunjukkan tabel dan skema yang dibatasi agar tidak diisi ulang ke tujuan.

Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali tableA di schema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}  

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk mengubah streaming, klik di sini.

Memulai pengisian ulang untuk objek streaming

Aliran di Datastream dapat mengisi ulang data historis, serta melakukan streaming perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah data historis akan di-streaming.

Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll.

Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll, dan kecualikan tabel yang data historisnya tidak Anda inginkan.

Jika Anda hanya ingin perubahan yang sedang berlangsung di-streaming ke tujuan, gunakan parameter backfillNone. Jika kemudian Anda ingin Datastream melakukan streaming snapshot semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.

Alasan lain untuk memulai pengisian ulang untuk objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu kali proses. Akibatnya, data disinkronkan antara sumber dan tujuan.

Sebelum dapat memulai pengisian ulang untuk objek aliran data, Anda harus mengambil informasi tentang objek tersebut.

Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang untuk streaming.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud untuk memulai pengisian ulang objek streaming, lihat dokumentasi Google Cloud SDK.

Menghentikan pengisian ulang untuk objek streaming

Setelah memulai pengisian ulang untuk objek streaming, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data dapat rusak. Anda tidak ingin skema atau data ini di-streaming ke tujuan, sehingga Anda menghentikan pengisian ulang untuk objek.

Anda juga dapat menghentikan pengisian ulang untuk objek untuk tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Hal ini dapat menambah beban pada sumbernya. Jika bebannya signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai pengisian ulang untuk objek, satu per satu.

Sebelum dapat menghentikan pengisian ulang untuk objek aliran data, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek aliran data. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang untuk streaming.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk menghentikan pengisian ulang objek streaming, klik di sini.

Mengubah jumlah tugas CDC serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah tugas pengambilan data perubahan (CDC) serentak maksimum untuk streaming MySQL menjadi 7.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentCdcTasks. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk streaming.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }  
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud, klik di sini.

Mengubah jumlah tugas pengisian ulang serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah tugas pengisian ulang serentak maksimum untuk streaming MySQL menjadi 25.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentBackfillTasks. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem sebesar 16 tugas akan ditetapkan untuk streaming.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }  
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud, klik di sini.

Mengaktifkan streaming objek besar untuk sumber Oracle

Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB), objek besar karakter (CLOB), dan objek besar karakter nasional (NCLOB) untuk streaming dengan sumber Oracle. Flag streamLargeObjects memungkinkan Anda menyertakan objek besar baik di streaming baru maupun yang sudah ada. Flag ditetapkan di tingkat aliran, Anda tidak perlu menentukan kolom jenis data objek besar.

Contoh berikut menunjukkan cara membuat streaming yang memungkinkan Anda melakukan streaming objek besar.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk memperbarui streaming, lihat dokumentasi Google Cloud SDK.

Menghapus feed

Kode berikut menunjukkan permintaan untuk menghapus streaming.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Untuk informasi selengkapnya tentang cara menggunakan gcloud untuk menghapus streaming, klik di sini.

Langkah selanjutnya