Memperbarui pipeline yang ada

Dokumen ini menjelaskan cara memperbarui tugas streaming yang sedang berlangsung. Anda mungkin ingin memperbarui tugas Dataflow yang ada karena alasan berikut:

  • Anda ingin meningkatkan atau memperbaiki kode pipeline.
  • Anda ingin memperbaiki bug dalam kode pipeline.
  • Anda ingin memperbarui pipeline untuk menangani perubahan format data, atau untuk memperhitungkan versi atau perubahan lainnya di sumber data Anda.
  • Anda ingin menerapkan patch pada kerentanan keamanan yang terkait dengan Container-Optimized OS untuk semua pekerja Dataflow.
  • Anda ingin menskalakan pipeline Apache Beam streaming untuk menggunakan jumlah pekerja yang berbeda.

Anda dapat memperbarui tugas dengan dua cara:

  • Pembaruan tugas yang sedang berjalan: Untuk tugas streaming yang menggunakan Streaming Engine, Anda dapat memperbarui opsi tugas min-num-workers dan max-num-workers tanpa menghentikan tugas atau mengubah ID tugas.
  • Tugas penggantian: Untuk menjalankan kode pipeline yang diperbarui atau untuk memperbarui opsi tugas yang tidak didukung oleh update tugas yang sedang berlangsung, luncurkan tugas baru yang menggantikan tugas yang ada. Untuk memverifikasi apakah tugas pengganti valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya.

Saat Anda memperbarui tugas, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas yang sedang berjalan dan tugas pengganti potensial. Pemeriksaan kompatibilitas memastikan bahwa hal-hal seperti informasi status perantara dan data yang dibuffer dapat ditransfer dari tugas sebelumnya ke tugas pengganti.

Anda juga dapat menggunakan infrastruktur logging bawaan Apache Beam SDK untuk mencatat informasi saat memperbarui tugas. Untuk mengetahui informasi selengkapnya, lihat Menggunakan log pipeline. Untuk mengidentifikasi masalah pada kode pipeline, gunakan tingkat logging DEBUG.

Pembaruan opsi tugas dalam penerbangan

Untuk tugas streaming yang menggunakan Streaming Engine, Anda dapat memperbarui opsi tugas berikut tanpa menghentikan tugas atau mengubah ID tugas:

  • min-num-workers: jumlah minimum instance Compute Engine.
  • max-num-workers: jumlah maksimum instance Compute Engine.
  • worker-utilization-hint: target penggunaan CPU, dalam rentang [0,1, 0,9]

Untuk update tugas lainnya, Anda harus mengganti tugas saat ini dengan tugas yang diperbarui. Untuk informasi selengkapnya, lihat Meluncurkan tugas pengganti.

Melakukan update saat proses pengiriman

Untuk melakukan pembaruan opsi tugas yang sedang berlangsung, lakukan langkah-langkah berikut.

gcloud

Gunakan perintah gcloud dataflow jobs update-options:

gcloud dataflow jobs update-options \
  --region=REGION \
  --min-num-workers=MINIMUM_WORKERS \
  --max-num-workers=MAXIMUM_WORKERS \
  --worker-utilization-hint=TARGET_UTILIZATION \
  JOB_ID

Ganti kode berikut:

  • REGION: ID region tugas
  • MINIMUM_WORKERS: jumlah minimum instance Compute Engine
  • MAXIMUM_WORKERS: jumlah maksimum instance Compute Engine
  • TARGET_UTILIZATION: nilai dalam rentang [0,1, 0,9]
  • JOB_ID: ID tugas yang akan diupdate

Anda juga dapat memperbarui --min-num-workers, --max-num-workers, dan worker-utilization-hint satu per satu.

REST

Gunakan projects.locations.jobs.update metode:

PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=MASK
{
  "runtime_updatable_params": {
    "min_num_workers": MINIMUM_WORKERS,
    "max_num_workers": MAXIMUM_WORKERS,
    "worker_utilization_hint": TARGET_UTILIZATION
  }
}

