Membuat fungsi yang ditentukan pengguna untuk template Dataflow

Beberapa template Dataflow yang disediakan Google mendukung fungsi yang ditentukan pengguna (UDF). UDF memungkinkan Anda memperluas fungsi template tanpa mengubah kode template.

Ringkasan

Untuk membuat UDF, tulis fungsi JavaScript atau fungsi Python, bergantung pada templatenya. Anda menyimpan file kode UDF di Cloud Storage dan menentukan lokasinya sebagai parameter template. Untuk setiap elemen input, template akan memanggil fungsi Anda. Fungsi ini mengubah elemen atau melakukan logika khusus lainnya dan menampilkan hasilnya kembali ke template.

Misalnya, Anda dapat menggunakan UDF untuk:

  • Memformat ulang data input agar sesuai dengan skema target.
  • Menyamarkan data sensitif.
  • Filter beberapa elemen dari output.

Input ke fungsi UDF adalah elemen data tunggal, yang diserialisasi sebagai string JSON. Fungsi ini menampilkan string JSON serial sebagai output. Format data bergantung pada {i>template<i}. Misalnya, di template Pub/Sub Subscription to BigQuery, inputnya adalah data pesan Pub/Sub yang diserialisasi sebagai objek JSON, dan outputnya adalah objek JSON serial yang mewakili baris tabel BigQuery. Untuk informasi selengkapnya, lihat dokumentasi untuk setiap template.

Menjalankan template dengan UDF

Untuk menjalankan template dengan UDF, tentukan lokasi Cloud Storage file JavaScript dan nama fungsi sebagai parameter template.

Dengan beberapa template yang disediakan Google, Anda juga dapat membuat UDF langsung di Konsol Google Cloud, sebagai berikut:

  1. Buka halaman Dataflow di konsol Google Cloud.

    Buka halaman Dataflow

  2. Klik Create job from template.

  3. Pilih template yang disediakan Google yang ingin Anda jalankan.

  4. Luaskan Parameter opsional. Jika mendukung UDF, template akan memiliki parameter untuk lokasi Cloud Storage UDF dan parameter lain untuk nama fungsi.

  5. Di samping parameter template, klik Create UDF.

  6. Di panel Select or Create a User-Defined Function (UDF):

    1. Masukkan nama file. Contoh: my_udf.js.
    2. Pilih folder Cloud Storage. Contoh: gs://your-bucket/your-folder.
    3. Gunakan editor kode inline untuk menulis fungsi. Editor telah diisi sebelumnya dengan kode boilerplate yang dapat Anda gunakan sebagai titik awal.
    4. Klik Create UDF.

      Konsol Google Cloud menyimpan file UDF dan mengisi lokasi Cloud Storage.

    5. Masukkan nama fungsi di kolom yang sesuai.

Menulis UDF JavaScript

Kode berikut menunjukkan UDF JavaScript tanpa pengoperasian yang dapat Anda mulai:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

Kode JavaScript berjalan di mesin JavaScript Nashorn. Sebaiknya uji UDF Anda di mesin Nashorn sebelum men-deploy-nya. Mesin Nashorn tidak sama persis dengan implementasi JavaScript Node.js. Masalah umumnya adalah menggunakan console.log() atau Number.isNaN(), yang keduanya tidak ditentukan dalam mesin Nashorn.

Anda dapat menguji UDF pada mesin Nashorn menggunakan Cloud Shell, yang telah menginstal JDK 11. Luncurkan Nashorn dalam mode interaktif sebagai berikut:

jjs

Di shell interaktif Nashorn, lakukan langkah-langkah berikut:

  1. Panggil load untuk memuat file JavaScript UDF Anda.
  2. Tentukan objek JSON input bergantung pada pesan yang diharapkan oleh pipeline Anda.
  3. Gunakan fungsi JSON.stringify untuk melakukan serialisasi input ke string JSON.
  4. Panggil fungsi UDF Anda untuk memproses string JSON.
  5. Panggil JSON.parse untuk melakukan deserialisasi output.
  6. Verifikasi hasilnya.

Contoh:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Menulis UDF Python

Kode berikut menunjukkan UDF Python tanpa pengoperasian yang dapat Anda mulai:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

UDF Python mendukung paket dependensi yang merupakan standar untuk Python dan Apache Beam. Mereka tidak dapat menggunakan paket pihak ketiga.

Penanganan error

Biasanya, jika terjadi selama eksekusi UDF, error akan ditulis ke lokasi yang dihentikan pengirimannya. Detailnya bergantung pada template. Misalnya, template Pub/Sub Subscription to BigQuery membuat tabel _error_records dan menulis error di sana. Error UDF runtime dapat terjadi karena error sintaksis atau pengecualian yang tidak tertangkap. Untuk memeriksa error sintaksis, uji UDF Anda secara lokal.

Anda dapat menampilkan pengecualian secara terprogram untuk elemen yang tidak boleh diproses. Dalam hal ini, elemen ditulis ke lokasi yang dihentikan pengirimannya, jika template mendukung lokasi tersebut. Untuk contoh yang menunjukkan pendekatan ini, lihat Merutekan peristiwa.

Contoh kasus penggunaan

Bagian ini menjelaskan beberapa pola umum untuk UDF, berdasarkan kasus penggunaan di dunia nyata.

Memperkaya acara

Gunakan UDF untuk memperkaya peristiwa dengan kolom baru untuk informasi yang lebih kontekstual.

Contoh:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Mengubah peristiwa

Gunakan UDF untuk mengubah seluruh format peristiwa bergantung pada apa yang diharapkan tujuan Anda.

Contoh berikut mengembalikan entri log Cloud Logging (LogEntry) ke string log asli jika tersedia. (Bergantung pada sumber log, string log asli terkadang diisi di kolom textPayload.) Anda dapat menggunakan pola ini untuk mengirim log mentah dalam format aslinya, bukan mengirim seluruh LogEntry dari Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Menyamarkan atau menghapus data peristiwa

Gunakan UDF untuk menyamarkan atau menghapus bagian acara.

Contoh berikut menyamarkan nama kolom sensitiveField dengan mengganti nilainya, dan menghapus kolom bernama redundantField sepenuhnya.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Peristiwa rute

Gunakan UDF untuk mengarahkan peristiwa ke tujuan terpisah di sink downstream.

Contoh berikut, berdasarkan template Pub/Sub ke Splunk, mengarahkan setiap peristiwa ke indeks Splunk yang benar. Metode ini memanggil fungsi lokal yang ditentukan pengguna untuk memetakan peristiwa ke indeks.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}

Contoh berikutnya mengarahkan peristiwa yang tidak dikenal ke antrean yang dihentikan pengirimannya, dengan asumsi bahwa template mendukung antrean yang dihentikan pengirimannya. (Misalnya, lihat template Pub/Sub ke JDBC.) Anda dapat menggunakan pola ini untuk memfilter entri yang tidak terduga sebelum menulis ke tujuan.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filter peristiwa

Gunakan UDF untuk memfilter peristiwa yang tidak diinginkan atau tidak dikenal dari output.

Contoh berikut menghapus peristiwa dengan data.severity sama dengan "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

Langkah selanjutnya