Kelola streaming

Ringkasan

Di bagian ini, Anda akan mempelajari cara menggunakan Datastream API untuk:

  • Buat feed
  • Mendapatkan informasi tentang aliran dan objek aliran
  • Memperbarui aliran data dengan memulai, menjeda, melanjutkan, dan mengubahnya, serta dengan memulai dan menghentikan pengisian ulang untuk objek aliran data
  • Memulihkan streaming yang gagal secara permanen
  • Mengaktifkan streaming objek besar untuk streaming Oracle
  • Hapus feed

Ada dua cara untuk menggunakan Datastream API. Anda dapat melakukan panggilan REST API atau menggunakan Google Cloud CLI (CLI).

Untuk mengetahui informasi umum tentang penggunaan gcloud dalam mengelola aliran Datastream, lihat gcloud Datastream stream.

Membuat stream

Di bagian ini, Anda akan mempelajari cara membuat aliran data yang digunakan untuk mentransfer data dari sumber ke tujuan. Contoh berikut ini tidak komprehensif, tetapi menyoroti fitur Datastream yang spesifik. Untuk mengatasi kasus penggunaan tertentu, gunakan contoh ini bersama dengan dokumentasi referensi API Datastream.

Bagian ini membahas kasus penggunaan berikut:

Contoh 1: Melakukan streaming objek tertentu ke BigQuery

Dalam contoh ini, Anda akan mempelajari cara:

  • Melakukan streaming dari MySQL ke BigQuery
  • Menyertakan sekumpulan objek dalam feed
  • Mengisi ulang semua objek yang disertakan dalam aliran

Berikut adalah permintaan untuk menarik semua tabel dari schema1 dan dua tabel spesifik dari schema2: tableA dan tableC. Peristiwa ditulis ke set data di BigQuery.

Permintaan ini tidak menyertakan parameter customerManagedEncryptionKey, sehingga sistem pengelolaan kunci internal Google Cloud digunakan untuk mengenkripsi data, bukan CMEK.