Ganti kode berikut:

  • MASK: daftar parameter yang dipisahkan koma untuk diperbarui, dari hal berikut:
    • runtime_updatable_params.max_num_workers
    • runtime_updatable_params.min_num_workers
    • runtime_updatable_params.worker_utilization_hint
  • PROJECT_ID: project ID Google Cloud dari tugas Dataflow
  • REGION: ID region tugas
  • JOB_ID: ID tugas yang akan diupdate
  • MINIMUM_WORKERS: jumlah minimum instance Compute Engine
  • MAXIMUM_WORKERS: jumlah maksimum instance Compute Engine
  • TARGET_UTILIZATION: nilai dalam rentang [0,1, 0,9]

Anda juga dapat memperbarui min_num_workers, max_num_workers, dan worker_utilization_hint satu per satu. Tentukan parameter yang akan diperbarui dalam parameter kueri updateMask, dan sertakan nilai yang diperbarui di kolom runtimeUpdatableParams dalam isi permintaan. Contoh berikut memperbarui min_num_workers:

PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers
{
  "runtime_updatable_params": {
    "min_num_workers": 5
  }
}

Tugas harus dalam status berjalan agar memenuhi syarat untuk update dalam penerbangan. Error akan terjadi jika tugas belum dimulai atau sudah dibatalkan. Demikian pula, jika Anda meluncurkan tugas pengganti, tunggu hingga tugas tersebut mulai berjalan sebelum mengirim update yang sedang berlangsung ke tugas baru.

Setelah Anda mengirimkan permintaan pembaruan, sebaiknya tunggu hingga permintaan selesai sebelum mengirimkan pembaruan lain. Lihat log tugas untuk melihat kapan permintaan selesai.

Memvalidasi tugas penggantian

Untuk memverifikasi apakah tugas pengganti valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya. Di Dataflow, grafik tugas adalah representasi grafis pipeline. Dengan memvalidasi grafik tugas, Anda mengurangi risiko pipeline mengalami error atau kegagalan pipeline setelah update. Selain itu, Anda dapat memvalidasi update tanpa perlu menghentikan tugas asli, sehingga tugas tersebut tidak mengalami periode nonaktif.

Untuk memvalidasi grafik tugas, ikuti langkah-langkah untuk meluncurkan tugas pengganti. Sertakan graph_validate_only Opsi layanan aliran data dalam perintah update.

Java

  • Teruskan opsi --update.
  • Tetapkan opsi --jobName di PipelineOptions ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Sertakan opsi layanan --dataflowServiceOptions=graph_validate_only.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transformNameMapping.
  • Jika Anda mengirimkan tugas pengganti yang menggunakan versi terbaru Apache Beam SDK, tetapkan --updateCompatibilityVersion ke versi Apache Beam SDK yang digunakan dalam tugas asli.

Python

  • Teruskan opsi --update.
  • Tetapkan opsi --job_name di PipelineOptions ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Sertakan opsi layanan --dataflow_service_options=graph_validate_only.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform_name_mapping.
  • Jika Anda mengirimkan tugas pengganti yang menggunakan versi terbaru Apache Beam SDK, tetapkan --updateCompatibilityVersion ke versi Apache Beam SDK yang digunakan dalam tugas asli.

Go

  • Teruskan opsi --update.
  • Tetapkan opsi --job_name ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Sertakan opsi layanan --dataflow_service_options=graph_validate_only.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform_name_mapping.

gcloud

Untuk memvalidasi grafik tugas untuk tugas Template Fleksibel, gunakan perintah gcloud dataflow flex-template run dengan opsi additional-experiments:

  • Teruskan opsi --update.
  • Tetapkan JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Sertakan opsi --additional-experiments=graph_validate_only.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform-name-mappings.

Contoh:

gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only

