Ringkasan
Anda dapat menggunakan pipeline data Dataflow untuk tugas berikut:
- Membuat jadwal tugas berulang.
- Pahami tempat resource dibelanjakan selama beberapa eksekusi tugas.
- Menentukan dan mengelola tujuan keaktualan data.
- Periksa setiap tahap pipeline untuk memperbaiki dan mengoptimalkan pipeline Anda.
Untuk dokumentasi API, lihat referensi Data Pipeline.
Fitur
- Buat pipeline batch berulang untuk menjalankan tugas batch sesuai jadwal.
- Buat pipeline batch inkremental berulang untuk menjalankan tugas batch terhadap versi terbaru data input.
- Gunakan kartu skor ringkasan pipeline untuk melihat penggunaan kapasitas agregat dan penggunaan resource pipeline.
- Melihat keaktualan data pipeline streaming. Metrik ini, yang berkembang seiring waktu, dapat dikaitkan dengan pemberitahuan yang memberi tahu Anda saat keaktualan turun lebih rendah dari tujuan yang ditentukan.
- Gunakan grafik metrik pipeline untuk membandingkan tugas pipeline batch dan menemukan anomali.
Batasan
Ketersediaan regional: Anda dapat membuat pipeline data di region Cloud Scheduler yang tersedia.
Kuota:
- Jumlah default pipeline per project: 500
Jumlah default pipeline per organisasi: 2.500
Kuota tingkat organisasi dinonaktifkan secara default. Anda dapat memilih untuk menggunakan kuota tingkat organisasi, dan jika melakukannya, setiap organisasi dapat memiliki maksimal 2.500 pipeline secara default.
Label: Anda tidak dapat menggunakan label yang ditentukan pengguna untuk memberi label pada pipeline data Dataflow. Namun, saat Anda menggunakan kolom
additionalUserLabels
, nilai tersebut akan diteruskan ke tugas Dataflow Anda. Untuk informasi selengkapnya tentang cara label diterapkan ke setiap tugas Dataflow, lihat Opsi pipeline.
Jenis pipeline data
Dataflow memiliki dua jenis pipeline data, streaming dan batch. Kedua jenis pipeline menjalankan tugas yang ditentukan dalam template Dataflow.
- Streaming data pipeline
- Streaming data pipeline menjalankan tugas streaming Dataflow segera setelah dibuat.
- Pipeline data batch
Pipeline data batch menjalankan tugas batch Dataflow sesuai jadwal yang ditentukan pengguna. Nama file input pipeline batch dapat diparameterkan untuk memungkinkan pemrosesan pipeline batch inkremental.
Pipeline batch inkremental
Anda dapat menggunakan placeholder tanggal/waktu untuk menentukan format file input tambahan untuk pipeline batch.
- Placeholder untuk tahun, bulan, tanggal, jam, menit, dan detik dapat digunakan, dan
harus mengikuti
format
strftime()
. Placeholder didahului dengan simbol persen (%). - Pemformatan parameter tidak diverifikasi selama pembuatan pipeline.
- Contoh: Jika Anda menentukan "gs://bucket/Y" sebagai jalur input berparameter,
jalur tersebut akan dievaluasi sebagai "gs://bucket/Y", karena "Y" tanpa "%" sebelumnya
tidak dipetakan ke format
strftime()
.
- Contoh: Jika Anda menentukan "gs://bucket/Y" sebagai jalur input berparameter,
jalur tersebut akan dievaluasi sebagai "gs://bucket/Y", karena "Y" tanpa "%" sebelumnya
tidak dipetakan ke format
Pada setiap waktu eksekusi pipeline batch terjadwal, bagian placeholder jalur input dievaluasi ke tanggal dan waktu saat ini (atau diubah waktunya). Nilai tanggal dievaluasi menggunakan tanggal saat ini di zona waktu tugas terjadwal. Jika jalur yang dievaluasi cocok dengan jalur file input, file tersebut akan diambil untuk diproses oleh pipeline batch pada waktu yang dijadwalkan.
- Contoh: Pipeline batch dijadwalkan untuk diulang pada awal setiap jam
PST. Jika Anda membuat parameter jalur input sebagai
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv
, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadigs://bucket-name/2021-04-15/prefix-18_00.csv
.
Menggunakan parameter pergeseran waktu
Anda dapat menggunakan parameter pergeseran waktu + atau - menit atau jam.
Untuk mendukung pencocokan jalur input dengan tanggal waktu yang dievaluasi yang
digeser sebelum atau setelah tanggal waktu jadwal pipeline saat ini,
masukkan parameter ini dalam tanda kurung kurawal.
Gunakan format {[+|-][0-9]+[m|h]}
. Pipeline batch terus diulang pada
waktu yang dijadwalkan, tetapi jalur input dievaluasi dengan offset waktu
yang ditentukan.
- Contoh: Pipeline batch dijadwalkan untuk diulang pada awal setiap jam
PST. Jika Anda membuat parameter jalur input sebagai
gs://bucket-name/%Y-%m-%d/prefix-%H_%M.csv{-2h}
, pada 15 April 2021, pukul 18.00 PST, jalur input akan dievaluasi menjadigs://bucket-name/2021-04-15/prefix-16_00.csv
.
Peran pipeline data
Agar operasi pipeline data Dataflow berhasil, Anda memerlukan peran IAM yang diperlukan, sebagai berikut:
Anda memerlukan peran yang sesuai untuk melakukan operasi:
Datapipelines.admin
: Dapat melakukan semua operasi pipeline dataDatapipelines.viewer
: Dapat melihat pipeline dan tugas dataDatapipelines.invoker
: Dapat memanggil tugas pipeline data yang dijalankan (peran ini dapat diaktifkan menggunakan API)
Akun layanan yang digunakan oleh Cloud Scheduler harus memiliki peran
roles/iam.serviceAccountUser
, baik akun layanan yang ditentukan pengguna maupun akun layanan Compute Engine default. Untuk mengetahui informasi selengkapnya, lihat Peran pipeline data.Anda harus dapat bertindak sebagai akun layanan yang digunakan oleh Cloud Scheduler dan Dataflow dengan diberi peran
roles/iam.serviceAccountUser
di akun tersebut. Jika Anda tidak memilih akun layanan untuk Cloud Scheduler dan Dataflow, akun layanan Compute Engine default akan digunakan.
Membuat pipeline data
Anda dapat membuat pipeline data Dataflow dengan dua cara:
Halaman penyiapan pipeline data: Saat Anda pertama kali mengakses fitur pipeline Dataflow di konsol Google Cloud, halaman penyiapan akan terbuka. Aktifkan API yang tercantum untuk membuat pipeline data.
Mengimpor tugas
Anda dapat mengimpor tugas streaming atau batch Dataflow yang didasarkan pada template klasik atau fleksibel dan menjadikannya pipeline data.
Di konsol Google Cloud, buka halaman Jobs Dataflow.
Pilih tugas yang telah selesai, lalu di halaman Job Details, pilih +Import as a pipeline.
Di halaman Create pipeline from template, parameter diisi dengan opsi tugas yang diimpor.
Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.
Membuat pipeline data
Di konsol Google Cloud, buka halaman Pipeline data Dataflow.
Pilih +Create data pipeline.
Di halaman Create pipeline from template, berikan nama pipeline, dan isi kolom parameter dan pemilihan template lainnya.
Untuk tugas batch, di bagian Jadwalkan pipeline Anda, berikan jadwal pengulangan. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika nilai tidak ditentukan, akun layanan Compute Engine default akan digunakan.
Membuat pipeline data batch
Untuk membuat contoh pipeline data batch ini, Anda harus memiliki akses ke resource berikut dalam project:
- Bucket Cloud Storage untuk menyimpan file input dan output
- Set data BigQuery untuk membuat tabel.
Contoh pipeline ini menggunakan template pipeline batch Teks Cloud Storage ke BigQuery. Template ini membaca file dalam format CSV dari Cloud Storage, menjalankan transformasi, lalu menyisipkan nilai ke dalam tabel BigQuery dengan tiga kolom.
Buat file berikut di drive lokal Anda:
File
bq_three_column_table.json
yang berisi skema tabel BigQuery tujuan berikut.{ "BigQuery Schema": [ { "name": "col1", "type": "STRING" }, { "name": "col2", "type": "STRING" }, { "name": "col3", "type": "INT64" } ] }
File JavaScript
split_csv_3cols.js
, yang menerapkan transformasi sederhana pada data input sebelum disisipkan ke BigQuery.function transform(line) { var values = line.split(','); var obj = new Object(); obj.col1 = values[0]; obj.col2 = values[1]; obj.col3 = values[2]; var jsonString = JSON.stringify(obj); return jsonString; }
File CSV
file01.csv
dengan beberapa kumpulan data yang disisipkan ke dalam tabel BigQuery.b8e5087a,74,27531 7a52c051,4a,25846 672de80f,cd,76981 111b92bf,2e,104653 ff658424,f0,149364 e6c17c75,84,38840 833f5a69,8f,76892 d8c833ff,7d,201386 7d3da7fb,d5,81919 3836d29b,70,181524 ca66e6e5,d7,172076 c8475eb6,03,247282 558294df,f3,155392 737b82a8,c7,235523 82c8f5dc,35,468039 57ab17f9,5e,480350 cbcdaf84,bd,354127 52b55391,eb,423078 825b8863,62,88160 26f16d4f,fd,397783
Gunakan perintah
gcloud storage cp
untuk menyalin file ke folder di bucket Cloud Storage dalam project Anda, sebagai berikut:Salin
bq_three_column_table.json
dansplit_csv_3cols.js
kegs://BUCKET_ID/text_to_bigquery/
gcloud storage cp bq_three_column_table.json gs://BUCKET_ID/text_to_bigquery/
gcloud storage cp split_csv_3cols.js gs://BUCKET_ID/text_to_bigquery/
Salin
file01.csv
kegs://BUCKET_ID/inputs/
gcloud storage cp file01.csv gs://BUCKET_ID/inputs/
Di konsol Google Cloud, buka halaman Bucket Cloud Storage.
Untuk membuat folder
tmp
di bucket Cloud Storage, pilih nama folder untuk membuka Halaman detail bucket, lalu klik Buat folder.Di konsol Google Cloud, buka halaman Pipeline data Dataflow.
Pilih Create data pipeline. Masukkan atau pilih item berikut di halaman Create pipeline from template:
- Untuk Pipeline name, masukkan
text_to_bq_batch_data_pipeline
. - Untuk Regional endpoint, pilih region Compute Engine. Region sumber dan tujuan harus cocok. Oleh karena itu, bucket Cloud Storage dan tabel BigQuery Anda harus berada di region yang sama.
Untuk Dataflow template, di Process Data in Bulk (batch), pilih Text Files on Cloud Storage to BigQuery.
Untuk Jadwalkan pipeline Anda, pilih jadwal, seperti Per jam pada menit 25, di zona waktu Anda. Anda dapat mengedit jadwal setelah mengirimkan pipeline. Memberikan alamat akun email untuk Cloud Scheduler, yang digunakan untuk menjadwalkan operasi batch, bersifat opsional. Jika tidak ditentukan, akun layanan Compute Engine default akan digunakan.
Di Required parameters, masukkan hal berikut:
- Untuk JavaScript UDF path in Cloud Storage:
gs://BUCKET_ID/text_to_bigquery/split_csv_3cols.js
- Untuk Jalur JSON:
BUCKET_ID/text_to_bigquery/bq_three_column_table.json
- Untuk JavaScript UDF name:
transform
- Untuk BigQuery output table:
PROJECT_ID:DATASET_ID.three_column_table
- Untuk Jalur input Cloud Storage:
BUCKET_ID/inputs/file01.csv
- Untuk Temporary BigQuery directory:
BUCKET_ID/tmp
- Untuk Temporary location:
BUCKET_ID/tmp
- Untuk JavaScript UDF path in Cloud Storage:
Klik Create pipeline.
- Untuk Pipeline name, masukkan
Konfirmasi informasi template dan pipeline, serta lihat histori saat ini dan sebelumnya dari halaman Detail pipeline.
Anda dapat mengedit jadwal pipeline data dari panel Info pipeline di halaman Detail pipeline.
Anda juga dapat menjalankan pipeline batch sesuai permintaan menggunakan tombol Run di konsol Dataflow Pipelines.
Membuat contoh pipeline data streaming
Anda dapat membuat contoh pipeline data streaming dengan mengikuti contoh petunjuk pipeline batch, dengan perbedaan berikut:
- Untuk Jadwal pipeline, jangan tentukan jadwal untuk pipeline data streaming. Tugas streaming Dataflow akan segera dimulai.
- Untuk Dataflow template, di Process Data Continuously (stream), pilih Text Files on Cloud Storage to BigQuery.
- Untuk Jenis mesin pekerja, pipeline memproses kumpulan awal file yang cocok dengan pola
gs://BUCKET_ID/inputs/file01.csv
dan file tambahan apa pun yang cocok dengan pola ini yang Anda upload ke folderinputs/
. Jika ukuran file CSV melebihi beberapa GB, untuk menghindari kemungkinan error kehabisan memori, pilih jenis mesin dengan memori yang lebih tinggi daripada jenis mesinn1-standard-4
default, sepertin1-highmem-8
.
Pemecahan masalah
Bagian ini menunjukkan cara menyelesaikan masalah pada pipeline data Dataflow.
Tugas pipeline data gagal diluncurkan
Saat Anda menggunakan pipeline data untuk membuat jadwal tugas berulang, tugas Dataflow Anda mungkin tidak diluncurkan, dan error status 503
akan muncul di file log Cloud Scheduler.
Masalah ini terjadi saat Dataflow untuk sementara tidak dapat menjalankan tugas.
Untuk mengatasi masalah ini, konfigurasikan Cloud Scheduler untuk mencoba kembali tugas. Karena masalah tersebut bersifat sementara, saat dicoba ulang, tugas mungkin berhasil. Untuk mengetahui informasi selengkapnya tentang cara menetapkan nilai percobaan ulang di Cloud Scheduler, lihat Membuat tugas.
Menyelidiki pelanggaran tujuan pipeline
Bagian berikut menjelaskan cara menyelidiki pipeline yang tidak memenuhi tujuan performa.
Pipeline batch berulang
Untuk analisis awal kondisi pipeline Anda, di halaman Info pipeline di konsol Google Cloud, gunakan grafik Status tugas individual dan Waktu thread per langkah. Grafik ini terletak di panel status pipeline.
Contoh investigasi:
Anda memiliki pipeline batch berulang yang berjalan setiap jam pada pukul 3 menit setelah jam. Setiap tugas biasanya berjalan selama sekitar 9 menit. Anda memiliki tujuan agar semua tugas dapat diselesaikan dalam waktu kurang dari 10 menit.
Grafik status tugas menunjukkan bahwa tugas berjalan selama lebih dari 10 menit.
Di tabel histori Pembaruan/Eksekusi, temukan tugas yang berjalan selama jam yang diinginkan. Klik halaman detail tugas Dataflow. Di halaman tersebut, temukan tahap yang berjalan lebih lama, lalu cari kemungkinan error dalam log untuk menentukan penyebab penundaan.
Pipeline streaming
Untuk analisis awal kondisi pipeline Anda, di halaman Detail Pipeline, di tab Info pipeline, gunakan grafik keaktualan data. Grafik ini terletak di panel status pipeline.
Contoh investigasi:
Anda memiliki pipeline streaming yang biasanya menghasilkan output dengan keaktualan data 20 detik.
Anda menetapkan tujuan untuk memiliki jaminan keaktualan data 30 detik. Saat meninjau grafik keaktualan data, Anda melihat bahwa antara pukul 09.00 dan 10.00, keaktualan data melonjak menjadi hampir 40 detik.
Beralihlah ke tab Pipeline metrics, lalu lihat grafik CPU Utilization dan Memory Utilization untuk analisis lebih lanjut.
Error: ID pipeline sudah ada dalam project
Jika Anda mencoba membuat pipeline baru dengan nama yang sudah ada dalam project, Anda akan menerima pesan error ini: Pipeline Id already exist within the
project
. Untuk menghindari masalah ini, selalu pilih nama unik untuk pipeline Anda.