Membuat fungsi yang ditentukan pengguna untuk template Dataflow

Beberapa template Dataflow yang disediakan Google mendukung fungsi yang ditentukan pengguna (UDF). UDF memungkinkan Anda memperluas fungsi template tanpa mengubah kode template.

Ringkasan

Untuk membuat UDF, Anda menulis fungsi JavaScript atau fungsi Python, bergantung pada template. Anda menyimpan file kode UDF di Cloud Storage dan menentukan lokasi sebagai parameter template. Untuk setiap elemen input, template akan memanggil fungsi Anda. Fungsi ini mengubah elemen atau menjalankan logika kustom lainnya dan menampilkan hasilnya kembali ke template.

Misalnya, Anda dapat menggunakan UDF untuk:

  • Format ulang data input agar cocok dengan skema target.
  • Menyamarkan data sensitif.
  • Memfilter beberapa elemen dari output.

Input ke fungsi UDF adalah satu elemen data, yang diserialisasi sebagai string JSON. Fungsi ini menampilkan string JSON serialisasi sebagai output. Format data bergantung pada template. Misalnya, dalam template Pub/Sub Subscription to BigQuery, input adalah data pesan Pub/Sub yang diserialisasi sebagai objek JSON, dan output adalah objek JSON serial yang mewakili baris tabel BigQuery. Untuk informasi selengkapnya, lihat dokumentasi untuk setiap template.

Menjalankan template dengan UDF

Untuk menjalankan template dengan UDF, Anda menentukan lokasi Cloud Storage file JavaScript dan nama fungsi sebagai parameter template.

Dengan beberapa template yang disediakan Google, Anda juga dapat membuat UDF langsung di konsol Google Cloud, sebagai berikut:

  1. Buka halaman Dataflow di konsol Google Cloud.

    Buka halaman Dataflow

  2. Klik Create job from template.

  3. Pilih template yang disediakan Google yang ingin Anda jalankan.

  4. Luaskan Optional parameters. Jika mendukung UDF, template memiliki parameter untuk lokasi Cloud Storage UDF dan parameter lain untuk nama fungsi.

  5. Di samping parameter template, klik Create UDF.

  6. Di panel Select or Create a User-Defined Function (UDF):

    1. Masukkan nama file. Contoh: my_udf.js.
    2. Pilih folder Cloud Storage. Contoh: gs://your-bucket/your-folder.
    3. Gunakan editor kode inline untuk menulis fungsi. Editor telah diisi sebelumnya dengan kode boilerplate yang dapat Anda gunakan sebagai titik awal.
    4. Klik Buat UDF.

      Konsol Google Cloud akan menyimpan file UDF dan mengisi lokasi Cloud Storage.

    5. Masukkan nama fungsi Anda di kolom yang sesuai.

Menulis UDF JavaScript

Kode berikut menunjukkan UDF JavaScript tanpa operasi yang dapat Anda mulai:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

Kode JavaScript berjalan di mesin JavaScript Nashorn. Sebaiknya uji UDF Anda di mesin Nashorn sebelum men-deploy-nya. Mesin Nashorn tidak sama persis dengan penerapan JavaScript Node.js. Masalah umum adalah menggunakan console.log() atau Number.isNaN(), yang tidak ditentukan dalam mesin Nashorn.

Anda dapat menguji UDF di mesin Nashorn menggunakan Cloud Shell, yang telah menginstal JDK 11 secara default. Luncurkan Nashorn dalam mode interaktif sebagai berikut:

jjs --language=es6

Di shell interaktif Nashorn, lakukan langkah-langkah berikut:

  1. Panggil load untuk memuat file JavaScript UDF Anda.
  2. Tentukan objek JSON input bergantung pada pesan yang diharapkan pipeline Anda.
  3. Gunakan fungsi JSON.stringify untuk melakukan serialisasi input ke string JSON.
  4. Panggil fungsi UDF untuk memproses string JSON.
  5. Panggil JSON.parse untuk mendeserialisasi output.
  6. Verifikasi hasilnya.

Contoh:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Menulis UDF Python

Kode berikut menunjukkan UDF Python tanpa operasi yang dapat Anda mulai:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

UDF Python mendukung paket dependensi yang standar untuk Python dan Apache Beam. Aplikasi tidak dapat menggunakan paket pihak ketiga.

Penanganan error

Biasanya, saat terjadi error selama eksekusi UDF, error akan ditulis ke lokasi dead letter. Detailnya bergantung pada template. Misalnya, template Langganan Pub/Sub ke BigQuery membuat tabel _error_records dan menulis error di sana. Error UDF runtime dapat terjadi karena error sintaksis atau pengecualian yang tidak tertangkap. Untuk memeriksa error sintaksis, uji UDF secara lokal.

Anda dapat menampilkan pengecualian secara terprogram untuk elemen yang tidak boleh diproses. Dalam hal ini, elemen ditulis ke lokasi surat tidak terkirim, jika template mendukungnya. Untuk contoh yang menunjukkan pendekatan ini, lihat Peristiwa rute.

Contoh kasus penggunaan

Bagian ini menjelaskan beberapa pola umum untuk UDF, berdasarkan kasus penggunaan dunia nyata.

Memperkaya peristiwa

Gunakan UDF untuk memperkaya peristiwa dengan kolom baru untuk mendapatkan informasi yang lebih kontekstual.

Contoh:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Mengubah peristiwa

Gunakan UDF untuk mengubah seluruh format peristiwa, bergantung pada yang diharapkan tujuan Anda.

Contoh berikut mengembalikan entri log Cloud Logging (LogEntry) ke string log asli jika tersedia. (Bergantung pada sumber log, string log asli terkadang diisi di kolom textPayload.) Anda dapat menggunakan pola ini untuk mengirim log mentah dalam format aslinya, bukan mengirim seluruh LogEntry dari Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Menyamarkan atau menghapus data peristiwa

Gunakan UDF untuk menyamarkan atau menghapus bagian peristiwa.

Contoh berikut menyamarkan nama kolom sensitiveField dengan mengganti nilainya, dan menghapus kolom bernama redundantField sepenuhnya.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Peristiwa rute

Gunakan UDF untuk merutekan peristiwa ke tujuan terpisah di sink downstream.

Contoh berikut, berdasarkan template Pub/Sub ke Splunk, merutekan setiap peristiwa ke indeks Splunk yang benar. Fungsi ini memanggil fungsi lokal yang ditentukan pengguna untuk memetakan peristiwa ke indeks.

function process(inJson) {
  const obj = JSON.parse(inJson);
  
  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}  

Contoh berikutnya merutekan peristiwa yang tidak dikenal ke antrean surat mati, dengan asumsi bahwa template mendukung antrean surat mati. (Misalnya, lihat template Pub/Sub ke JDBC.) Anda dapat menggunakan pola ini untuk memfilter entri yang tidak terduga sebelum menulis ke tujuan.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filter peristiwa

Gunakan UDF untuk memfilter peristiwa yang tidak diinginkan atau tidak dikenal dari output.

Contoh berikut menghapus peristiwa dengan data.severity sama dengan "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

Langkah selanjutnya