Ganti JOB_NAME dengan nama tugas yang ingin Anda perbarui.

REST

Gunakan kolom additionalExperiments di objek FlexTemplateRuntimeEnvironment (Template fleksibel) atau RuntimeEnvironment.

{
  additionalExperiments : ["graph_validate_only"]
  ...
}

Opsi layanan graph_validate_only hanya memvalidasi update pipeline. Jangan gunakan opsi ini saat membuat atau meluncurkan pipeline. Untuk mengupdate pipeline, luncurkan tugas pengganti tanpa opsi layanan graph_validate_only.

Jika validasi grafik tugas berhasil, status tugas dan log tugas akan menampilkan status berikut:

  • Status tugas adalah JOB_STATE_DONE.
  • Di konsol Google Cloud , Status tugas adalah Succeeded.
  • Pesan berikut akan muncul di log tugas:

    Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
    

Jika validasi grafik tugas gagal, status tugas dan log tugas akan menampilkan status berikut:

  • Status tugas adalah JOB_STATE_FAILED.
  • Di konsol Google Cloud , Status tugas adalah Failed.
  • Pesan akan muncul di log tugas yang menjelaskan error inkompatibilitas. Konten pesan bergantung pada error.

Meluncurkan tugas pengganti

Anda dapat mengganti tugas yang ada karena alasan berikut:

Untuk memverifikasi apakah tugas pengganti valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya.

Saat Anda meluncurkan tugas penggantian, tetapkan opsi pipeline berikut untuk melakukan proses update selain opsi tugas reguler:

Java

  • Teruskan opsi --update.
  • Tetapkan opsi --jobName di PipelineOptions ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Jika nama transformasi dalam pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transformNameMapping.
  • Jika Anda mengirimkan tugas pengganti yang menggunakan versi terbaru Apache Beam SDK, tetapkan --updateCompatibilityVersion ke versi Apache Beam SDK yang digunakan dalam tugas asli.

Python

  • Teruskan opsi --update.
  • Tetapkan opsi --job_name di PipelineOptions ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform_name_mapping.
  • Jika Anda mengirimkan tugas pengganti yang menggunakan versi terbaru Apache Beam SDK, tetapkan --updateCompatibilityVersion ke versi Apache Beam SDK yang digunakan dalam tugas asli.

Go

  • Teruskan opsi --update.
  • Tetapkan opsi --job_name ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform_name_mapping.

gcloud

Untuk mengupdate tugas Template Fleksibel menggunakan gcloud CLI, gunakan perintah gcloud dataflow flex-template run. Memperbarui tugas lain menggunakan gcloud CLI tidak didukung.

  • Teruskan opsi --update.
  • Tetapkan JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
  • Tetapkan opsi --region ke region yang sama dengan region tugas yang ingin Anda perbarui.
  • Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan opsi --transform-name-mappings.

REST

