Beberapa template Dataflow yang disediakan Google mendukung fungsi yang ditentukan pengguna (UDF). UDF memungkinkan Anda memperluas fungsi template tanpa mengubah kode template.
Ringkasan
Untuk membuat UDF, tulis fungsi JavaScript atau fungsi Python, bergantung pada templatenya. Anda menyimpan file kode UDF di Cloud Storage dan menentukan lokasinya sebagai parameter template. Untuk setiap elemen input, template akan memanggil fungsi Anda. Fungsi ini mengubah elemen atau melakukan logika khusus lainnya dan menampilkan hasilnya kembali ke template.
Misalnya, Anda dapat menggunakan UDF untuk:
- Memformat ulang data input agar sesuai dengan skema target.
- Menyamarkan data sensitif.
- Filter beberapa elemen dari output.
Input ke fungsi UDF adalah elemen data tunggal, yang diserialisasi sebagai string JSON. Fungsi ini menampilkan string JSON serial sebagai output. Format data bergantung pada {i>template<i}. Misalnya, di template Pub/Sub Subscription to BigQuery, inputnya adalah data pesan Pub/Sub yang diserialisasi sebagai objek JSON, dan outputnya adalah objek JSON serial yang mewakili baris tabel BigQuery. Untuk informasi selengkapnya, lihat dokumentasi untuk setiap template.
Menjalankan template dengan UDF
Untuk menjalankan template dengan UDF, tentukan lokasi Cloud Storage file JavaScript dan nama fungsi sebagai parameter template.
Dengan beberapa template yang disediakan Google, Anda juga dapat membuat UDF langsung di Konsol Google Cloud, sebagai berikut:
Buka halaman Dataflow di konsol Google Cloud.
Klik add_boxCreate job from template.
Pilih template yang disediakan Google yang ingin Anda jalankan.
Luaskan Parameter opsional. Jika mendukung UDF, template akan memiliki parameter untuk lokasi Cloud Storage UDF dan parameter lain untuk nama fungsi.
Di samping parameter template, klik Create UDF.
Di panel Select or Create a User-Defined Function (UDF):
- Masukkan nama file. Contoh:
my_udf.js
. - Pilih folder Cloud Storage.
Contoh:
gs://your-bucket/your-folder
. - Gunakan editor kode inline untuk menulis fungsi. Editor telah diisi sebelumnya dengan kode boilerplate yang dapat Anda gunakan sebagai titik awal.
Klik Create UDF.
Konsol Google Cloud menyimpan file UDF dan mengisi lokasi Cloud Storage.
Masukkan nama fungsi di kolom yang sesuai.
- Masukkan nama file. Contoh:
Menulis UDF JavaScript
Kode berikut menunjukkan UDF JavaScript tanpa pengoperasian yang dapat Anda mulai:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Kode JavaScript berjalan di
mesin JavaScript Nashorn. Sebaiknya
uji UDF Anda di mesin Nashorn sebelum men-deploy-nya. Mesin Nashorn tidak sama persis dengan implementasi JavaScript Node.js. Masalah
umumnya adalah menggunakan console.log()
atau Number.isNaN()
, yang keduanya
tidak ditentukan dalam mesin Nashorn.
Anda dapat menguji UDF pada mesin Nashorn menggunakan Cloud Shell, yang telah menginstal JDK 11. Luncurkan Nashorn dalam mode interaktif sebagai berikut:
jjs
Di shell interaktif Nashorn, lakukan langkah-langkah berikut:
- Panggil
load
untuk memuat file JavaScript UDF Anda. - Tentukan objek JSON input bergantung pada pesan yang diharapkan oleh pipeline Anda.
- Gunakan fungsi
JSON.stringify
untuk melakukan serialisasi input ke string JSON. - Panggil fungsi UDF Anda untuk memproses string JSON.
- Panggil
JSON.parse
untuk melakukan deserialisasi output. - Verifikasi hasilnya.
Contoh:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Menulis UDF Python
Kode berikut menunjukkan UDF Python tanpa pengoperasian yang dapat Anda mulai:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
UDF Python mendukung paket dependensi yang merupakan standar untuk Python dan Apache Beam. Mereka tidak dapat menggunakan paket pihak ketiga.
Penanganan error
Biasanya, jika terjadi selama eksekusi UDF, error akan ditulis ke
lokasi yang dihentikan pengirimannya. Detailnya bergantung pada template. Misalnya, template
Pub/Sub Subscription to BigQuery
membuat tabel _error_records
dan menulis error di sana. Error UDF
runtime dapat terjadi karena error sintaksis atau pengecualian yang tidak tertangkap. Untuk memeriksa error sintaksis, uji UDF Anda secara lokal.
Anda dapat menampilkan pengecualian secara terprogram untuk elemen yang tidak boleh diproses. Dalam hal ini, elemen ditulis ke lokasi yang dihentikan pengirimannya, jika template mendukung lokasi tersebut. Untuk contoh yang menunjukkan pendekatan ini, lihat Merutekan peristiwa.
Contoh kasus penggunaan
Bagian ini menjelaskan beberapa pola umum untuk UDF, berdasarkan kasus penggunaan di dunia nyata.
Memperkaya acara
Gunakan UDF untuk memperkaya peristiwa dengan kolom baru untuk informasi yang lebih kontekstual.
Contoh:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Mengubah peristiwa
Gunakan UDF untuk mengubah seluruh format peristiwa bergantung pada apa yang diharapkan tujuan Anda.
Contoh berikut mengembalikan entri log Cloud Logging (LogEntry
) ke string log asli jika tersedia. (Bergantung pada sumber log, string log asli terkadang
diisi di kolom textPayload
.) Anda dapat menggunakan pola ini untuk mengirim log mentah dalam format aslinya, bukan mengirim seluruh LogEntry
dari Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Menyamarkan atau menghapus data peristiwa
Gunakan UDF untuk menyamarkan atau menghapus bagian acara.
Contoh berikut menyamarkan nama kolom sensitiveField
dengan mengganti nilainya, dan menghapus kolom bernama redundantField
sepenuhnya.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Peristiwa rute
Gunakan UDF untuk mengarahkan peristiwa ke tujuan terpisah di sink downstream.
Contoh berikut, berdasarkan template Pub/Sub ke Splunk, mengarahkan setiap peristiwa ke indeks Splunk yang benar. Metode ini memanggil fungsi lokal yang ditentukan pengguna untuk memetakan peristiwa ke indeks.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
Contoh berikutnya mengarahkan peristiwa yang tidak dikenal ke antrean yang dihentikan pengirimannya, dengan asumsi bahwa template mendukung antrean yang dihentikan pengirimannya. (Misalnya, lihat template Pub/Sub ke JDBC.) Anda dapat menggunakan pola ini untuk memfilter entri yang tidak terduga sebelum menulis ke tujuan.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filter peristiwa
Gunakan UDF untuk memfilter peristiwa yang tidak diinginkan atau tidak dikenal dari output.
Contoh berikut menghapus peristiwa dengan data.severity
sama dengan "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Langkah selanjutnya
- Template yang disediakan Google
- Membuat dan menjalankan Template Fleksibel
- Menjalankan template klasik
- Memperluas template Dataflow dengan UDF (postingan blog)
- Contoh UDF (GitHub)