Template Datastream ke Spanner

Template Datastream ke Spanner adalah pipeline streaming yang membaca peristiwa Datastream dari bucket Cloud Storage dan menulisnya ke database Spanner. Alat ini ditujukan untuk migrasi data dari sumber Datastream ke Spanner.

Semua tabel yang diperlukan untuk migrasi harus ada di database Spanner tujuan sebelum eksekusi template. Oleh karena itu, migrasi skema dari database sumber ke Spanner tujuan harus diselesaikan sebelum migrasi data. Data dapat ada dalam tabel sebelum migrasi. Template ini tidak menyebarkan perubahan skema Datastream ke database Spanner.

Konsistensi data hanya dijamin di akhir migrasi saat semua data telah ditulis ke Spanner. Untuk menyimpan informasi pengurutan setiap data yang ditulis ke Spanner, template ini membuat tabel tambahan (disebut tabel bayangan) untuk setiap tabel dalam database Spanner. Hal ini digunakan untuk memastikan konsistensi di akhir migrasi. Tabel bayangan tidak dihapus setelah migrasi dan dapat digunakan untuk tujuan validasi di akhir migrasi.

Setiap error yang terjadi selama operasi, seperti ketidakcocokan skema, file JSON yang salah format, atau error yang dihasilkan dari eksekusi transformasi, dicatat dalam antrean error. Antrean error adalah folder Cloud Storage yang menyimpan semua peristiwa Datastream yang mengalami error beserta alasan error dalam format teks. Error dapat bersifat sementara atau permanen dan disimpan di folder Cloud Storage yang sesuai dalam antrean error. Error sementara akan dicoba ulang secara otomatis, sedangkan error permanen tidak. Jika terjadi error permanen, Anda memiliki opsi untuk melakukan koreksi pada peristiwa perubahan dan memindahkannya ke bucket yang dapat dicoba ulang saat template berjalan.

Persyaratan pipeline

  • Streaming Datastream dalam status Berjalan atau Belum dimulai.
  • Bucket Cloud Storage tempat peristiwa Datastream direplikasi.
  • Database Spanner dengan tabel yang ada. Tabel ini dapat kosong atau berisi data.

Parameter template

Parameter yang diperlukan

  • instanceId: Instance Spanner tempat perubahan direplikasi.
  • databaseId: Database Spanner tempat perubahan direplikasi.