Petunjuk ini menunjukkan cara memperbarui tugas non-template menggunakan REST API. Untuk menggunakan REST API guna memperbarui tugas template klasik, lihat Memperbarui tugas streaming template kustom. Untuk menggunakan REST API guna memperbarui tugas Template Flex, lihat Memperbarui tugas Template Flex.

  1. Ambil resource job untuk tugas yang ingin Anda ganti menggunakan metode projects.locations.jobs.get. Sertakan parameter kueri view dengan nilai JOB_VIEW_DESCRIPTION. Menyertakan JOB_VIEW_DESCRIPTION akan membatasi jumlah data dalam respons sehingga permintaan berikutnya tidak melebihi batas ukuran. Jika Anda memerlukan informasi tugas yang lebih mendetail, gunakan nilai JOB_VIEW_ALL.

    GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
    

    Ganti nilai berikut:

    • PROJECT_ID: project ID Google Cloud dari tugas Dataflow
    • REGION: region tugas yang ingin Anda perbarui
    • JOB_ID: ID tugas yang ingin Anda perbarui
  2. Untuk memperbarui tugas, gunakan metode projects.locations.jobs.create. Dalam isi permintaan, gunakan resource job yang Anda ambil.

    POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    {
      "id": JOB_ID,
      "replaceJobId": JOB_ID,
      "name": JOB_NAME,
      "type": "JOB_TYPE_STREAMING",
      "transformNameMapping": {
        string: string,
        ...
      },
    }
    

    Ganti kode berikut:

    • JOB_ID: ID tugas yang sama dengan ID tugas yang ingin Anda update.
    • JOB_NAME: nama tugas yang sama dengan nama tugas yang ingin Anda perbarui.

    Jika nama transformasi di pipeline telah berubah, Anda harus memberikan pemetaan transformasi dan meneruskannya menggunakan kolom transformNameMapping.

  3. Opsional: Untuk mengirim permintaan menggunakan curl (Linux, macOS, atau Cloud Shell), simpan permintaan ke file JSON, lalu jalankan perintah berikut:

    curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)"  https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
    

    Ganti FILE_PATH dengan jalur ke file JSON yang berisi isi permintaan.

Tentukan nama tugas pengganti

Java

Saat meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --jobName harus sama persis dengan nama tugas yang ingin Anda ganti.

Python

Saat meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --job_name harus sama persis dengan nama tugas yang ingin Anda ganti.

Go

Saat meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --job_name harus sama persis dengan nama tugas yang ingin Anda ganti.

gcloud

Saat Anda meluncurkan tugas pengganti, JOB_NAME harus sama persis dengan nama tugas yang ingin Anda ganti.

REST

Tetapkan nilai kolom replaceJobId ke ID tugas yang sama dengan tugas yang ingin Anda perbarui. Untuk menemukan nilai nama tugas yang benar, pilih tugas sebelumnya di Dataflow Monitoring Interface. Kemudian, di panel samping Job info, temukan kolom Job ID.

Untuk menemukan nilai nama tugas yang benar, pilih tugas sebelumnya di Dataflow Monitoring Interface. Kemudian, di panel samping Job info, temukan kolom Job name:

Panel samping Info tugas untuk tugas Dataflow yang sedang berjalan.
Gambar 1: Panel samping Info tugas untuk tugas Dataflow yang sedang berjalan dengan kolom Nama tugas.

Atau, buat kueri daftar tugas yang ada menggunakan Antarmuka Command Line Dataflow. Masukkan perintah gcloud dataflow jobs list ke jendela shell atau terminal untuk mendapatkan daftar tugas Dataflow di project Google Cloud, dan temukan kolom NAME untuk tugas yang ingin Anda ganti:

JOB_ID                                    NAME                        TYPE       CREATION_TIME        STATE    REGION
2020-12-28_12_01_09-yourdataflowjobid     ps-topic                    Streaming  2020-12-28 20:01:10  Running  us-central1

Membuat pemetaan transformasi

Jika pipeline penggantian Anda mengubah nama transformasi dari nama dalam pipeline sebelumnya, layanan Dataflow memerlukan pemetaan transformasi. Pemetaan transformasi memetakan transformasi bernama dalam kode pipeline sebelumnya ke nama dalam kode pipeline pengganti.

Java

Teruskan pemetaan menggunakan opsi command line --transformNameMapping, menggunakan format umum berikut:

--transformNameMapping= . 
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Anda hanya perlu memberikan entri pemetaan di --transformNameMapping untuk nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline pengganti.

Saat menjalankan dengan --transformNameMapping, Anda mungkin perlu meng-escape tanda kutip yang sesuai untuk shell Anda. Misalnya, di Bash:

--transformNameMapping='{"oldTransform1":"newTransform1",...}'

Python

Teruskan pemetaan menggunakan opsi command line --transform_name_mapping, menggunakan format umum berikut:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping untuk nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline pengganti.