Parameter backfillAll yang terkait dengan melakukan pengisian ulang historis (atau snapshot) ditetapkan ke kamus kosong ({}), yang berarti bahwa Dataflow mengisi ulang data historis dari semua tabel yang disertakan dalam aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams?streamId=mysqlCdcStream
{
  "displayName": "MySQL CDC to BigQuery",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/streams/mysqlCp",
    "mysqlSourceConfig": {
      "includeObjects": {
        "mysqlDatabases": [
          { "database": "schema1" },
          {
            "database": "schema2",
            "mysqlTables": [
              {
                "table": "tableA",
                "table": "tableC"
              }
            ]
          }
        ]
      },
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
          "location": "us",
          "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.

Contoh 2: Mengecualikan objek tertentu dari aliran data dengan sumber PostgreSQL

Dalam contoh ini, Anda akan mempelajari cara:

  • Melakukan streaming dari PostgreSQL ke BigQuery
  • Mengecualikan objek dari aliran
  • Mengecualikan objek dari pengisian ulang

Kode berikut menunjukkan permintaan untuk membuat aliran data yang digunakan untuk mentransfer data dari database PostgreSQL sumber ke BigQuery. Saat membuat aliran data dari database PostgreSQL sumber, Anda harus menentukan dua kolom tambahan khusus PostgreSQL dalam permintaan:

  • replicationSlot: slot replikasi adalah prasyarat untuk mengonfigurasi database PostgreSQL untuk replikasi. Anda perlu membuat slot replikasi untuk setiap aliran data.
  • publication: publikasi adalah grup tabel tempat Anda ingin mereplikasi perubahan. Nama publikasi harus ada dalam database sebelum memulai aliran data. Setidaknya, publikasi harus menyertakan tabel yang ditentukan dalam daftar includeObjects aliran data.

Parameter backfillAll yang terkait dengan tindakan pengisian ulang historis (atau snapshot) ditetapkan untuk mengecualikan satu tabel.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myPostgresStream
{
  "displayName": "PostgreSQL to BigQueryCloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/postgresCp",
    "postgresqlSourceConfig": {
      "replicationSlot": "replicationSlot1",
      "publication": "publicationA",
      "includeObjects": {
        "postgresqlSchemas": {
          "schema": "schema1"
        }
      },
      "excludeObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
        "postgresqlTables": [
          {
            "table": "tableA",
            "postgresqlColumns": [
              { "column": "column5" }
              ]
              }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "BigQueryCp",
    "bigqueryDestinationConfig": {
      "dataFreshness": "900s",
      "sourceHierarchyDatasets": {
        "datasetTemplate": {
           "location": "us",
           "datasetIdPrefix": "prefix_"
        }
      }
    }
  },
  "backfillAll": {
    "postgresqlExcludedObjects": {
        "postgresqlSchemas": [
          { "schema": "schema1",
            "postgresqlTables": [
              { "table": "tableA" }
            ]
          }
        ]
      }
    }
  }

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.

Contoh 3: Melakukan streaming ke tujuan Cloud Storage

Dalam contoh ini, Anda akan mempelajari cara:

  • Melakukan streaming dari Oracle ke Cloud Storage
  • Menentukan sekumpulan objek yang akan disertakan dalam aliran data
  • Menentukan CMEK untuk mengenkripsi data dalam penyimpanan

Permintaan berikut menunjukkan cara membuat aliran data yang menulis peristiwa ke bucket di Cloud Storage.

Dalam contoh permintaan ini, peristiwa ditulis dalam format output JSON, dan file baru dibuat setiap 100 MB atau 30 detik (menggantikan nilai default 50 MB dan 60 detik).

Untuk format JSON, Anda dapat:

  • Sertakan file skema jenis terpadu di jalur. Akibatnya, Datastream menulis dua file ke Cloud Storage: file data JSON dan file skema Avro. File skema memiliki nama yang sama dengan file data, dengan ekstensi .schema.

  • Aktifkan kompresi gzip agar Datastream mengompresi file yang ditulis ke Cloud Storage.

Dengan menggunakan parameter backfillNone, permintaan tersebut menentukan bahwa hanya perubahan yang sedang berlangsung yang di-streaming ke tujuan, tanpa pengisian ulang.

Permintaan tersebut menentukan parameter kunci enkripsi yang dikelola pelanggan sehingga Anda dapat mengontrol kunci yang digunakan untuk mengenkripsi data dalam penyimpanan dalam project Google Cloud. Parameter ini mengacu pada CMEK yang digunakan Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Kode ini juga menentukan key ring untuk CMEK Anda.

Untuk mengetahui informasi selengkapnya tentang key ring, lihat resource Cloud KMS. Untuk mengetahui informasi selengkapnya tentang cara melindungi data menggunakan kunci enkripsi, lihat Cloud Key Management Service (KMS).

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleCdcStream
{
  "displayName": "Oracle CDC to Cloud Storage",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/
    connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "GcsBucketCp",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "jsonFileFormat": {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      },
      "fileRotationMb": 100,
      "fileRotationInterval": 30
    }
  },
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillNone": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk membuat aliran data, lihat dokumentasi Google Cloud SDK.

Memvalidasi definisi streaming

Sebelum membuat streaming, Anda dapat memvalidasi definisinya. Dengan demikian, Anda dapat memastikan bahwa semua pemeriksaan validasi lulus, dan streaming akan berhasil berjalan saat dibuat.

Memvalidasi pemeriksaan streaming:

  • Apakah sumber dikonfigurasi dengan benar agar Datastream dapat melakukan streaming data darinya.
  • Apakah streaming dapat terhubung ke sumber dan tujuan.
  • Konfigurasi streaming menyeluruh.

Untuk memvalidasi aliran data, tambahkan &validate_only=true ke URL sebelum isi permintaan Anda:

POST "https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams?streamId=STREAM_ID&validate_only=true"

Setelah membuat permintaan ini, Anda akan melihat pemeriksaan validasi bahwa Datastream berjalan untuk sumber dan tujuan Anda, beserta apakah pemeriksaan lulus atau gagal. Untuk setiap pemeriksaan validasi yang tidak lulus, informasi muncul mengapa gagal dan apa yang harus dilakukan untuk memperbaiki masalah.

Misalnya, Anda memiliki kunci enkripsi yang dikelola pelanggan (CMEK) yang Anda inginkan untuk digunakan oleh Datastream untuk mengenkripsi data yang di-streaming dari sumber ke tujuan. Sebagai bagian dari memvalidasi aliran, Datastream akan memverifikasi bahwa kuncinya ada, dan Datastream memiliki izin untuk menggunakan kunci tersebut. Jika salah satu kondisi ini tidak terpenuhi, saat Anda memvalidasi streaming, pesan error berikut akan ditampilkan:

CMEK_DOES_NOT_EXIST_OR_MISSING_PERMISSIONS

Untuk mengatasi masalah ini, pastikan kunci yang Anda berikan ada, dan akun layanan Datastream memiliki izin cloudkms.cryptoKeys.get untuk kunci tersebut.

Setelah melakukan koreksi yang sesuai, buat permintaan lagi untuk memastikan bahwa semua pemeriksaan validasi lulus. Untuk contoh di atas, pemeriksaan CMEK_VALIDATE_PERMISSIONS tidak akan lagi menampilkan pesan error, tetapi akan memiliki status PASSED.

Mendapatkan informasi tentang streaming

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang aliran data. Informasi ini mencakup:

  • Nama streaming (ID unik)
  • Nama yang mudah digunakan untuk streaming (nama tampilan)
  • Stempel waktu saat streaming dibuat dan terakhir diperbarui
  • Informasi tentang profil koneksi sumber dan tujuan yang terkait dengan aliran data
  • Status streaming

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/streams/STREAM_ID

Responsnya akan muncul, seperti berikut:

{
  "name": "myOracleCdcStream",
  "displayName": "Oracle CDC to Cloud Storage",
  "createTime": "2019-12-15T15:01:23.045123456Z",
  "updateTime": "2019-12-15T15:01:23.045123456Z",
  "sourceConfig": {
    "sourceConnectionProfileName": "myOracleDb",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1"
          },
          {
            "schema": "schema3",
            "oracleTables": [
              { "table": "tableA" },
              { "table": "tableC" }
            ]
          }
        ]
      }
    }
  },
  "destinationConfig": {
    "destinationConnectionProfileName": "myGcsBucket",
    "gcsDestinationConfig": {
      "path": "/folder1",
      "avroFileFormat": {},
      "fileRotationMb": 100,
      "fileRotationInterval": 60
    }
  },
  "state": "RUNNING"
  "customerManagedEncryptionKey": "projects/myProjectId1/locations/us-central1/
  keyRings/myRing/cryptoKeys/myEncryptionKey",
  "backfillAll": {}
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk mengambil informasi tentang streaming Anda, klik di sini.

