Dokumen ini menjelaskan cara memperbarui tugas streaming yang sedang berlangsung. Anda mungkin ingin memperbarui tugas Dataflow yang ada karena alasan berikut:
- Anda ingin meningkatkan atau memperbaiki kode pipeline Anda.
- Anda ingin memperbaiki bug dalam kode pipeline.
- Anda ingin memperbarui pipeline untuk menangani perubahan dalam format data, atau untuk memperhitungkan versi atau perubahan lain dalam sumber data Anda.
- Anda ingin mem-patch kerentanan keamanan terkait Container-Optimized OS untuk semua pekerja Dataflow.
- Anda ingin menskalakan pipeline Apache Beam streaming agar menggunakan jumlah pekerja yang berbeda.
Anda dapat memperbarui tugas dengan dua cara:
- Pembaruan tugas yang sedang berlangsung: Untuk tugas streaming yang menggunakan
Streaming Engine, Anda dapat memperbarui
opsi tugas
min-num-workers
danmax-num-workers
tanpa menghentikan tugas atau mengubah ID tugas. - Tugas pengganti: Untuk menjalankan kode pipeline yang diperbarui atau memperbarui opsi tugas yang tidak didukung oleh update tugas yang sedang berlangsung, luncurkan tugas baru yang menggantikan tugas yang ada. Untuk memverifikasi apakah tugas pengganti valid, sebelum meluncurkan tugas baru, validasi grafik tugasnya.
Saat Anda memperbarui tugas, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas yang saat ini berjalan dan tugas pengganti yang mungkin dilakukan. Pemeriksaan kompatibilitas memastikan bahwa hal-hal seperti informasi status perantara dan data yang di-buffer dapat ditransfer dari tugas sebelumnya ke tugas pengganti Anda.
Anda juga dapat menggunakan infrastruktur logging bawaan Apache Beam SDK untuk mencatat informasi ke dalam log saat memperbarui tugas. Untuk mengetahui informasi selengkapnya, lihat
Bekerja dengan log pipeline.
Untuk mengidentifikasi masalah terkait kode pipeline, gunakan
level logging DEBUG
.
- Untuk mengetahui petunjuk cara memperbarui tugas streaming yang menggunakan template klasik, lihat artikel Memperbarui tugas streaming template kustom.
- Untuk mengetahui petunjuk cara mengupdate tugas streaming yang menggunakan Template Flex, ikuti petunjuk gcloud CLI di halaman ini, atau lihat Memperbarui tugas Template Flex.
Pembaruan opsi tugas dalam penerbangan
Untuk tugas streaming yang menggunakan Streaming Engine, Anda dapat memperbarui opsi tugas berikut tanpa menghentikan tugas atau mengubah ID tugas:
min-num-workers
: jumlah minimum instance Compute Engine.max-num-workers
: jumlah maksimum instance Compute Engine.
Untuk info lowongan terbaru lainnya, Anda harus mengganti pekerjaan saat ini dengan lowongan yang sudah diperbarui. Untuk informasi selengkapnya, lihat Meluncurkan tugas pengganti.
Lakukan update yang sedang berlangsung
Untuk melakukan pembaruan opsi tugas yang sedang berlangsung, lakukan langkah-langkah berikut.
gcloud
Gunakan perintah gcloud dataflow jobs update-options
:
gcloud dataflow jobs update-options \ --region=REGION \ --min-num-workers=MINIMUM_WORKERS \ --max-num-workers=MAXIMUM_WORKERS \ JOB_ID
Ganti kode berikut:
- REGION: ID wilayah tugas
- JOB_ID: ID tugas yang akan diperbarui
Anda juga dapat mengupdate --min-num-workers
dan --max-num-workers
satu per satu.
REST
Gunakan
projects.locations.jobs.update
metode:
PUT https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?updateMask=runtime_updatable_params.max_num_workers,runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": MINIMUM_WORKERS, "max_num_workers": MAXIMUM_WORKERS } }
Ganti kode berikut:
- PROJECT_ID: ID project Google Cloud untuk tugas Dataflow
- REGION: ID wilayah tugas
- JOB_ID: ID tugas yang akan diperbarui
- MINIMUM_WORKERS: jumlah minimum instance Compute Engine
- MAXIMUM_WORKERS: jumlah maksimum instance Compute Engine
Anda juga dapat memperbarui min_num_workers
dan max_num_workers
satu per satu.
Tentukan parameter yang akan diperbarui dalam parameter kueri updateMask
, dan
sertakan nilai yang telah diperbarui dalam kolom runtimeUpdatableParams
dari
isi permintaan. Contoh berikut memperbarui min_num_workers
:
PUT https://dataflow.googleapis.com/v1b3/projects/my_project/locations/us-central1/jobs/job1?updateMask=runtime_updatable_params.min_num_workers { "runtime_updatable_params": { "min_num_workers": 5 } }
Tugas harus dalam status berjalan agar memenuhi syarat untuk update yang sedang berlangsung. Error terjadi jika tugas belum dimulai atau sudah dibatalkan. Demikian pula, jika Anda meluncurkan tugas pengganti, tunggu hingga tugas tersebut mulai berjalan sebelum mengirimkan update yang sedang berlangsung ke tugas baru.
Setelah Anda mengirimkan permintaan update, sebaiknya tunggu permintaan selesai sebelum mengirim update lain. Lihat log tugas untuk melihat kapan permintaan selesai.
Memvalidasi tugas pengganti
Untuk memverifikasi apakah tugas pengganti valid, sebelum Anda meluncurkan tugas baru, validasi grafik tugasnya. Di Dataflow, grafik tugas adalah representasi grafis dari pipeline. Dengan memvalidasi grafik tugas, Anda mengurangi risiko pipeline mengalami error atau kegagalan pipeline setelah pembaruan. Selain itu, Anda dapat memvalidasi update tanpa perlu menghentikan tugas asal, sehingga tugas tidak mengalami periode nonaktif.
Untuk memvalidasi grafik tugas, ikuti langkah-langkah untuk
meluncurkan tugas pengganti. Sertakan graph_validate_only
opsi layanan Dataflow dalam perintah update.
Java
- Teruskan opsi
--update
. - Tetapkan opsi
--jobName
diPipelineOptions
dengan nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflowServiceOptions=graph_validate_only
. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transformNameMapping
. - Jika Anda mengirimkan tugas pengganti yang menggunakan Apache Beam SDK
versi terbaru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asal.
Python
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
diPipelineOptions
dengan nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflow_service_options=graph_validate_only
. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform_name_mapping
. - Jika Anda mengirimkan tugas pengganti yang menggunakan Apache Beam SDK
versi terbaru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asal.
Go
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi layanan
--dataflow_service_options=graph_validate_only
. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform_name_mapping
.
gcloud
Guna memvalidasi grafik tugas untuk tugas Template Fleksibel, gunakan
perintah gcloud dataflow flex-template run
dengan opsi additional-experiments
:
- Teruskan opsi
--update
. - Setel JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
- Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Sertakan opsi
--additional-experiments=graph_validate_only
. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform-name-mappings
.
Contoh:
gcloud dataflow flex-template run JOB_NAME --additional-experiments=graph_validate_only
Ganti JOB_NAME dengan nama lowongan yang ingin Anda perbarui.
REST
Gunakan kolom additionalExperiments
di objek
FlexTemplateRuntimeEnvironment
(Template fleksibel) atau
RuntimeEnvironment
.
{
additionalExperiments : ["graph_validate_only"]
...
}
Opsi layanan graph_validate_only
hanya memvalidasi update pipeline. Jangan gunakan opsi ini saat
membuat atau meluncurkan {i>pipe<i}. Untuk memperbarui pipeline Anda,
luncurkan tugas pengganti tanpa
opsi layanan graph_validate_only
.
Saat validasi grafik tugas berhasil, status tugas dan log tugas akan menampilkan status berikut:
- Status tugas adalah
JOB_STATE_DONE
. - Di konsol Google Cloud, Status tugas adalah
Succeeded
. Pesan berikut muncul di log tugas:
Workflow job: JOB_ID succeeded validation. Marking graph_validate_only job as Done.
Jika validasi grafik tugas gagal, status tugas dan log tugas akan menampilkan status berikut:
- Status tugas adalah
JOB_STATE_FAILED
. - Di konsol Google Cloud, Status tugas adalah
Failed
. - Pesan muncul di log tugas yang menjelaskan error inkompatibilitas. Konten pesan bergantung pada error.
Meluncurkan tugas pengganti
Anda mungkin mengganti pekerjaan yang ada karena alasan berikut:
- Untuk menjalankan kode pipeline yang diperbarui.
- Untuk memperbarui opsi tugas yang tidak mendukung update yang sedang berlangsung.
Untuk memverifikasi apakah tugas pengganti valid, sebelum Anda meluncurkan tugas baru, validasi grafik tugasnya.
Saat Anda meluncurkan tugas pengganti, tetapkan opsi pipeline berikut untuk melakukan proses pembaruan, selain opsi reguler tugas:
Java
- Teruskan opsi
--update
. - Tetapkan opsi
--jobName
diPipelineOptions
dengan nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transformNameMapping
. - Jika Anda mengirimkan tugas pengganti yang menggunakan Apache Beam SDK
versi terbaru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asal.
Python
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
diPipelineOptions
dengan nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform_name_mapping
. - Jika Anda mengirimkan tugas pengganti yang menggunakan Apache Beam SDK
versi terbaru, tetapkan
--updateCompatibilityVersion
ke versi Apache Beam SDK yang digunakan dalam tugas asal.
Go
- Teruskan opsi
--update
. - Tetapkan opsi
--job_name
ke nama yang sama dengan tugas yang ingin Anda perbarui. - Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform_name_mapping
.
gcloud
Untuk memperbarui tugas Template Flex menggunakan gcloud CLI, gunakan perintah gcloud dataflow flex-template run
. Mengupdate tugas lain menggunakan gcloud CLI tidak didukung.
- Teruskan opsi
--update
. - Setel JOB_NAME ke nama yang sama dengan tugas yang ingin Anda perbarui.
- Tetapkan opsi
--region
ke region yang sama dengan region tugas yang ingin Anda perbarui. - Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan
pemetaan transformasi dan meneruskannya menggunakan
opsi
--transform-name-mappings
.
REST
Petunjuk ini menunjukkan cara memperbarui tugas non-template dengan menggunakan REST API. Untuk menggunakan REST API guna memperbarui tugas template klasik, lihat Memperbarui tugas streaming template kustom. Untuk menggunakan REST API guna memperbarui tugas Template Flex, lihat Memperbarui tugas Template Flex.
Ambil resource
job
untuk tugas yang ingin Anda ganti dengan menggunakan metodeprojects.locations.jobs.get
. Sertakan parameter kueriview
dengan nilaiJOB_VIEW_DESCRIPTION
. MenyertakanJOB_VIEW_DESCRIPTION
akan membatasi jumlah data dalam respons sehingga permintaan Anda berikutnya tidak melebihi batas ukuran. Jika Anda memerlukan informasi pekerjaan yang lebih mendetail, gunakan nilaiJOB_VIEW_ALL
.GET https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs/JOB_ID?view=JOB_VIEW_DESCRIPTION
Ganti nilai berikut:
- PROJECT_ID: ID project Google Cloud untuk tugas Dataflow
- REGION: region tugas yang ingin Anda perbarui
- JOB_ID: ID pekerjaan dari pekerjaan yang ingin Anda perbarui
Untuk memperbarui tugas, gunakan metode
projects.locations.jobs.create
. Dalam isi permintaan, gunakan resourcejob
yang Anda ambil.POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs { "id": JOB_ID, "replaceJobId": JOB_ID, "name": JOB_NAME, "type": "JOB_TYPE_STREAMING", "transformNameMapping": { string: string, ... }, }
Ganti kode berikut:
- JOB_ID: ID tugas yang sama dengan ID tugas yang ingin Anda perbarui.
- JOB_NAME: nama pekerjaan yang sama dengan nama tugas yang ingin Anda perbarui.
Jika nama transformasi dalam pipeline telah berubah, Anda harus menyediakan pemetaan transformasi dan meneruskannya menggunakan kolom
transformNameMapping
.Opsional: Untuk mengirim permintaan Anda menggunakan curl (Linux, macOS, atau Cloud Shell), simpan permintaan ke file JSON, lalu jalankan perintah berikut:
curl -X POST -d "@FILE_PATH" -H "Content-Type: application/json" -H "Authorization: Bearer $(gcloud auth print-access-token)" https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/REGION/jobs
Ganti FILE_PATH dengan jalur ke file JSON yang berisi isi permintaan.
Tentukan nama tugas pengganti Anda
Java
Saat Anda meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --jobName
harus sama persis dengan nama tugas yang ingin Anda ganti.
Python
Saat Anda meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --job_name
harus sama persis dengan nama tugas yang ingin Anda ganti.
Go
Saat Anda meluncurkan tugas pengganti, nilai yang Anda teruskan untuk opsi --job_name
harus sama persis dengan nama tugas yang ingin Anda ganti.
gcloud
Saat Anda meluncurkan tugas pengganti, JOB_NAME harus sama persis dengan nama tugas yang ingin Anda ganti.
REST
Tetapkan nilai kolom replaceJobId
ke ID pekerjaan yang sama dengan tugas yang ingin Anda
perbarui. Untuk menemukan nilai nama tugas yang benar, pilih pekerjaan Anda sebelumnya di Dataflow Monitoring Interface.
Kemudian, di panel samping Info pekerjaan, cari kolom ID Pekerjaan.
Untuk menemukan nilai nama tugas yang benar, pilih pekerjaan Anda sebelumnya di Dataflow Monitoring Interface. Kemudian, di panel samping Info tugas, cari kolom Nama pekerjaan:
Atau, buat kueri daftar tugas yang ada dengan menggunakan Antarmuka Command Line Dataflow.
Masukkan perintah gcloud dataflow jobs list
ke dalam jendela shell atau terminal untuk mendapatkan daftar tugas Dataflow di project Google Cloud Anda, dan cari kolom NAME
untuk tugas yang ingin Anda ganti:
JOB_ID NAME TYPE CREATION_TIME STATE REGION 2020-12-28_12_01_09-yourdataflowjobid ps-topic Streaming 2020-12-28 20:01:10 Running us-central1
Membuat pemetaan transformasi
Jika pipeline pengganti Anda mengubah nama transformasi dari nama di pipeline sebelumnya, layanan Dataflow memerlukan pemetaan transformasi. Pemetaan transformasi memetakan transformasi bernama dalam kode pipeline Anda sebelumnya ke nama dalam kode pipeline pengganti Anda.
Java
Teruskan pemetaan dengan menggunakan opsi command line --transformNameMapping
,
menggunakan format umum berikut:
--transformNameMapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transformNameMapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan
pipeline penggantian Anda.
Saat menjalankan dengan --transformNameMapping
,
Anda mungkin perlu meng-escape
kutipan sebagaimana mestinya untuk shell Anda. Misalnya, di Bash:
--transformNameMapping='{"oldTransform1":"newTransform1",...}'
Python
Teruskan pemetaan dengan menggunakan opsi command line --transform_name_mapping
,
menggunakan format umum berikut:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan
pipeline penggantian Anda.
Saat menjalankan dengan --transform_name_mapping
,
Anda mungkin perlu meng-escape
kutipan sesuai untuk shell Anda. Misalnya, di Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
Go
Teruskan pemetaan dengan menggunakan opsi command line --transform_name_mapping
,
menggunakan format umum berikut:
--transform_name_mapping= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform_name_mapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan
pipeline penggantian Anda.
Saat menjalankan dengan --transform_name_mapping
,
Anda mungkin perlu meng-escape
kutipan sebagaimana mestinya untuk shell Anda. Misalnya, di Bash:
--transform_name_mapping='{"oldTransform1":"newTransform1",...}'
gcloud
Teruskan pemetaan menggunakan opsi --transform-name-mappings
, dengan menggunakan format umum berikut:
--transform-name-mappings= . {"oldTransform1":"newTransform1","oldTransform2":"newTransform2",...}
Anda hanya perlu memberikan entri pemetaan di --transform-name-mappings
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan
pipeline penggantian Anda.
Saat menjalankan dengan --transform-name-mappings
,
Anda mungkin harus meng-escape tanda kutip sesuai untuk shell Anda. Misalnya, di Bash:
--transform-name-mappings='{"oldTransform1":"newTransform1",...}'
REST
Teruskan pemetaan menggunakan kolom transformNameMapping
, menggunakan format umum berikut:
"transformNameMapping": {
oldTransform1: newTransform1,
oldTransform2: newTransform2,
...
}
Anda hanya perlu memberikan entri pemetaan di transformNameMapping
untuk
nama transformasi yang telah berubah antara pipeline sebelumnya dan
pipeline penggantian Anda.
Menentukan nama transformasi
Nama transformasi di setiap instance dalam peta adalah nama yang Anda berikan saat menerapkan transformasi di pipeline. Contoh:
Java
.apply("FormatResults", ParDo
.of(new DoFn<KV<String, Long>>, String>() {
...
}
}))
Python
| 'FormatResults' >> beam.ParDo(MyDoFn())
Go
// In Go, this is always the package-qualified name of the DoFn itself.
// For example, if the FormatResults DoFn is in the main package, its name
// is "main.FormatResults".
beam.ParDo(s, FormatResults, results)
Anda juga bisa mendapatkan nama transformasi untuk tugas sebelumnya dengan memeriksa grafik eksekusi tugas di Dataflow Monitoring Interface:
Penamaan transformasi gabungan
Nama transformasi bersifat hierarkis, berdasarkan hierarki transformasi di pipeline Anda. Jika pipeline Anda memiliki transformasi gabungan, transformasi bertingkat akan diberi nama sesuai dengan transformasi yang memuatnya. Misalnya, pipeline Anda berisi transformasi komposit bernama
CountWidgets
, yang berisi transformasi dalam bernama Parse
. Nama lengkap transformasi Anda adalah CountWidgets/Parse
, dan Anda harus menentukan nama lengkap tersebut dalam pemetaan transformasi.
Jika pipeline baru Anda memetakan transformasi gabungan ke nama yang berbeda, semua transformasi bertingkat juga akan diganti namanya secara otomatis. Anda harus menentukan nama yang diubah untuk transformasi internal dalam pemetaan transformasi.
Memfaktorkan ulang hierarki transformasi
Jika pipeline pengganti Anda menggunakan hierarki transformasi yang berbeda dengan pipeline sebelumnya, Anda harus mendeklarasikan pemetaan secara eksplisit. Anda mungkin memiliki hierarki transformasi yang berbeda karena Anda telah memfaktorkan ulang transformasi komposit, atau pipeline Anda bergantung pada transformasi gabungan dari library yang telah berubah.
Misalnya, pipeline Anda sebelumnya menerapkan transformasi komposit, CountWidgets
,
yang berisi transformasi dalam bernama Parse
. Pipeline pengganti memfaktorkan ulang CountWidgets
, dan menyarangkan Parse
di dalam transformasi lain bernama Scan
. Agar update berhasil, Anda harus secara eksplisit memetakan nama transformasi lengkap dalam pipeline sebelumnya (CountWidgets/Parse
) ke nama transformasi di pipeline baru (CountWidgets/Scan/Parse
):
Java
--transformNameMapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika menghapus transformasi sepenuhnya di pipeline pengganti, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus
transformasi CountWidgets/Parse
sepenuhnya:
--transformNameMapping={"CountWidgets/Parse":""}
Python
--transform_name_mapping={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika menghapus transformasi sepenuhnya di pipeline pengganti, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus
transformasi CountWidgets/Parse
sepenuhnya:
--transform_name_mapping={"CountWidgets/Parse":""}
Go
--transform_name_mapping={"CountWidgets/main.Parse":"CountWidgets/Scan/main.Parse"}
Jika menghapus transformasi sepenuhnya di pipeline pengganti, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus
transformasi CountWidgets/Parse
sepenuhnya:
--transform_name_mapping={"CountWidgets/main.Parse":""}
gcloud
--transform-name-mappings={"CountWidgets/Parse":"CountWidgets/Scan/Parse"}
Jika menghapus transformasi sepenuhnya di pipeline pengganti, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus
transformasi CountWidgets/Parse
sepenuhnya:
--transform-name-mappings={"CountWidgets/main.Parse":""}
REST
"transformNameMapping": {
CountWidgets/Parse: CountWidgets/Scan/Parse
}
Jika menghapus transformasi sepenuhnya di pipeline pengganti, Anda harus menyediakan pemetaan null. Misalkan pipeline pengganti Anda menghapus
transformasi CountWidgets/Parse
sepenuhnya:
"transformNameMapping": {
CountWidgets/main.Parse: null
}
Efek dari mengganti pekerjaan
Saat Anda mengganti tugas yang ada, tugas baru akan menjalankan kode pipeline yang telah diperbarui. Layanan Dataflow mempertahankan nama tugas, tetapi menjalankan tugas pengganti dengan ID Tugas yang telah diperbarui. Proses ini dapat menyebabkan periode nonaktif saat tugas yang ada berhenti, pemeriksaan kompatibilitas berjalan, dan tugas baru dimulai.
Tugas pengganti mempertahankan item berikut:
- Data status menengah dari pekerjaan sebelumnya. Cache dalam memori tidak disimpan.
- Kumpulan data atau metadata yang mengalami buffering dari tugas sebelumnya. Misalnya, beberapa catatan dalam pipeline Anda mungkin di-buffer saat menunggu jendela untuk diselesaikan.
- Pembaruan opsi lowongan dalam penerbangan yang Anda lamar ke pekerjaan sebelumnya.
Data status menengah
Data status menengah dari tugas sebelumnya dipertahankan. Data status tidak termasuk cache dalam memori. Jika Anda ingin mempertahankan data cache dalam memori saat memperbarui pipeline, sebagai solusinya, faktorkan ulang pipeline untuk mengonversi cache menjadi data status atau ke input samping. Untuk mengetahui informasi selengkapnya tentang penggunaan input samping, lihat Pola input samping dalam dokumentasi Apache Beam.
Pipeline streaming memiliki batas ukuran untuk ValueState
dan untuk input samping.
Akibatnya, jika memiliki cache besar yang ingin dipertahankan, Anda mungkin perlu
menggunakan penyimpanan eksternal, seperti Memorystore atau Bigtable.
Data dalam penerbangan
Data yang "sedang berlangsung" masih diproses oleh transformasi di pipeline baru Anda. Namun, transformasi tambahan yang Anda tambahkan dalam kode pipeline pengganti mungkin berlaku atau mungkin tidak berlaku, bergantung pada tempat kumpulan data di-buffer. Dalam contoh ini, pipeline yang ada memiliki transformasi berikut:
Java
p.apply("Read", ReadStrings()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, FormatStrings)
Anda dapat mengganti tugas dengan kode pipeline baru, seperti berikut:
Java
p.apply("Read", ReadStrings()) .apply("Remove", RemoveStringsStartingWithA()) .apply("Format", FormatStrings());
Python
p | 'Read' >> beam.io.ReadFromPubSub(subscription=known_args.input_subscription) | 'Remove' >> RemoveStringsStartingWithA() | 'Format' >> FormatStrings()
Go
beam.ParDo(s, ReadStrings) beam.ParDo(s, RemoveStringsStartingWithA) beam.ParDo(s, FormatStrings)
Meskipun Anda menambahkan transformasi untuk memfilter string yang dimulai dengan
huruf "A", transformasi berikutnya (FormatStrings
) mungkin masih melihat string yang di-buffer atau
sedang diproses yang dimulai dengan "A" yang ditransfer dari tugas
sebelumnya.
Mengubah windowing
Anda dapat mengubah strategi windowing
dan pemicu
untuk elemen PCollection
di pipeline pengganti Anda, tetapi berhati-hatilah.
Mengubah strategi windowing atau pemicu tidak akan memengaruhi data yang
sudah di-buffer atau sedang beroperasi.
Sebaiknya Anda hanya mencoba perubahan yang lebih kecil pada windowing pipeline, seperti mengubah durasi periode waktu tetap atau geser. Melakukan perubahan besar pada windowing atau pemicu, seperti mengubah algoritma windowing, mungkin memiliki hasil yang tidak dapat diprediksi pada output pipeline Anda.
Pemeriksaan kompatibilitas tugas
Saat Anda meluncurkan tugas pengganti, layanan Dataflow akan melakukan pemeriksaan kompatibilitas antara tugas pengganti dan tugas sebelumnya. Jika pemeriksaan kompatibilitas lulus, tugas sebelumnya akan dihentikan. Tugas pengganti Anda kemudian diluncurkan di layanan Dataflow sambil mempertahankan nama tugas yang sama. Jika pemeriksaan kompatibilitas gagal, tugas sebelumnya Anda terus berjalan di layanan Dataflow dan tugas penggantian Anda akan menampilkan error.
Java
Karena keterbatasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal Anda. Solusi saat ini terdiri dari langkah-langkah berikut:
- Gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda.
- Jalankan program pipeline pengganti Anda dengan opsi
--update
. - Tunggu tugas pengganti berhasil lulus pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Atau, Anda dapat memantau status tugas pengganti di Dataflow Monitoring Interface. Jika berhasil dimulai, tugas Anda juga lulus pemeriksaan kompatibilitas.
Python
Karena keterbatasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal Anda. Solusi saat ini terdiri dari langkah-langkah berikut:
- Gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda.
- Jalankan program pipeline pengganti Anda dengan opsi
--update
. - Tunggu tugas pengganti berhasil lulus pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Atau, Anda dapat memantau status tugas pengganti di Dataflow Monitoring Interface. Jika berhasil dimulai, tugas Anda juga lulus pemeriksaan kompatibilitas.
Go
Karena keterbatasan, Anda harus menggunakan eksekusi pemblokiran
untuk melihat error upaya update yang gagal di konsol atau terminal Anda.
Secara khusus, Anda harus menentukan eksekusi non-pemblokiran dengan menggunakan
tanda --execute_async
atau --async
. Solusi saat ini
terdiri dari langkah-langkah berikut:
- Jalankan program pipeline pengganti dengan opsi
--update
dan tanpa tanda--execute_async
atau--async
. - Tunggu tugas pengganti berhasil lulus pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
gcloud
Karena keterbatasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal Anda. Solusi saat ini terdiri dari langkah-langkah berikut:
- Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
- Jalankan program pipeline pengganti Anda dengan opsi
--update
. - Tunggu tugas pengganti berhasil lulus pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
REST
Karena keterbatasan, Anda harus menggunakan eksekusi pemblokiran untuk melihat error upaya update yang gagal di konsol atau terminal Anda. Solusi saat ini terdiri dari langkah-langkah berikut:
- Untuk pipeline Java, gunakan pipeline.run().waitUntilFinish() dalam kode pipeline Anda. Untuk pipeline Python, gunakan pipeline.run().wait_until_finish() dalam kode pipeline Anda. Untuk pipeline Go, ikuti langkah-langkah di tab Go.
- Jalankan program pipeline pengganti Anda dengan kolom
replaceJobId
. - Tunggu tugas pengganti berhasil lulus pemeriksaan kompatibilitas.
- Keluar dari proses runner pemblokiran dengan mengetik
Ctrl+C
.
Pemeriksaan kompatibilitas menggunakan pemetaan transformasi yang disediakan untuk memastikan bahwa
Dataflow dapat mentransfer data status perantara dari langkah-langkah dalam
tugas sebelumnya ke tugas pengganti Anda. Pemeriksaan kompatibilitas juga memastikan
bahwa PCollection
di pipeline Anda menggunakan
Coder yang sama.
Mengubah Coder
dapat menyebabkan kegagalan pemeriksaan kompatibilitas karena
data yang sedang berlangsung atau data yang di-buffer mungkin tidak diserialisasi dengan benar di
pipeline penggantian.
Mencegah gangguan kompatibilitas
Perbedaan tertentu antara pipeline sebelumnya dan pipeline pengganti Anda dapat menyebabkan kegagalan pemeriksaan kompatibilitas. Perbedaan ini mencakup:
- Mengubah grafik pipeline tanpa memberikan pemetaan. Saat Anda memperbarui tugas, Dataflow akan mencoba mencocokkan transformasi di tugas sebelumnya dengan transformasi dalam tugas pengganti. Proses pencocokan ini membantu Dataflow mentransfer data status perantara untuk setiap langkah. Jika Anda mengganti nama atau menghapus langkah, Anda harus memberikan pemetaan transformasi agar Dataflow dapat mencocokkan data status sebagaimana mestinya.
- Mengubah input samping untuk sebuah langkah. Menambahkan input samping ke atau menghapusnya dari transformasi di pipeline pengganti Anda akan menyebabkan pemeriksaan kompatibilitas gagal.
- Mengubah Coder untuk satu langkah. Saat Anda memperbarui tugas, Dataflow menyimpan catatan data apa pun yang saat ini di-buffer dan menanganinya dalam tugas pengganti. Misalnya, data yang di-buffer dapat terjadi saat windowing di-resolve. Jika tugas pengganti menggunakan encoding data yang berbeda atau tidak kompatibel, Dataflow tidak dapat melakukan serialisasi atau melakukan deserialisasi data ini.
Menghapus operasi "stateful" dari pipeline Anda. Jika Anda menghapus operasi stateful dari pipeline, tugas pengganti mungkin akan gagal dalam pemeriksaan kompatibilitas. Dataflow dapat menggabungkan beberapa langkah secara bersamaan untuk meningkatkan efisiensi. Jika Anda menghapus operasi yang bergantung pada status dari dalam langkah gabungan, pemeriksaan akan gagal. Operasi stateful meliputi:
- Transformasi yang menghasilkan atau menggunakan input samping.
- pembacaan I/O.
- Transformasi yang menggunakan status dengan kunci.
- Transformasi yang memiliki penggabungan jendela.
Mengubah variabel
DoFn
stateful. Untuk tugas streaming yang sedang berlangsung, jika pipeline Anda menyertakanDoFn
stateful, mengubah variabelDoFn
stateful dapat menyebabkan pipeline gagal.Mencoba menjalankan tugas pengganti di zona geografis yang berbeda. Jalankan tugas pengganti di zona yang sama tempat Anda menjalankan tugas sebelumnya.
Memperbarui skema
Apache Beam memungkinkan PCollection
memiliki skema dengan kolom bernama, dalam hal ini
Coder eksplisit tidak diperlukan. Jika nama dan jenis kolom untuk skema tertentu tidak berubah (termasuk kolom bertingkat), skema tersebut tidak menyebabkan kegagalan pemeriksaan update. Namun, update mungkin masih diblokir jika segmen
lain dari pipeline baru tidak kompatibel.
Mengembangkan skema
Sering kali skema PCollection
perlu dikembangkan karena persyaratan bisnis
yang terus berkembang. Dengan layanan Dataflow, perubahan berikut dapat dilakukan pada skema saat mengupdate pipeline:
- Menambahkan satu atau beberapa kolom baru ke skema, termasuk kolom bertingkat.
- Membuat jenis kolom wajib (non-nullable) menjadi opsional (nullable).
Menghapus kolom, mengubah nama kolom, atau mengubah jenis kolom tidak diizinkan selama update.
Meneruskan data tambahan ke operasi ParDo yang ada
Anda dapat meneruskan data tambahan (out-of-band) ke operasi ParDo yang ada menggunakan salah satu metode berikut, bergantung pada kasus penggunaan Anda:
- Membuat serialisasi informasi sebagai kolom di subclass
DoFn
. - Setiap variabel yang dirujuk oleh metode dalam
DoFn
anonim akan diserialisasi secara otomatis. - Komputasikan data di dalam
DoFn.startBundle()
. - Meneruskan data menggunakan
ParDo.withSideInputs
.
Untuk informasi selengkapnya, lihat halaman berikut:
- Panduan pemrograman Apache Beam: ParDo, khususnya bagian tentang pembuatan DoFn dan input samping.
- Referensi Apache Beam SDK untuk Java: ParDo