Saat menjalankan dengan --transform_name_mapping, Anda mungkin perlu meng-escape tanda kutip yang sesuai untuk shell Anda. Misalnya, di Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

Go

Teruskan pemetaan menggunakan opsi command line --transform_name_mapping, menggunakan format umum berikut:

--transform_name_mapping= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping untuk nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline pengganti.

Saat menjalankan dengan --transform_name_mapping, Anda mungkin perlu meng-escape tanda kutip yang sesuai untuk shell Anda. Misalnya, di Bash:

--transform_name_mapping='{"oldTransform1":"newTransform1",...}'

gcloud

Teruskan pemetaan menggunakan opsi --transform-name-mappings, menggunakan format umum berikut:

--transform-name-mappings= .
{"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}

Anda hanya perlu memberikan entri pemetaan di --transform-name-mappings untuk nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline pengganti.

Saat menjalankan dengan --transform-name-mappings, Anda mungkin perlu meng-escape tanda kutip sesuai dengan shell Anda. Misalnya, di Bash:

--transform-name-mappings='{"oldTransform1":"newTransform1",...}'

REST

Teruskan pemetaan menggunakan kolom transformNameMapping, menggunakan format umum berikut:

"transformNameMapping": {
  oldTransform1: newTransform1,
  oldTransform2: newTransform2,
  ...
}

Anda hanya perlu memberikan entri pemetaan di transformNameMapping untuk nama transformasi yang telah berubah antara pipeline sebelumnya dan pipeline pengganti.

Menentukan nama transformasi

Nama transformasi di setiap instance dalam peta adalah nama yang Anda berikan saat menerapkan transformasi di pipeline. Contoh:

Java

  .apply("FormatResults", ParDo
    .of(new DoFn<KV<String, Long>>, String>() {
      ...
     }
  }))

Python

  | 'FormatResults' >> beam.ParDo(MyDoFn())

Go

  // In Go, this is always the package-qualified name of the DoFn itself.
  // For example, if the FormatResults DoFn is in the main package, its name
  // is "main.FormatResults".
  beam.ParDo(s, FormatResults, results)

Anda juga bisa mendapatkan nama transformasi untuk tugas sebelumnya dengan memeriksa grafik eksekusi tugas di Antarmuka Pemantauan Dataflow:

Grafik eksekusi untuk pipeline WordCount.
Gambar 2: Grafik eksekusi untuk pipeline WordCount seperti yang ditampilkan di Antarmuka Pemantauan Dataflow.

Penamaan transformasi gabungan

Nama transformasi bersifat hierarkis, berdasarkan hierarki transformasi dalam pipeline Anda. Jika pipeline Anda memiliki transformasi gabungan, transformasi bertingkat diberi nama berdasarkan transformasi yang berisinya. Misalnya, misalnya pipeline Anda berisi transformasi gabungan bernama CountWidgets, yang berisi transformasi dalam bernama Parse. Nama lengkap transformasi Anda adalah CountWidgets/Parse, dan Anda harus menentukan nama lengkap tersebut dalam pemetaan transformasi.

Jika pipeline baru Anda memetakan transformasi komposit ke nama yang berbeda, semua transformasi bertingkat juga akan otomatis diganti namanya. Anda harus menentukan nama yang diubah untuk transformasi dalam di pemetaan transformasi.

Memfaktorkan ulang hierarki transformasi

Jika pipeline pengganti menggunakan hierarki transformasi yang berbeda dengan pipeline sebelumnya, Anda harus mendeklarasikan pemetaan secara eksplisit. Anda mungkin memiliki hierarki transformasi yang berbeda karena memfaktorkan ulang transformasi komposit, atau pipeline Anda bergantung pada transformasi komposit dari library yang berubah.