Mencantumkan feed

Kode berikut menampilkan permintaan untuk mengambil daftar semua aliran data dalam project dan lokasi yang ditentukan.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk mengambil informasi tentang semua streaming Anda, klik di sini.

Membuat daftar objek aliran data

Kode berikut menunjukkan permintaan untuk mengambil informasi tentang semua objek aliran data.

REST

GET https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud guna mengambil informasi tentang semua objek aliran data Anda, klik di sini.

Daftar objek yang ditampilkan mungkin terlihat seperti berikut:

REST

{
  "streamObjects": [
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object1",
      "displayName": "employees.salaries",
      "backfillJob": {
        "state": "ACTIVE",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T12:12:26.344878Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "salaries"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object2",
      "displayName": "contractors.hours",
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "contractors",
          "table": "hours"
        }
      }
    },
    {
      "name": "projects/myProjectId1/locations/us-central1/streams/myStream/
      objects/object3",
      "displayName": "employees.departments",
      "backfillJob": {
        "state": "COMPLETED",
        "trigger": "AUTOMATIC",
        "lastStartTime": "2021-10-18T11:26:12.869880Z",
        "lastEndTime": "2021-10-18T11:26:28.405653Z"
      },
      "sourceObject": {
        "mysqlIdentifier": {
          "database": "employees",
          "table": "departments"
        }
      }
    }
  ]
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud guna mencantumkan objek aliran data, klik di sini.

Mulai streaming

Kode berikut menunjukkan permintaan untuk memulai streaming.

Dengan menggunakan parameter updateMask dalam permintaan, hanya kolom yang Anda tentukan yang harus disertakan dalam isi permintaan. Untuk memulai aliran data, ubah nilai di kolom state dari CREATED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk memulai streaming Anda, klik di sini.

Memulai streaming dari posisi tertentu

Anda dapat memulai aliran data dari posisi tertentu untuk sumber MySQL dan Oracle, misalnya, jika Anda ingin melakukan pengisian ulang menggunakan alat eksternal, atau memulai CDC dari file log yang Anda tentukan. Untuk sumber MySQL, Anda harus menentukan posisi binlog, untuk sumber Oracle, nomor perubahan sistem (SCN) dalam file log redo.

Kode berikut menunjukkan permintaan untuk memulai aliran data yang telah dibuat dari posisi tertentu.

