Ringkasan
Di bagian ini, Anda akan mempelajari cara menggunakan Datastream API untuk:
- Buat feed
- Mendapatkan informasi tentang aliran dan objek aliran
- Memperbarui aliran data dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek aliran data
- Memulihkan streaming yang gagal secara permanen
- Mengaktifkan streaming objek besar untuk streaming Oracle
- Hapus feed
Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).
Untuk mengetahui informasi umum tentang penggunaan gcloud
dalam mengelola aliran Datastream, lihat gcloud Datastream stream.
Membuat stream
Di bagian ini, Anda akan mempelajari cara membuat aliran data yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut ini tidak komprehensif, tetapi menyoroti fitur Datastream yang spesifik. Untuk mengatasi kasus penggunaan tertentu, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.
Bagian ini membahas kasus penggunaan berikut:
- Streaming dari Oracle ke Cloud Storage
- Streaming dari MySQL ke BigQuery
- Streaming dari PostgreSQL ke BigQuery
- Menentukan sekumpulan objek yang akan disertakan dalam aliran data
- Mengisi ulang semua objek yang disertakan dalam aliran data
- Mengecualikan objek dari aliran data
- Mengecualikan objek dari pengisian ulang
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
Contoh 1: Melakukan streaming objek tertentu ke BigQuery
Dalam contoh ini, Anda akan mempelajari cara:
- Melakukan streaming dari MySQL ke BigQuery
- Menyertakan sekumpulan objek dalam feed
- Mengisi ulang semua objek yang disertakan dalam aliran
Berikut adalah permintaan untuk menarik semua tabel dari schema1
dan dua tabel spesifik
dari schema2
: tableA
dan tableC
. Peristiwa ditulis ke set data di BigQuery.
Permintaan ini tidak menyertakan parameter customerManagedEncryptionKey
, sehingga
sistem pengelolaan kunci internal Google Cloud digunakan untuk mengenkripsi data,
bukan CMEK.
Parameter backfillAll
yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan ke kamus kosong ({}
), yang berarti bahwa Dataflow mengisi ulang data historis dari semua tabel yang disertakan dalam aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ streams/mysqlCp", "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.
Contoh 2: Mengecualikan objek tertentu dari aliran data dengan sumber PostgreSQL
Dalam contoh ini, Anda akan mempelajari cara:
- Melakukan streaming dari PostgreSQL ke BigQuery
- Mengecualikan objek dari aliran
- Mengecualikan objek dari pengisian ulang
Kode berikut menunjukkan permintaan untuk membuat aliran data yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran data dari database PostgreSQL sumber, Anda harus menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan:
replicationSlot
: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran data.publication
: publikasi adalah grup tabel tempat Anda ingin mereplikasi perubahan. Nama publikasi harus ada dalam database sebelum memulai aliran data. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftarincludeObjects
aliran data.
Parameter backfillAll
yang terkait dengan tindakan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.
Contoh 3: Melakukan streaming ke tujuan Cloud Storage
Dalam contoh ini, Anda akan mempelajari cara:
- Melakukan streaming dari Oracle ke Cloud Storage
- Menentukan sekumpulan objek yang akan disertakan dalam aliran data
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
Permintaan berikut menunjukkan cara membuat aliran data yang menulis peristiwa ke bucket di Cloud Storage.
Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (menggantikan nilai default 50 MB dan 60 detik).
Untuk format JSON, Anda dapat:
Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi
.schema
.Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.
Dengan menggunakan parameter backfillNone
, permintaan tersebut menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.
Permintaan tersebut menentukan parameter kunci enkripsi yang dikelola pelanggan sehingga Anda dapat mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam project Google Cloud. Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Kode ini juga menentukan key ring untuk CMEK Anda.
Untuk mengetahui informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk mengetahui informasi selengkapnya tentang cara melindungi data menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.
Memvalidasi definisi streaming
Sebelum membuat streaming, Anda dapat memvalidasi definisinya. Dengan demikian, Anda dapat memastikan bahwa semua pemeriksaan validasi lulus, dan streaming akan berhasil berjalan saat dibuat.
Memvalidasi pemeriksaan streaming:
- Apakah sumber dikonfigurasi dengan benar agar Datastream dapat melakukan streaming data darinya.
- Apakah streaming dapat terhubung ke sumber dan tujuan.
- Konfigurasi streaming menyeluruh.
Untuk memvalidasi aliran data, tambahkan &validate_only=true
ke URL sebelum isi permintaan Anda:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi bahwa Datastream berjalan untuk sumber dan tujuan Anda, beserta apakah pemeriksaan lulus atau gagal. Untuk setiap pemeriksaan validasi yang tidak lulus, informasi muncul mengapa gagal dan apa yang harus dilakukan untuk memperbaiki masalah.
Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang Anda inginkan untuk digunakan oleh Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari memvalidasi aliran, Datastream akan memverifikasi bahwa kuncinya ada, dan Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi streaming, pesan error berikut akan ditampilkan:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get
untuk kunci tersebut.
Setelah melakukan koreksi yang sesuai, buat permintaan lagi untuk memastikan bahwa semua pemeriksaan validasi lulus. Untuk contoh di atas, pemeriksaan CMEK_VALIDATE_PERMISSIONS
tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED
.
Mendapatkan informasi tentang streaming
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang aliran data. Informasi ini mencakup:
- Nama streaming (ID unik)
- Nama yang mudah digunakan untuk streaming (nama tampilan)
- Stempel waktu saat streaming dibuat dan terakhir diperbarui
- Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
- Status streaming
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Responsnya akan muncul, seperti berikut:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk mengambil informasi tentang streaming Anda, klik di sini.
Mencantumkan feed
Kode berikut menampilkan permintaan untuk mengambil daftar semua aliran data dalam project dan lokasi yang ditentukan.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk mengambil informasi tentang semua streaming Anda, klik di sini.
Membuat daftar objek aliran data
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek aliran data.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
guna mengambil informasi tentang semua objek aliran data Anda, klik di sini.
Daftar objek yang ditampilkan mungkin terlihat seperti berikut:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
guna mencantumkan objek aliran data, klik di sini.
Mulai streaming
Kode berikut menunjukkan permintaan untuk memulai streaming.
Dengan menggunakan parameter updateMask
dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai aliran data, ubah nilai di kolom state
dari CREATED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk memulai streaming Anda, klik di sini.
Memulai streaming dari posisi tertentu
Anda dapat memulai aliran data dari posisi tertentu untuk sumber MySQL dan Oracle, misalnya, jika Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari file log yang Anda tentukan. Untuk sumber MySQL, Anda harus menentukan posisi binlog, untuk sumber Oracle, nomor perubahan sistem (SCN) dalam file log redo.
Kode berikut menunjukkan permintaan untuk memulai aliran data yang telah dibuat dari posisi tertentu.
Mulai streaming dari posisi binlog tertentu (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai aliran data.
- POSITION: Posisi di file log tempat Anda ingin memulai aliran data. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari head file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
Memulai streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
guna memulai aliran data, lihat dokumentasi Cloud SDK.
Mulai streaming dari nomor perubahan sistem tertentu dalam file log ulangi (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }Ganti scn dengan nomor perubahan sistem (SCN) di file log pengulangan tempat Anda ingin memulai streaming. Kolom ini wajib diisi.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Memulai streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
guna memulai aliran data, lihat dokumentasi Cloud SDK.
Menjeda streaming
Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan menjeda streaming, Anda mengubah statusnya dari RUNNING
menjadi PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
untuk menjeda streaming, klik di sini.
Melanjutkan streaming
Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
untuk melanjutkan streaming, klik di sini.
Memulihkan streaming
Anda dapat memulihkan streaming yang gagal secara permanen untuk sumber MySQL, Oracle, atau PostgreSQL menggunakan metode RunStream
. Setiap jenis database sumber memiliki definisinya sendiri
tentang operasi pemulihan streaming yang dimungkinkan. Untuk mengetahui informasi selengkapnya, lihat
Memulihkan streaming.
Memulihkan streaming untuk sumber MySQL atau Oracle
Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk MySQL atau sumber Oracle dari berbagai posisi file log:
REST
Pulihkan aliran data dari posisi saat ini. Ini adalah opsi default-nya:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Memulihkan streaming dari posisi berikutnya yang tersedia:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Memulihkan streaming dari posisi terbaru:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Memulihkan aliran data dari posisi tertentu (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan aliran data
- POSITION: Posisi dalam file log tempat Anda ingin memulihkan aliran data. Jika Anda tidak memberikan nilai, Datastream akan memulihkan aliran data dari head file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Memulihkan streaming dari posisi tertentu (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }Ganti scn dengan nomor perubahan sistem (SCN) di file log pengulangan yang ingin Anda pulihkan. Kolom ini wajib diisi.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Untuk informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan streaming.
gcloud
Pemulihan streaming menggunakan gcloud
tidak didukung.
Memulihkan aliran data untuk sumber PostgreSQL
Contoh kode berikut menunjukkan permintaan untuk memulihkan aliran data untuk sumber PostgreSQL. Selama pemulihan, stream mulai membaca dari nomor urut log (LSN) pertama dalam slot replikasi yang dikonfigurasi untuk streaming tersebut.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Jika Anda ingin mengubah slot replikasi, perbarui aliran data dengan nama slot replikasi yang baru terlebih dahulu:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Pemulihan streaming menggunakan gcloud
tidak didukung.
Mengubah streaming
Kode berikut menunjukkan permintaan untuk mengupdate konfigurasi rotasi file streaming guna memutar file setiap 75 MB atau 45 detik.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
menyertakan kolom fileRotationMb
dan fileRotationInterval
, masing-masing yang diwakili oleh flag destinationConfig.gcsDestinationConfig.fileRotationMb
dan destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Kode berikut menunjukkan permintaan untuk menyertakan file skema Jenis Terpadu di jalur file yang ditulis Datastream ke Cloud Storage. Hasilnya, Datastream menulis dua file: file data JSON dan file skema Avro.
Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat
, yang diwakili oleh tanda destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Kode berikut menunjukkan permintaan agar Datastream untuk mereplikasi data yang ada, selain perubahan yang sedang berlangsung pada data, dari database sumber ke tujuan.
Bagian oracleExcludedObjects
kode menampilkan tabel dan skema yang dibatasi agar tidak diisi ulang ke tujuan.
Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali untuk tabelA di skema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
untuk mengubah streaming Anda, klik di sini.
Memulai pengisian ulang untuk objek aliran data
Aliran data di Datastream dapat mengisi ulang data historis, serta mengalirkan perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah Anda ingin data historis di-streaming.
Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll
.
Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll
, dan kecualikan tabel yang tidak Anda inginkan data historisnya.
Jika Anda hanya ingin perubahan yang sedang berlangsung yang di-streaming ke tujuan, gunakan parameter backfillNone
. Jika Anda ingin Datastream mengalirkan ringkasan semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.
Alasan lain untuk memulai pengisian ulang sebuah objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini akan hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu sesi. Akibatnya, data disinkronkan antara sumber dan tujuan.
Sebelum dapat memulai pengisian ulang untuk objek aliran data, Anda harus mengambil informasi tentang objek.
Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
guna pengisian ulang awal untuk objek aliran data Anda, klik di sini.
Menghentikan pengisian ulang untuk objek aliran data
Setelah memulai pengisian ulang untuk objek aliran data, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data tersebut mungkin rusak. Anda tidak ingin skema atau data ini di-streaming ke tujuan, sehingga menghentikan pengisian ulang objek.
Anda juga dapat menghentikan pengisian ulang sebuah objek untuk tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Hal ini dapat menambah beban pada sumber. Jika bebannya signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai isi ulang objek, satu per satu.
Sebelum dapat menghentikan pengisian ulang objek aliran data, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek aliran data. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
guna menghentikan pengisian ulang untuk objek aliran data Anda, klik di sini.
Mengubah jumlah tugas CDC serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas pengambilan data perubahan serentak (CDC) untuk streaming MySQL ke 7.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentCdcTasks
. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau menetapkannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk aliran data.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
, klik di sini.
Mengubah jumlah tugas pengisian ulang serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas isi ulang serentak untuk streaming MySQL ke 25.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentBackfillTasks
. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau menetapkannya sebagai 0, default sistem untuk 16 tugas akan ditetapkan untuk aliran data.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
, klik di sini.
Mengaktifkan streaming objek besar untuk sumber Oracle
Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB
), objek besar karakter (CLOB
), dan objek besar karakter nasional (NCLOB
) untuk streaming dengan sumber Oracle. Flag streamLargeObjects
memungkinkan Anda menyertakan
objek besar dalam streaming baru dan yang sudah ada. Flag ini ditetapkan pada tingkat aliran data, sehingga Anda tidak perlu menentukan kolom jenis data objek besar.
Contoh berikut menunjukkan cara membuat aliran data yang memungkinkan Anda melakukan streaming objek besar.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud
guna memperbarui aliran data, lihat dokumentasi Google Cloud SDK.
Menghapus feed
Kode berikut menunjukkan permintaan untuk menghapus streaming.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
untuk menghapus aliran data Anda, klik di sini.