Misalnya, pipeline sebelumnya menerapkan transformasi gabungan, CountWidgets, yang berisi transformasi dalam bernama Parse. Pipeline pengganti memfaktorkan ulang CountWidgets, dan menyusun bertingkat Parse di dalam transformasi lain bernama Scan. Agar update berhasil, Anda harus memetakan nama transformasi lengkap di pipeline sebelumnya (CountWidgets/Parse) secara eksplisit ke nama transformasi di pipeline baru (CountWidgets/Scan/Parse):

Java

--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus transformasi CountWidgets/Parse sepenuhnya:

--transformNameMapping={"CountWidgets/Parse":""}

Python

--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus transformasi CountWidgets/Parse sepenuhnya:

--transform_name_mapping={"CountWidgets/Parse":""}

Go

--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}

Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus transformasi CountWidgets/Parse sepenuhnya:

--transform_name_mapping={"CountWidgets/main.Parse":""}

gcloud

--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}

Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus transformasi CountWidgets/Parse sepenuhnya:

--transform-name-mappings={"CountWidgets/main.Parse":""}

REST

"transformNameMapping": {
  CountWidgets/Parse: CountWidgets/Scan/Parse
}

Jika Anda menghapus transformasi sepenuhnya di pipeline penggantian, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus transformasi CountWidgets/Parse sepenuhnya:

"transformNameMapping": {
  CountWidgets/main.Parse: null
}

Efek penggantian tugas

Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang diperbarui. Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas penggantian dengan ID Tugas yang diperbarui. Proses ini dapat menyebabkan periode nonaktif saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru dimulai.

Tugas penggantian mempertahankan item berikut:

Data status perantara

Data status perantara dari tugas sebelumnya akan dipertahankan. Data status tidak mencakup cache dalam memori. Jika Anda ingin mempertahankan data cache dalam memori saat mengupdate pipeline, sebagai solusi, faktorkan ulang pipeline untuk mengonversi cache menjadi data status atau menjadi input samping. Untuk informasi selengkapnya tentang penggunaan input samping, lihat Pola input samping dalam dokumentasi Apache Beam.

Pipeline streaming memiliki batas ukuran untuk ValueState dan untuk input samping. Akibatnya, jika memiliki cache besar yang ingin dipertahankan, Anda mungkin perlu menggunakan penyimpanan eksternal, seperti Memorystore atau Bigtable.

Data dalam penerbangan

Data "sedang diproses" masih diproses oleh transformasi di pipeline baru Anda. Namun, transformasi tambahan yang Anda tambahkan dalam kode pipeline pengganti mungkin akan diterapkan atau tidak, bergantung pada tempat data di-buffer. Dalam contoh ini, pipeline yang ada memiliki transformasi berikut:

Java

  p.apply("Read", ReadStrings())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Format' >> FormatStrings()

Go

   beam.ParDo(s, ReadStrings)
   beam.ParDo(s, FormatStrings)

Anda dapat mengganti tugas dengan kode pipeline baru, sebagai berikut:

Java

  p.apply("Read", ReadStrings())
   .apply("Remove", RemoveStringsStartingWithA())
   .apply("Format", FormatStrings());

Python

  p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription)
    | 'Remove' >> RemoveStringsStartingWithA()
    | 'Format' >> FormatStrings()

Go

  beam.ParDo(s, ReadStrings)
  beam.ParDo(s, RemoveStringsStartingWithA)
  beam.ParDo(s, FormatStrings)

Meskipun Anda menambahkan transformasi untuk memfilter string yang diawali dengan huruf "A", transformasi berikutnya (FormatStrings) mungkin masih melihat string yang dibuffer atau sedang dalam proses yang diawali dengan "A" yang ditransfer dari tugas sebelumnya.

Mengubah periode

Anda dapat mengubah strategi periode dan pemicu untuk elemen PCollection di pipeline penggantian, tetapi berhati-hatilah. Mengubah strategi periode atau pemicu tidak memengaruhi data yang sudah di-buffer atau sedang dalam pengiriman.