Mulai streaming dari posisi binlog tertentu (MySQL):

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulai aliran data.
  • POSITION: Posisi di file log tempat Anda ingin memulai aliran data. Jika Anda tidak memberikan nilai, Datastream akan mulai membaca dari head file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 2
      }
    }
  }
}

gcloud

Memulai streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud guna memulai aliran data, lihat dokumentasi Cloud SDK.

Mulai streaming dari nomor perubahan sistem tertentu dalam file log ulangi (Oracle):

REST

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan nomor perubahan sistem (SCN) di file log pengulangan tempat Anda ingin memulai streaming. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 123123
      }
    }
  }
}

gcloud

Memulai streaming dari posisi tertentu menggunakan gcloud tidak didukung. Untuk mengetahui informasi tentang cara menggunakan gcloud guna memulai aliran data, lihat dokumentasi Cloud SDK.

Menjeda streaming

Kode berikut menunjukkan permintaan untuk menjeda streaming yang sedang berjalan.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan menjeda streaming, Anda mengubah statusnya dari RUNNING menjadi PAUSED.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "PAUSED"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud untuk menjeda streaming, klik di sini.

Melanjutkan streaming

Kode berikut menunjukkan permintaan untuk melanjutkan streaming yang dijeda.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom state. Dengan melanjutkan streaming, Anda mengubah statusnya dari PAUSED menjadi RUNNING.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=state
{
  "state": "RUNNING"
}

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud untuk melanjutkan streaming, klik di sini.

Memulihkan streaming

Anda dapat memulihkan streaming yang gagal secara permanen untuk sumber MySQL, Oracle, atau PostgreSQL menggunakan metode RunStream. Setiap jenis database sumber memiliki definisinya sendiri tentang operasi pemulihan streaming yang dimungkinkan. Untuk mengetahui informasi selengkapnya, lihat Memulihkan streaming.

Memulihkan streaming untuk sumber MySQL atau Oracle

Contoh kode berikut menunjukkan permintaan untuk memulihkan streaming untuk MySQL atau sumber Oracle dari berbagai posisi file log:

REST

Pulihkan aliran data dari posisi saat ini. Ini adalah opsi default-nya:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Memulihkan streaming dari posisi berikutnya yang tersedia:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "nextAvailableStartPosition": {}
  }
}

Memulihkan streaming dari posisi terbaru:

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "mostRecentStartPosition": {}
  }
}

Memulihkan aliran data dari posisi tertentu (MySQL):

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "NAME_OF_THE_LOG_FILE"
        "logPosition": POSITION
      }
    }
  }
}

Ganti kode berikut:

  • NAME_OF_THE_LOG_FILE: Nama file log tempat Anda ingin memulihkan aliran data
  • POSITION: Posisi dalam file log tempat Anda ingin memulihkan aliran data. Jika Anda tidak memberikan nilai, Datastream akan memulihkan aliran data dari head file.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "mysqlLogPosition": {
        "logFile": "binlog.001"
        "logPosition": 4
      }
    }
  }
}

Memulihkan streaming dari posisi tertentu (Oracle):

POST https://datastream.googleapis.com/v1/projects/[project-id]/locations/
[location]/streams/[stream-id]:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": scn
      }
    }
  }
}
Ganti scn dengan nomor perubahan sistem (SCN) di file log pengulangan yang ingin Anda pulihkan. Kolom ini wajib diisi.

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run
{
  "cdcStrategy": {
    "specificStartPosition": {
      "oracleScnPosition": {
        "scn": 234234
      }
    }
  }
}

Untuk informasi selengkapnya tentang opsi pemulihan yang tersedia, lihat Memulihkan streaming.

gcloud

Pemulihan streaming menggunakan gcloud tidak didukung.

Memulihkan aliran data untuk sumber PostgreSQL

Contoh kode berikut menunjukkan permintaan untuk memulihkan aliran data untuk sumber PostgreSQL. Selama pemulihan, stream mulai membaca dari nomor urut log (LSN) pertama dalam slot replikasi yang dikonfigurasi untuk streaming tersebut.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID:run

Contoh:

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams/myStreamId1:run

Jika Anda ingin mengubah slot replikasi, perbarui aliran data dengan nama slot replikasi yang baru terlebih dahulu:

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.postgresqlSourceConfig.replicationSlot
{
  "sourceConfig": {
    "postgresqlSourceConfig": {
      "replicationSlot": "NEW_REPLICATION_SLOT_NAME"
    }
  }
}

gcloud