Parameter opsional

  • inputFilePattern: Lokasi file Cloud Storage yang berisi file Datastream yang akan direplikasi. Biasanya, ini adalah jalur root untuk streaming. Dukungan untuk fitur ini telah dinonaktifkan.
  • inputFileFormat: Format file output yang dihasilkan oleh Datastream. Contohnya, avro,json. Setelan defaultnya adalah avro.
  • sessionFilePath: Jalur file sesi di Cloud Storage yang berisi informasi pemetaan dari HarbourBridge.
  • projectId: Project ID Spanner.
  • spannerHost: Endpoint Cloud Spanner yang akan dipanggil dalam template. Contoh, https://batch-spanner.googleapis.com. Secara default: https://batch-spanner.googleapis.com.
  • gcsPubSubSubscription: Langganan Pub/Sub yang digunakan dalam kebijakan notifikasi Cloud Storage. Untuk nama, gunakan format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • streamName: Nama atau template untuk aliran data yang akan melakukan polling untuk informasi skema dan jenis sumber.
  • shadowTablePrefix: Awalan yang digunakan untuk memberi nama tabel bayangan. Default: shadow_.
  • shouldCreateShadowTables: Flag ini menunjukkan apakah tabel bayangan harus dibuat di database Cloud Spanner. Defaultnya adalah: true.
  • rfcStartDateTime: DateTime awal yang digunakan untuk mengambil dari Cloud Storage (https://tools.ietf.org/html/rfc3339). Setelan defaultnya adalah: 1970-01-01T00:00:00.00Z.
  • fileReadConcurrency: Jumlah file DataStream serentak yang akan dibaca. Setelan defaultnya adalah: 30.
  • deadLetterQueueDirectory: Jalur file yang digunakan saat menyimpan output antrean error. Jalur file default adalah direktori di lokasi sementara tugas Dataflow.
  • dlqRetryMinutes: Jumlah menit antara percobaan ulang antrean surat mati. Default-nya adalah 10.
  • dlqMaxRetryCount: Jumlah maksimum percobaan ulang error sementara melalui DLQ. Default-nya adalah 500.
  • dataStreamRootUrl: URL Root Datastream API. Secara default ditetapkan ke: https://datastream.googleapis.com/.
  • datastreamSourceType: Ini adalah jenis database sumber yang terhubung ke Datastream. Contoh - mysql/oracle. Perlu ditetapkan saat menguji tanpa Datastream yang sebenarnya berjalan.
  • roundJsonDecimals: Jika ditetapkan, tanda ini akan membulatkan nilai desimal di kolom json ke angka yang dapat disimpan tanpa kehilangan presisi. Defaultnya adalah: false.
  • runMode: Ini adalah jenis mode operasi, baik reguler maupun dengan retryDLQ. Setelan defaultnya adalah: reguler.
  • transformationContextFilePath: Jalur file konteks transformasi di cloud storage yang digunakan untuk mengisi data yang digunakan dalam transformasi yang dilakukan selama migrasi Misalnya: ID shard ke nama db untuk mengidentifikasi db tempat baris dimigrasikan.
  • directoryWatchDurationInMinutes: Durasi yang diperlukan pipeline untuk terus melakukan polling direktori di GCS. File datastreamoutput disusun dalam struktur direktori yang menggambarkan stempel waktu peristiwa yang dikelompokkan berdasarkan menit. Parameter ini harus kira-kira sama dengan penundaan maksimum yang dapat terjadi antara peristiwa yang terjadi di database sumber dan peristiwa yang sama yang ditulis ke GCS oleh Datastream. Persentil 99,9 = 10 menit. Setelan defaultnya adalah: 10.
  • spannerPriority: Prioritas permintaan untuk panggilan Cloud Spanner. Nilai harus berupa salah satu dari: [HIGH,MEDIUM,LOW]. Nilai default-nya adalah HIGH.
  • dlqGcsPubSubSubscription: Langganan Pub/Sub yang digunakan dalam kebijakan notifikasi Cloud Storage untuk direktori percobaan ulang DLQ saat berjalan dalam mode reguler. Untuk nama, gunakan format projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>. Jika ditetapkan, deadLetterQueueDirectory dan dlqRetryMinutes akan diabaikan.
  • transformationJarPath: Lokasi file JAR kustom di Cloud Storage untuk file yang berisi logika transformasi kustom guna memproses data dalam migrasi maju. Default-nya adalah kosong.
  • transformationClassName: Nama class yang sepenuhnya memenuhi syarat dan memiliki logika transformasi kustom. Ini adalah kolom wajib diisi jika transformationJarPath ditentukan. Default-nya adalah kosong.
  • transformationCustomParameters: String yang berisi parameter kustom yang akan diteruskan ke class transformasi kustom. Default-nya adalah kosong.
  • filteredEventsDirectory: Ini adalah jalur file untuk menyimpan peristiwa yang difilter melalui transformasi kustom. Defaultnya adalah direktori di lokasi sementara tugas Dataflow. Nilai default sudah cukup dalam sebagian besar kondisi.
  • shardingContextFilePath: Jalur file konteks sharding di cloud storage digunakan untuk mengisi ID shard di database spanner untuk setiap shard sumber.Formatnya adalah Map<stream_name, Map<db_name, shard_id>>.
  • tableOverrides: Ini adalah penggantian nama tabel dari sumber ke spanner. Kolom ini ditulis dalam format berikut: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]Contoh ini menunjukkan pemetaan tabel Penyanyi ke Vokalis dan tabel Album ke Rekaman. Contoh, [{Singers, Vocalists}, {Albums, Records}]. Default-nya adalah kosong.
  • columnOverrides: Ini adalah penggantian nama kolom dari sumber ke spanner. Kolom ini ditulis dalam format berikut: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]Perhatikan bahwa SourceTableName harus tetap sama di pasangan sumber dan spanner. Untuk mengganti nama tabel, gunakan tableOverrides.Contoh ini menunjukkan pemetaan SingerName ke TalentName dan AlbumName ke RecordName di tabel Singers dan Albums. Contoh, [{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]. Default-nya adalah kosong.
  • schemaOverridesFilePath: File yang menentukan penggantian nama tabel dan kolom dari sumber ke spanner. Default-nya adalah kosong.

Menjalankan template

Konsol

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Datastream to Spanner template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

gcloud

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner \
    --parameters \
inputFilePattern=GCS_FILE_PATH,\
streamName=STREAM_NAME,\
instanceId=CLOUDSPANNER_INSTANCE,\
databaseId=CLOUDSPANNER_DATABASE,\
deadLetterQueueDirectory=DLQ
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • GCS_FILE_PATH: jalur Cloud Storage yang digunakan untuk menyimpan peristiwa aliran data. Contoh: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: instance Spanner Anda.
  • CLOUDSPANNER_DATABASE: database Spanner Anda.
  • DLQ: jalur Cloud Storage untuk direktori antrean error.

API

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "containerSpecGcsPath": "gs://dataflow-templates-REGION_NAME/VERSION/flex/Cloud_Datastream_to_Spanner",
      "parameters": {
          "inputFilePattern": "GCS_FILE_PATH",
          "streamName": "STREAM_NAME"
          "instanceId": "CLOUDSPANNER_INSTANCE"
          "databaseId": "CLOUDSPANNER_DATABASE"
          "deadLetterQueueDirectory": "DLQ"
      }
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • GCS_FILE_PATH: jalur Cloud Storage yang digunakan untuk menyimpan peristiwa aliran data. Contoh: gs://bucket/path/to/data/
  • CLOUDSPANNER_INSTANCE: instance Spanner Anda.
  • CLOUDSPANNER_DATABASE: database Spanner Anda.
  • DLQ: jalur Cloud Storage untuk direktori antrean error.

Langkah selanjutnya