Beberapa template Dataflow yang disediakan Google mendukung fungsi yang ditentukan pengguna (UDF). UDF memungkinkan Anda memperluas fungsi template tanpa mengubah kode template.
Ringkasan
Untuk membuat UDF, Anda menulis fungsi JavaScript atau fungsi Python, bergantung pada template. Anda menyimpan file kode UDF di Cloud Storage dan menentukan lokasi sebagai parameter template. Untuk setiap elemen input, template akan memanggil fungsi Anda. Fungsi ini mengubah elemen atau menjalankan logika kustom lainnya dan menampilkan hasilnya kembali ke template.
Misalnya, Anda dapat menggunakan UDF untuk:
- Format ulang data input agar cocok dengan skema target.
- Menyamarkan data sensitif.
- Memfilter beberapa elemen dari output.
Input ke fungsi UDF adalah satu elemen data, yang diserialisasi sebagai string JSON. Fungsi ini menampilkan string JSON serialisasi sebagai output. Format data bergantung pada template. Misalnya, dalam template Pub/Sub Subscription to BigQuery, input adalah data pesan Pub/Sub yang diserialisasi sebagai objek JSON, dan output adalah objek JSON serial yang mewakili baris tabel BigQuery. Untuk informasi selengkapnya, lihat dokumentasi untuk setiap template.
Menjalankan template dengan UDF
Untuk menjalankan template dengan UDF, Anda menentukan lokasi Cloud Storage file JavaScript dan nama fungsi sebagai parameter template.
Dengan beberapa template yang disediakan Google, Anda juga dapat membuat UDF langsung di konsol Google Cloud, sebagai berikut:
Buka halaman Dataflow di konsol Google Cloud.
Klik add_boxCreate job from template.
Pilih template yang disediakan Google yang ingin Anda jalankan.
Luaskan Optional parameters. Jika mendukung UDF, template memiliki parameter untuk lokasi Cloud Storage UDF dan parameter lain untuk nama fungsi.
Di samping parameter template, klik Create UDF.
Di panel Select or Create a User-Defined Function (UDF):
- Masukkan nama file. Contoh:
my_udf.js
. - Pilih folder Cloud Storage.
Contoh:
gs://your-bucket/your-folder
. - Gunakan editor kode inline untuk menulis fungsi. Editor telah diisi sebelumnya dengan kode boilerplate yang dapat Anda gunakan sebagai titik awal.
Klik Buat UDF.
Konsol Google Cloud akan menyimpan file UDF dan mengisi lokasi Cloud Storage.
Masukkan nama fungsi Anda di kolom yang sesuai.
- Masukkan nama file. Contoh:
Menulis UDF JavaScript
Kode berikut menunjukkan UDF JavaScript tanpa operasi yang dapat Anda mulai:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Kode JavaScript berjalan di
mesin JavaScript Nashorn. Sebaiknya
uji UDF Anda di mesin Nashorn sebelum men-deploy-nya. Mesin Nashorn
tidak sama persis dengan penerapan JavaScript Node.js. Masalah umum
adalah menggunakan console.log()
atau Number.isNaN()
, yang tidak
ditentukan dalam mesin Nashorn.
Anda dapat menguji UDF di mesin Nashorn menggunakan Cloud Shell, yang telah menginstal JDK 11 secara default. Luncurkan Nashorn dalam mode interaktif sebagai berikut:
jjs --language=es6
Di shell interaktif Nashorn, lakukan langkah-langkah berikut:
- Panggil
load
untuk memuat file JavaScript UDF Anda. - Tentukan objek JSON input bergantung pada pesan yang diharapkan pipeline Anda.
- Gunakan fungsi
JSON.stringify
untuk melakukan serialisasi input ke string JSON. - Panggil fungsi UDF untuk memproses string JSON.
- Panggil
JSON.parse
untuk mendeserialisasi output. - Verifikasi hasilnya.
Contoh:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Menulis UDF Python
Kode berikut menunjukkan UDF Python tanpa operasi yang dapat Anda mulai:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
UDF Python mendukung paket dependensi yang standar untuk Python dan Apache Beam. Aplikasi tidak dapat menggunakan paket pihak ketiga.
Penanganan error
Biasanya, saat terjadi error selama eksekusi UDF, error akan ditulis ke
lokasi dead letter. Detailnya bergantung pada template. Misalnya, template Langganan Pub/Sub ke BigQuery membuat tabel _error_records
dan menulis error di sana. Error UDF runtime dapat terjadi karena error sintaksis atau pengecualian yang tidak tertangkap. Untuk memeriksa
error sintaksis, uji UDF secara lokal.
Anda dapat menampilkan pengecualian secara terprogram untuk elemen yang tidak boleh diproses. Dalam hal ini, elemen ditulis ke lokasi surat tidak terkirim, jika template mendukungnya. Untuk contoh yang menunjukkan pendekatan ini, lihat Peristiwa rute.
Contoh kasus penggunaan
Bagian ini menjelaskan beberapa pola umum untuk UDF, berdasarkan kasus penggunaan dunia nyata.
Memperkaya peristiwa
Gunakan UDF untuk memperkaya peristiwa dengan kolom baru untuk mendapatkan informasi yang lebih kontekstual.
Contoh:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Mengubah peristiwa
Gunakan UDF untuk mengubah seluruh format peristiwa, bergantung pada yang diharapkan tujuan Anda.
Contoh berikut mengembalikan entri log Cloud Logging
(LogEntry
) ke string log
asli jika tersedia. (Bergantung pada sumber log, string log asli
terkadang diisi di kolom textPayload
.) Anda dapat menggunakan pola ini untuk
mengirim log mentah dalam format aslinya, bukan mengirim seluruh
LogEntry
dari Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Menyamarkan atau menghapus data peristiwa
Gunakan UDF untuk menyamarkan atau menghapus bagian peristiwa.
Contoh berikut menyamarkan nama kolom sensitiveField
dengan mengganti nilainya, dan menghapus kolom bernama redundantField
sepenuhnya.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Peristiwa rute
Gunakan UDF untuk merutekan peristiwa ke tujuan terpisah di sink downstream.
Contoh berikut, berdasarkan template Pub/Sub ke Splunk, merutekan setiap peristiwa ke indeks Splunk yang benar. Fungsi ini memanggil fungsi lokal yang ditentukan pengguna untuk memetakan peristiwa ke indeks.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
Contoh berikutnya merutekan peristiwa yang tidak dikenal ke antrean surat mati, dengan asumsi bahwa template mendukung antrean surat mati. (Misalnya, lihat template Pub/Sub ke JDBC.) Anda dapat menggunakan pola ini untuk memfilter entri yang tidak terduga sebelum menulis ke tujuan.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filter peristiwa
Gunakan UDF untuk memfilter peristiwa yang tidak diinginkan atau tidak dikenal dari output.
Contoh berikut menghapus peristiwa dengan data.severity
sama dengan "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Langkah selanjutnya
- Template yang disediakan Google
- Mem-build dan menjalankan Template Flex
- Menjalankan template klasik
- Memperluas template Dataflow dengan UDF (postingan blog)
- Contoh UDF (GitHub)