Di halaman ini, Anda akan mempelajari cara menggunakan Datastream API untuk:
- Membuat aliran data
- Mendapatkan informasi tentang aliran data dan objek aliran data
- Memperbarui streaming dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek streaming
- Memulihkan streaming yang gagal secara permanen
- Mengaktifkan streaming objek besar untuk streaming Oracle
- Menghapus aliran data
Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).
Untuk informasi umum tentang penggunaan gcloud
untuk mengelola aliran Datastream, lihat aliran Datastream gcloud.
Membuat stream
Di bagian ini, Anda akan mempelajari cara membuat streaming yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut tidak komprehensif, tetapi menyoroti fitur tertentu dari Datastream. Untuk mengatasi kasus penggunaan spesifik Anda, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.
Bagian ini membahas kasus penggunaan berikut:
- Menstreaming dari Oracle ke Cloud Storage
- Mengalirkan data dari MySQL ke BigQuery
- Mengalirkan data dari PostgreSQL ke BigQuery
- Menentukan kumpulan objek yang akan disertakan dalam aliran data
- Mengisi ulang semua objek yang disertakan dalam streaming
- Kecualikan objek dari aliran data
- Kecualikan objek dari pengisian ulang
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
- Menentukan mode tulis untuk streaming
Contoh 1: Menstreaming objek tertentu ke BigQuery
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari MySQL ke BigQuery
- Menyertakan kumpulan objek dalam streaming
- Menentukan mode tulis untuk aliran data sebagai append-only
- Mengisi ulang semua objek yang disertakan dalam streaming
Berikut adalah permintaan untuk mengambil semua tabel dari schema1
dan dua tabel khusus
dari schema2
: tableA
dan tableC
. Peristiwa ditulis ke set data di BigQuery.
Permintaan tidak menyertakan parameter customerManagedEncryptionKey
, sehingga
Google Cloud sistem pengelolaan kunci internal digunakan untuk mengenkripsi data Anda,
bukan CMEK.
Parameter backfillAll
yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan ke kamus kosong ({}
), yang berarti bahwa Datastream mengisi ulang data historis dari semua tabel yang disertakan dalam aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream { "displayName": "MySQL CDC to BigQuery", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "schema1" }, { "database": "schema2", "mysqlTables": [ { "table": "tableA", "table": "tableC" } ] } ] }, } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "dataFreshness": "900s" } }, "backfillAll": {} }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat streaming, lihat dokumentasi Google Cloud SDK.
Contoh 2: Mengecualikan objek tertentu dari aliran data dengan sumber PostgreSQL
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari PostgreSQL ke BigQuery
- Mengecualikan objek dari streaming
- Mengecualikan objek dari pengisian ulang
Kode berikut menunjukkan permintaan untuk membuat aliran yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran data dari database PostgreSQL sumber, Anda perlu menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan:
replicationSlot
: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran data.publication
: publikasi adalah grup tabel yang perubahannya ingin Anda replikasi. Nama publikasi harus ada di database sebelum memulai streaming. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftarincludeObjects
aliran data.
Parameter backfillAll
yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myPostgresStream { "displayName": "PostgreSQL to BigQueryCloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp", "postgresqlSourceConfig": { "replicationSlot": "replicationSlot1", "publication": "publicationA", "includeObjects": { "postgresqlSchemas": { "schema": "schema1" } }, "excludeObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA", "postgresqlColumns": [ { "column": "column5" } ] } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "dataFreshness": "900s", "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } } } }, "backfillAll": { "postgresqlExcludedObjects": { "postgresqlSchemas": [ { "schema": "schema1", "postgresqlTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat streaming, lihat dokumentasi Google Cloud SDK.
Contoh 3: Menentukan mode tulis hanya tambahkan untuk aliran data
Saat melakukan streaming ke BigQuery, Anda dapat menentukan mode tulis: merge
atau
appendOnly
. Untuk informasi selengkapnya, lihat Mengonfigurasi mode tulis.
Jika Anda tidak menentukan mode tulis dalam permintaan untuk membuat streaming, mode
merge
default akan digunakan.
Permintaan berikut menunjukkan cara menentukan mode appendOnly
saat Anda membuat streaming MySQL ke BigQuery.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=appendOnlyStream { "displayName": "My append-only stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp" , "mysqlSourceConfig": { "includeObjects": { "mysqlDatabases": [ { "database": "myMySqlDb" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "BigQueryCp", "bigqueryDestinationConfig": { "sourceHierarchyDatasets": { "datasetTemplate": { "location": "us", "datasetIdPrefix": "prefix_" } }, "appendOnly": {} } }, "backfillAll": {} }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat streaming, lihat dokumentasi Google Cloud SDK.
Contoh 4: Menstreaming ke tujuan Cloud Storage
Dalam contoh ini, Anda akan mempelajari cara:
- Streaming dari Oracle ke Cloud Storage
- Menentukan kumpulan objek yang akan disertakan dalam aliran data
- Menentukan CMEK untuk mengenkripsi data dalam penyimpanan
Permintaan berikut menunjukkan cara membuat aliran yang menulis peristiwa ke bucket di Cloud Storage.
Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (mengganti nilai default 50 MB dan 60 detik).
Untuk format JSON, Anda dapat:
Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi
.schema
.Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.
Dengan menggunakan parameter backfillNone
, permintaan menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.
Permintaan menentukan parameter kunci enkripsi yang dikelola pelanggan yang memungkinkan Anda mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam project Google Cloud . Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. File ini juga menentukan key ring untuk CMEK Anda.
Untuk informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk informasi selengkapnya tentang cara melindungi data Anda menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleCdcStream { "displayName": "Oracle CDC to Cloud Storage", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/ connectionProfiles/OracleCp", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "GcsBucketCp", "gcsDestinationConfig": { "path": "/folder1", "jsonFileFormat": { "schemaFileFormat": "AVRO_SCHEMA_FILE" }, "fileRotationMb": 100, "fileRotationInterval": 30 } }, "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillNone": {} }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk membuat streaming, lihat dokumentasi Google Cloud SDK.
Memvalidasi definisi aliran data
Sebelum membuat aliran data, Anda dapat memvalidasi definisinya. Dengan cara ini, Anda dapat memastikan bahwa semua pemeriksaan validasi lulus, dan streaming akan berhasil berjalan saat dibuat.
Memvalidasi streaming akan memeriksa:
- Apakah sumber dikonfigurasi dengan benar untuk memungkinkan Datastream melakukan streaming data darinya.
- Apakah streaming dapat terhubung ke sumber dan tujuan.
- Konfigurasi menyeluruh streaming.
Untuk memvalidasi streaming, tambahkan &validate_only=true
ke URL sebelum isi permintaan Anda:
POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"
Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi yang dijalankan Datastream untuk sumber dan tujuan, beserta apakah pemeriksaan tersebut lulus atau gagal. Untuk pemeriksaan validasi yang tidak lulus, informasi akan muncul tentang alasan kegagalan dan tindakan yang harus dilakukan untuk memperbaiki masalah.
Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang ingin digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari memvalidasi aliran data, Datastream akan memverifikasi bahwa kunci ada, dan bahwa Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi streaming, pesan error berikut akan ditampilkan:
CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS
Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get
untuk kunci tersebut.
Setelah melakukan koreksi yang sesuai, buat permintaan lagi untuk memastikan semua pemeriksaan validasi lulus. Untuk contoh di atas, pemeriksaan CMEK_VALIDATE_PERMISSIONS
tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED
.
Mendapatkan informasi tentang streaming
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang streaming. Informasi ini mencakup:
- Nama aliran data (ID unik)
- Nama yang mudah digunakan untuk streaming (nama tampilan)
- Stempel waktu saat streaming dibuat dan terakhir diperbarui
- Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
- Status aliran data
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID
Respons akan muncul, seperti berikut:
{ "name": "myOracleCdcStream", "displayName": "Oracle CDC to Cloud Storage", "createTime": "2019-12-15T15:01:23.045123456Z", "updateTime": "2019-12-15T15:01:23.045123456Z", "sourceConfig": { "sourceConnectionProfileName": "myOracleDb", "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1" }, { "schema": "schema3", "oracleTables": [ { "table": "tableA" }, { "table": "tableC" } ] } ] } } }, "destinationConfig": { "destinationConnectionProfileName": "myGcsBucket", "gcsDestinationConfig": { "path": "/folder1", "avroFileFormat": {}, "fileRotationMb": 100, "fileRotationInterval": 60 } }, "state": "RUNNING" "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/ keyRings/myRing/cryptoKeys/myEncryptionKey", "backfillAll": {} }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang streaming Anda, klik di sini.
Mencantumkan aliran data
Kode berikut menunjukkan permintaan untuk mengambil daftar semua aliran di project dan lokasi yang ditentukan.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang semua streaming Anda, klik di sini.
Mencantumkan objek streaming
Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek streaming.
REST
GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk mengambil informasi tentang semua objek streaming Anda, klik di sini.
Daftar objek yang ditampilkan mungkin terlihat seperti berikut:
REST
{ "streamObjects": [ { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object1", "displayName": "employees.salaries", "backfillJob": { "state": "ACTIVE", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T12:12:26.344878Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "salaries" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object2", "displayName": "contractors.hours", "sourceObject": { "mysqlIdentifier": { "database": "contractors", "table": "hours" } } }, { "name": "projects/myProjectId1/locations/us-central1/streams/myStream/ objects/object3", "displayName": "employees.departments", "backfillJob": { "state": "COMPLETED", "trigger": "AUTOMATIC", "lastStartTime": "2021-10-18T11:26:12.869880Z", "lastEndTime": "2021-10-18T11:26:28.405653Z" }, "sourceObject": { "mysqlIdentifier": { "database": "employees", "table": "departments" } } } ] }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk mencantumkan objek aliran data, klik di sini.
Memulai streaming
Kode berikut menunjukkan permintaan untuk memulai streaming.
Dengan menggunakan parameter updateMask
dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai streaming, ubah nilai di kolom state
dari CREATED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk memulai streaming, klik di sini.
Menjeda streaming
Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan menjeda streaming, Anda mengubah statusnya dari RUNNING
menjadi PAUSED
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "PAUSED" }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk menjeda streaming, klik di sini.
Melanjutkan streaming
Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom state
. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED
menjadi RUNNING
.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=state { "state": "RUNNING" }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk melanjutkan streaming, klik di sini.
Memulihkan streaming
Anda dapat memulihkan streaming yang gagal secara permanen menggunakan metode RunStream
. Setiap
jenis database sumber memiliki definisinya sendiri tentang operasi pemulihan streaming
yang memungkinkan. Untuk informasi selengkapnya, lihat Memulihkan streaming.
Memulihkan streaming untuk sumber MySQL atau Oracle
Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk sumber MySQL atau Oracle dari berbagai posisi file log:
REST
Memulihkan streaming dari posisi saat ini. Ini adalah opsi default:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Memulihkan streaming dari posisi berikutnya yang tersedia:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "nextAvailableStartPosition": {} } }
Memulihkan streaming dari posisi terbaru:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "mostRecentStartPosition": {} } }
Memulihkan streaming dari posisi tertentu (MySQL):
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan streaming
- POSITION: Posisi dalam file log tempat Anda ingin memulihkan streaming. Jika Anda tidak memberikan nilai, Datastream akan memulihkan streaming dari header file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 4 } } } }
Memulihkan streaming dari posisi tertentu (Oracle):
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 234234 } } } }
Untuk informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan streaming.
gcloud
Memulihkan streaming menggunakan gcloud
tidak didukung.
Memulihkan streaming untuk sumber PostgreSQL
Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk sumber PostgreSQL. Selama pemulihan, aliran data mulai membaca dari nomor urutan log (LSN) pertama di slot replikasi yang dikonfigurasi untuk aliran data.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run
Jika Anda ingin mengubah slot replikasi, perbarui streaming dengan nama slot replikasi baru terlebih dahulu:
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot { "sourceConfig": { "postgresqlSourceConfig": { "replicationSlot": "NEW_REPLICATION_SLOT_NAME" } } }
gcloud
Memulihkan streaming menggunakan gcloud
tidak didukung.
Memulihkan streaming untuk sumber SQL Server
Contoh kode berikut menunjukkan contoh permintaan untuk memulihkan streaming untuk sumber SQL Server.
REST
Memulihkan streaming dari posisi pertama yang tersedia:
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run
Memulihkan streaming dari nomor urutan log yang diinginkan:
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": lsn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/mySqlServerStreamId:run { "cdcStrategy": { "specificStartPosition": { "sqlServerLsnPosition": { "lsn": 0000123C:0000BA78:0004 } } } }
gcloud
Memulihkan streaming menggunakan gcloud
tidak didukung.
Memulai atau melanjutkan streaming dari posisi tertentu
Anda dapat memulai streaming atau melanjutkan streaming yang dijeda dari posisi tertentu untuk sumber MySQL dan Oracle. Hal ini mungkin berguna saat Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari posisi yang Anda tunjukkan. Untuk sumber MySQL, Anda perlu menunjukkan posisi binlog, untuk sumber Oracle, system change number (SCN) dalam file log redo.
Kode berikut menunjukkan permintaan untuk memulai atau melanjutkan streaming yang telah dibuat dari posisi tertentu.
Memulai atau melanjutkan streaming dari posisi binlog tertentu (MySQL):
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "NAME_OF_THE_LOG_FILE" "logPosition": POSITION } } } }
Ganti kode berikut:
- NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai streaming.
- POSITION: Posisi dalam file log tempat Anda ingin memulai streaming. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari awal file.
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "mysqlLogPosition": { "logFile": "binlog.001" "logPosition": 2 } } } }
gcloud
Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
untuk memulai atau melanjutkan streaming, lihat dokumentasi Cloud SDK.
Mulai atau lanjutkan streaming dari nomor perubahan sistem tertentu dalam file log redo (Oracle):
REST
POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/ [location]/streams/[stream-id]:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": scn } } } }
Contoh:
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams/myStreamId1:run { "cdcStrategy": { "specificStartPosition": { "oracleScnPosition": { "scn": 123123 } } } }
gcloud
Memulai atau melanjutkan streaming dari posisi tertentu menggunakan gcloud
tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud
untuk memulai streaming, lihat dokumentasi Cloud SDK.
Mengubah streaming
Kode berikut menunjukkan permintaan untuk memperbarui konfigurasi rotasi file streaming untuk memutar file setiap 75 MB atau 45 detik.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
mencakup kolom fileRotationMb
dan fileRotationInterval
, yang masing-masing diwakili oleh tanda destinationConfig.gcsDestinationConfig.fileRotationMb
dan destinationConfig.gcsDestinationConfig.fileRotationInterval
.
REST
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval { "destinationConfig": { "gcsDestinationConfig": { "fileRotationMb": 75, "fileRotationInterval": 45 } } }
Kode berikut menunjukkan permintaan untuk menyertakan file skema Jenis Terpadu di jalur file yang ditulis Datastream ke Cloud Storage. Akibatnya, Datastream akan menulis dua file: file data JSON dan file skema Avro.
Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat
, yang diwakili oleh flag destinationConfig.gcsDestinationConfig.jsonFileFormat
.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig. jsonFileFormat { "destinationConfig": { "gcsDestinationConfig": { "jsonFileFormat" { "schemaFileFormat": "AVRO_SCHEMA_FILE" } } } }
Kode berikut menunjukkan permintaan agar Datastream mereplikasi data yang ada, selain perubahan yang sedang berlangsung pada data, dari database sumber ke tujuan.
Bagian oracleExcludedObjects
kode menunjukkan tabel dan skema yang dibatasi agar tidak diisi ulang ke tujuan.
Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali tableA di schema3.
PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll { "backfillAll": { "oracleExcludedObjects": { "oracleSchemas": [ { "schema": "schema3", "oracleTables": [ { "table": "tableA" } ] } ] } } }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk mengubah streaming, klik di sini.
Memulai pengisian ulang untuk objek streaming
Aliran di Datastream dapat mengisi ulang data historis, serta melakukan streaming perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah data historis akan di-streaming.
Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll
.
Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll
, dan kecualikan tabel yang data historisnya tidak Anda inginkan.
Jika Anda hanya ingin perubahan yang sedang berlangsung di-streaming ke tujuan, gunakan parameter backfillNone
. Jika kemudian Anda ingin Datastream melakukan streaming snapshot semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.
Alasan lain untuk memulai pengisian ulang untuk objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu kali proses. Akibatnya, data disinkronkan antara sumber dan tujuan.
Sebelum dapat memulai pengisian ulang untuk objek aliran data, Anda harus mengambil informasi tentang objek tersebut.
Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang untuk streaming.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob
gcloud
Untuk informasi selengkapnya tentang penggunaan gcloud
untuk memulai pengisian ulang objek streaming, lihat dokumentasi Google Cloud SDK.
Menghentikan pengisian ulang untuk objek streaming
Setelah memulai pengisian ulang untuk objek streaming, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data dapat rusak. Anda tidak ingin skema atau data ini di-streaming ke tujuan, sehingga Anda menghentikan pengisian ulang untuk objek.
Anda juga dapat menghentikan pengisian ulang untuk objek untuk tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Hal ini dapat menambah beban pada sumbernya. Jika bebannya signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai pengisian ulang untuk objek, satu per satu.
Sebelum dapat menghentikan pengisian ulang untuk objek aliran data, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek aliran data. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang untuk streaming.
REST
POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk menghentikan pengisian ulang objek streaming, klik di sini.
Mengubah jumlah tugas CDC serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah tugas pengambilan data perubahan (CDC) serentak maksimum untuk streaming MySQL menjadi 7.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentCdcTasks
. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk streaming.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentCdcTasks": "7" } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
, klik di sini.
Mengubah jumlah tugas pengisian ulang serentak maksimum
Kode berikut menunjukkan cara menetapkan jumlah tugas pengisian ulang serentak maksimum untuk streaming MySQL menjadi 25.
Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask
adalah kolom maxConcurrentBackfillTasks
. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau jika Anda menentukannya sebagai 0, default sistem sebesar 16 tugas akan ditetapkan untuk streaming.
REST
PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/ streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks { "sourceConfig": { "mysqlSourceConfig": { "maxConcurrentBackfillTasks": "25" } } }
gcloud
Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud
, klik di sini.
Mengaktifkan streaming objek besar untuk sumber Oracle
Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB
),
objek besar karakter (CLOB
), dan objek besar karakter nasional (NCLOB
)
untuk streaming dengan sumber Oracle. Flag streamLargeObjects
memungkinkan Anda menyertakan objek besar baik di streaming baru maupun yang sudah ada. Flag ditetapkan di tingkat aliran,
Anda tidak perlu menentukan kolom jenis data objek besar.
Contoh berikut menunjukkan cara membuat streaming yang memungkinkan Anda melakukan streaming objek besar.
REST
POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/ us-central1/streams?streamId=myOracleLobStream { "displayName": "Oracle LOB stream", "sourceConfig": { "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp" , "oracleSourceConfig": { "includeObjects": { "oracleSchemas": [ { "schema": "schema1", "oracleTables": [ { "table": "tableA", "oracleColumns": [ { "column": "column1,column2" } ] } ] } ] }, "excludeObjects": {}, "streamLargeObjects": {} } } }
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk memperbarui streaming, lihat dokumentasi Google Cloud SDK.
Menghapus feed
Kode berikut menunjukkan permintaan untuk menghapus streaming.
REST
DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/ LOCATION/streams/STREAM_ID
gcloud
Untuk informasi selengkapnya tentang cara menggunakan gcloud
untuk menghapus streaming, klik di sini.
Langkah selanjutnya
- Pelajari cara menggunakan Datastream API untuk mengelola profil koneksi.
- Pelajari cara menggunakan Datastream API untuk mengelola konfigurasi konektivitas pribadi.
- Untuk informasi selengkapnya tentang penggunaan Datastream API, lihat dokumentasi referensi.