Sebaiknya Anda hanya mencoba perubahan yang lebih kecil pada periode pipeline, seperti mengubah durasi periode waktu tetap atau geser. Membuat perubahan besar pada periode atau pemicu, seperti mengubah algoritma periode, mungkin akan memberikan hasil yang tidak dapat diprediksi pada output pipeline Anda.

Pemeriksaan kompatibilitas tugas

Saat Anda meluncurkan tugas pengganti, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas pengganti dan tugas sebelumnya. Jika pemeriksaan kompatibilitas lulus, tugas sebelumnya akan dihentikan. Tugas pengganti Anda kemudian akan diluncurkan di layanan Dataflow sekaligus mempertahankan nama tugas yang sama. Jika pemeriksaan kompatibilitas gagal, tugas sebelumnya akan terus berjalan di layanan Dataflow dan tugas pengganti akan menampilkan error.

Java

Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi saat ini terdiri dari langkah-langkah berikut:

  1. Gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda.
  2. Jalankan program pipeline penggantian Anda dengan opsi --update.
  3. Tunggu hingga tugas penggantian berhasil lulus pemeriksaan kompatibilitas.
  4. Keluar dari proses runner pemblokir dengan mengetik Ctrl+C.

Atau, Anda dapat memantau status tugas penggantian di Dataflow Monitoring Interface. Jika berhasil dimulai, tugas Anda juga akan lulus pemeriksaan kompatibilitas.

Python

Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi saat ini terdiri dari langkah-langkah berikut:

  1. Gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda.
  2. Jalankan program pipeline penggantian Anda dengan opsi --update.
  3. Tunggu hingga tugas penggantian berhasil lulus pemeriksaan kompatibilitas.
  4. Keluar dari proses runner pemblokir dengan mengetik Ctrl+C.

Atau, Anda dapat memantau status tugas penggantian di Dataflow Monitoring Interface. Jika berhasil dimulai, tugas Anda juga akan lulus pemeriksaan kompatibilitas.

Go

Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Secara khusus, Anda harus menentukan eksekusi non-pemblokiran menggunakan flag --execute_async atau --async. Solusi saat ini terdiri dari langkah-langkah berikut:

  1. Jalankan program pipeline penggantian dengan opsi --update dan tanpa tanda --execute_async atau --async.
  2. Tunggu hingga tugas penggantian berhasil lulus pemeriksaan kompatibilitas.
  3. Keluar dari proses runner pemblokir dengan mengetik Ctrl+C.

gcloud

Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi saat ini terdiri dari langkah-langkah berikut:

  1. Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
  2. Jalankan program pipeline penggantian Anda dengan opsi --update.
  3. Tunggu hingga tugas penggantian berhasil lulus pemeriksaan kompatibilitas.
  4. Keluar dari proses runner pemblokir dengan mengetik Ctrl+C.

REST

Karena batasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal. Solusi saat ini terdiri dari langkah-langkah berikut:

  • Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
  • Jalankan program pipeline penggantian Anda dengan kolom replaceJobId.
  • Tunggu hingga tugas penggantian berhasil lulus pemeriksaan kompatibilitas.
  • Keluar dari proses runner pemblokir dengan mengetik Ctrl+C.

Pemeriksaan kompatibilitas menggunakan pemetaan transformasi yang disediakan untuk memastikan bahwa Dataflow dapat mentransfer data status perantara dari langkah-langkah dalam tugas sebelumnya ke tugas pengganti. Pemeriksaan kompatibilitas juga memastikan bahwa PCollection di pipeline Anda menggunakan Coder yang sama. Mengubah Coder dapat menyebabkan pemeriksaan kompatibilitas gagal karena data yang sedang diproses atau data yang dibuffer mungkin tidak diserialisasi dengan benar di pipeline penggantian.

Mencegah gangguan kompatibilitas