Pemulihan streaming menggunakan gcloud tidak didukung.

Mengubah streaming

Kode berikut menunjukkan permintaan untuk mengupdate konfigurasi rotasi file streaming guna memutar file setiap 75 MB atau 45 detik.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask menyertakan kolom fileRotationMb dan fileRotationInterval, masing-masing yang diwakili oleh flag destinationConfig.gcsDestinationConfig.fileRotationMb dan destinationConfig.gcsDestinationConfig.fileRotationInterval.

REST

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
fileRotationMb,destinationConfig.gcsDestinationConfig.fileRotationInterval
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "fileRotationMb": 75,
      "fileRotationInterval": 45
    }
  }
}

Kode berikut menunjukkan permintaan untuk menyertakan file skema Jenis Terpadu di jalur file yang ditulis Datastream ke Cloud Storage. Hasilnya, Datastream menulis dua file: file data JSON dan file skema Avro.

Untuk contoh ini, kolom yang ditentukan adalah kolom jsonFileFormat, yang diwakili oleh tanda destinationConfig.gcsDestinationConfig.jsonFileFormat.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=destinationConfig.gcsDestinationConfig.
jsonFileFormat
{
  "destinationConfig": {
    "gcsDestinationConfig": {
      "jsonFileFormat" {
        "schemaFileFormat": "AVRO_SCHEMA_FILE"
      }
    }
  }
}

Kode berikut menunjukkan permintaan agar Datastream untuk mereplikasi data yang ada, selain perubahan yang sedang berlangsung pada data, dari database sumber ke tujuan.

Bagian oracleExcludedObjects kode menampilkan tabel dan skema yang dibatasi agar tidak diisi ulang ke tujuan.

Untuk contoh ini, semua tabel dan skema akan diisi ulang, kecuali untuk tabelA di skema3.

PATCH https://datastream.googleapis.com/v1/projects/myProjectId1/locations/us-central1/streams/myStream?updateMask=backfillAll
{
  "backfillAll": {
    "oracleExcludedObjects": {
      "oracleSchemas": [
        {
          "schema": "schema3",
          "oracleTables": [
            { "table": "tableA" }
          ]
        }
      ]
    }
  }
}

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud untuk mengubah streaming Anda, klik di sini.

Memulai pengisian ulang untuk objek aliran data

Aliran data di Datastream dapat mengisi ulang data historis, serta mengalirkan perubahan yang sedang berlangsung ke tujuan. Perubahan yang sedang berlangsung akan selalu di-streaming dari sumber ke tujuan. Namun, Anda dapat menentukan apakah Anda ingin data historis di-streaming.

Jika Anda ingin data historis di-streaming dari sumber ke tujuan, gunakan parameter backfillAll.

Datastream juga memungkinkan Anda melakukan streaming data historis hanya untuk tabel database tertentu. Untuk melakukannya, gunakan parameter backfillAll, dan kecualikan tabel yang tidak Anda inginkan data historisnya.

Jika Anda hanya ingin perubahan yang sedang berlangsung yang di-streaming ke tujuan, gunakan parameter backfillNone. Jika Anda ingin Datastream mengalirkan ringkasan semua data yang ada dari sumber ke tujuan, Anda harus memulai pengisian ulang secara manual untuk objek yang berisi data ini.

Alasan lain untuk memulai pengisian ulang sebuah objek adalah jika data tidak sinkron antara sumber dan tujuan. Misalnya, pengguna dapat menghapus data di tujuan secara tidak sengaja, dan data tersebut kini akan hilang. Dalam hal ini, memulai pengisian ulang untuk objek berfungsi sebagai "mekanisme reset" karena semua data di-streaming ke tujuan dalam satu sesi. Akibatnya, data disinkronkan antara sumber dan tujuan.

Sebelum dapat memulai pengisian ulang untuk objek aliran data, Anda harus mengambil informasi tentang objek.

Setiap objek memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk memulai pengisian ulang aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:startBackfillJob

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud guna pengisian ulang awal untuk objek aliran data Anda, klik di sini.

Menghentikan pengisian ulang untuk objek aliran data

Setelah memulai pengisian ulang untuk objek aliran data, Anda dapat menghentikan pengisian ulang untuk objek tersebut. Misalnya, jika pengguna mengubah skema database, skema atau data tersebut mungkin rusak. Anda tidak ingin skema atau data ini di-streaming ke tujuan, sehingga menghentikan pengisian ulang objek.

Anda juga dapat menghentikan pengisian ulang sebuah objek untuk tujuan load balancing. Datastream dapat menjalankan beberapa pengisian ulang secara paralel. Hal ini dapat menambah beban pada sumber. Jika bebannya signifikan, hentikan pengisian ulang untuk setiap objek, lalu mulai isi ulang objek, satu per satu.

Sebelum dapat menghentikan pengisian ulang objek aliran data, Anda harus membuat permintaan untuk mengambil informasi tentang semua objek aliran data. Setiap objek yang ditampilkan memiliki OBJECT_ID, yang mengidentifikasi objek secara unik. Anda menggunakan OBJECT_ID untuk menghentikan pengisian ulang aliran data.

REST

POST https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID/objects/OBJECT_ID:stopBackfillJob

gcloud

Untuk mengetahui informasi selengkapnya tentang penggunaan gcloud guna menghentikan pengisian ulang untuk objek aliran data Anda, klik di sini.

Mengubah jumlah tugas CDC serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas pengambilan data perubahan serentak (CDC) untuk streaming MySQL ke 7.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentCdcTasks. Dengan menetapkan nilainya ke 7, Anda mengubah jumlah tugas CDC serentak maksimum dari nilai sebelumnya menjadi 7. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau menetapkannya sebagai 0, default sistem 5 tugas akan ditetapkan untuk aliran data.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentCdcTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentCdcTasks": "7"
      }
    }
}

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud, klik di sini.

Mengubah jumlah tugas pengisian ulang serentak maksimum

Kode berikut menunjukkan cara menetapkan jumlah maksimum tugas isi ulang serentak untuk streaming MySQL ke 25.

Untuk contoh ini, kolom yang ditentukan untuk parameter updateMask adalah kolom maxConcurrentBackfillTasks. Dengan menetapkan nilainya ke 25, Anda mengubah jumlah tugas pengisian ulang serentak maksimum dari nilai sebelumnya menjadi 25. Anda dapat menggunakan nilai dari 0 hingga 50 (inklusif). Jika Anda tidak menentukan nilai, atau menetapkannya sebagai 0, default sistem untuk 16 tugas akan ditetapkan untuk aliran data.

REST

PATCH https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/LOCATION/
streams/STREAM_ID?updateMask=sourceConfig.mysqlSourceConfig.maxConcurrentBackfillTasks
{
  "sourceConfig": {
    "mysqlSourceConfig": {
      "maxConcurrentBackfillTasks": "25"
      }
    }
}

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud, klik di sini.

Mengaktifkan streaming objek besar untuk sumber Oracle

Anda dapat mengaktifkan streaming objek besar, seperti objek besar biner (BLOB), objek besar karakter (CLOB), dan objek besar karakter nasional (NCLOB) untuk streaming dengan sumber Oracle. Flag streamLargeObjects memungkinkan Anda menyertakan objek besar dalam streaming baru dan yang sudah ada. Flag ini ditetapkan pada tingkat aliran data, sehingga Anda tidak perlu menentukan kolom jenis data objek besar.

Contoh berikut menunjukkan cara membuat aliran data yang memungkinkan Anda melakukan streaming objek besar.

REST

POST https://datastream.googleapis.com/v1/projects/myProjectId1/locations/
us-central1/streams?streamId=myOracleLobStream
{
  "displayName": "Oracle LOB stream",
  "sourceConfig": {
    "sourceConnectionProfileName": "/projects/myProjectId1/locations/us-central1/connectionProfiles/OracleCp",
    "oracleSourceConfig": {
      "includeObjects": {
        "oracleSchemas": [
          {
            "schema": "schema1",
            "oracleTables": [
              {
                "table": "tableA",
                "oracleColumns": [
                  {
                    "column": "column1,column2"
                  }
                ]
              }
            ]
          }
        ]
      },
      "excludeObjects": {},
      "streamLargeObjects": {}
    }
  }
}

gcloud

Untuk mengetahui informasi selengkapnya tentang cara menggunakan gcloud guna memperbarui aliran data, lihat dokumentasi Google Cloud SDK.

Menghapus feed

Kode berikut menunjukkan permintaan untuk menghapus streaming.

REST

DELETE https://datastream.googleapis.com/v1/projects/PROJECT_ID/locations/
LOCATION/streams/STREAM_ID

gcloud

Untuk informasi selengkapnya tentang penggunaan gcloud untuk menghapus aliran data Anda, klik di sini.