Perbedaan tertentu antara pipeline sebelumnya dan pipeline pengganti dapat menyebabkan pemeriksaan kompatibilitas gagal. Perbedaan ini mencakup:

  • Mengubah grafik pipeline tanpa memberikan pemetaan. Saat Anda memperbarui tugas, Dataflow akan mencoba mencocokkan transformasi dalam tugas sebelumnya dengan transformasi dalam tugas pengganti. Proses pencocokan ini membantu Aliran data mentransfer data status perantara untuk setiap langkah. Jika mengganti nama atau menghapus langkah apa pun, Anda harus memberikan pemetaan transformasi agar Dataflow dapat mencocokkan data status dengan benar.
  • Mengubah input samping untuk langkah. Menambahkan input samping ke atau menghapusnya dari transformasi di pipeline penggantian akan menyebabkan pemeriksaan kompatibilitas gagal.
  • Mengubah Coder untuk suatu langkah. Saat Anda memperbarui tugas, Dataflow akan mempertahankan data yang saat ini di-buffer dan menanganinya dalam tugas penggantian. Misalnya, data yang di-buffer mungkin terjadi saat windowing di-resolve. Jika tugas penggantian menggunakan encoding data yang berbeda atau tidak kompatibel, Dataflow tidak dapat melakukan serialisasi atau deserialisasi data ini.
  • Menghapus operasi "stateful" dari pipeline Anda. Jika Anda menghapus operasi stateful dari pipeline, tugas penggantian Anda mungkin gagal dalam pemeriksaan kompatibilitas. Dataflow dapat menggabungkan beberapa langkah secara bersamaan untuk efisiensi. Jika Anda menghapus operasi yang bergantung pada status dari dalam langkah gabungan, pemeriksaan akan gagal. Operasi stateful meliputi:

    • Transformasi yang menghasilkan atau menggunakan input samping.
    • Operasi baca I/O.
    • Transformasi yang menggunakan status dengan kunci.
    • Transformasi yang memiliki penggabungan jendela.
  • Mengubah variabel DoFn stateful. Untuk tugas streaming yang sedang berlangsung, jika pipeline Anda menyertakan DoFn stateful, mengubah variabel DoFn stateful dapat menyebabkan pipeline gagal.

  • Mencoba menjalankan tugas penggantian di zona geografis yang berbeda. Jalankan tugas pengganti di zona yang sama dengan tempat Anda menjalankan tugas sebelumnya.

Memperbarui skema

Apache Beam memungkinkan PCollection memiliki skema dengan kolom bernama, sehingga Coder eksplisit tidak diperlukan. Jika nama dan jenis kolom untuk skema tertentu tidak berubah (termasuk kolom bertingkat), skema tersebut tidak menyebabkan pemeriksaan update gagal. Namun, update mungkin masih diblokir jika segmen lain dari pipeline baru tidak kompatibel.

Mengembangkan skema

Sering kali perlu mengembangkan skema PCollection karena persyaratan bisnis yang terus berubah. Layanan Dataflow memungkinkan perubahan berikut pada skema saat memperbarui pipeline:

  • Menambahkan satu atau beberapa kolom baru ke skema, termasuk kolom bertingkat.
  • Membuat jenis kolom wajib diisi (non-nullable) menjadi opsional (nullable).

Menghapus kolom, mengubah nama kolom, atau mengubah jenis kolom tidak diizinkan selama pembaruan.

Meneruskan data tambahan ke operasi ParDo yang ada

Anda dapat meneruskan data tambahan (out-of-band) ke operasi ParDo yang ada dengan menggunakan salah satu metode berikut, bergantung pada kasus penggunaan Anda:

  • Serasikan informasi sebagai kolom di subclass DoFn Anda.
  • Setiap variabel yang direferensikan oleh metode dalam DoFn anonim akan diserialisasi secara otomatis.
  • Menghitung data di dalam DoFn.startBundle().
  • Teruskan data menggunakan ParDo.withSideInputs.

Untuk informasi lebih lanjut, lihat halaman berikut: