Create user-defined functions for Dataflow templates

Some Google-provided Dataflow templates support user-defined functions (UDFs). UDFs let you extend the functionality of a template without modifying the template code.

Overview

To create a UDF, you write a JavaScript function or Python function, depending on the template. You store the UDF code file in Cloud Storage and specify the location as a template parameter. For each input element, the template calls your function. The function transforms the element or performs other custom logic and returns the result to the template.

For example, you might use a UDF to:

  • Reformat the input data to match a target schema.
  • Redact sensitive data.
  • Filter some elements from the output.

The input to the UDF is a single data element, serialized as a JSON string. The function returns a serialized JSON string as output. The data format depends on the template. For example, in the Pub/Sub Subscription to BigQuery template, the input is the Pub/Sub message data serialized as a JSON object, and the output is a serialized JSON object representing a BigQuery table row. For more information, see the documentation for each template.
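As an illustration of this string-in, string-out contract, the following sketch reshapes an incoming message into a flat row. It is not taken from any template: the input fields (user.id, user.name, timestamp) and the output column names (user_id, user_name, event_time) are hypothetical, so substitute the fields from your own messages and table schema.

```javascript
/**
 * Hypothetical UDF: flattens a nested message into a row-shaped object.
 * All field and column names below are assumptions for illustration.
 * @param {string} inJson input JSON message (stringified)
 * @return {string} output JSON row (stringified)
 */
function process(inJson) {
  var message = JSON.parse(inJson);

  // Fall back to an empty object so a missing "user" field doesn't throw.
  var user = message.user || {};

  // Build an object whose keys match the (assumed) target column names.
  var row = {
    user_id: user.id,
    user_name: user.name,
    event_time: message.timestamp
  };

  // Keys whose value is undefined are omitted by JSON.stringify.
  return JSON.stringify(row);
}
```

The function parses the string once, builds a new object, and serializes it once, so the template never sees anything other than a JSON string.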

Run a template with a UDF

To run a template with a UDF, you specify the Cloud Storage location of the UDF code file and the name of the function as template parameters.

With some Google-provided templates, you can also create the UDF directly in the Google Cloud console, as follows:

  1. Go to the Dataflow page in the Google Cloud console.


  2. Click Create job from template.

  3. Select the Google-provided template that you want to run.

  4. Expand Optional parameters. If the template supports UDFs, it has a parameter for the Cloud Storage location of the UDF and another parameter for the function name.

  5. Next to the template parameter, click Create UDF.

  6. In the Select or Create a User-Defined Function (UDF) panel:

    1. Enter a filename. Example: my_udf.js.
    2. Select a Cloud Storage folder. Example: gs://your-bucket/your-folder.
    3. Use the inline code editor to write the function. The editor is pre-populated with boilerplate code that you can use as a starting point.
    4. Click Create UDF.

      The Google Cloud console saves the UDF file and populates the Cloud Storage location.

    5. Enter your function's name in the corresponding field.

Write a JavaScript UDF

The following code shows a no-op JavaScript UDF that you can start from:

/**
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

The JavaScript code runs on the Nashorn JavaScript engine. We recommend testing your UDF on the Nashorn engine before deploying it. The Nashorn engine does not exactly match the Node.js implementation of JavaScript. A common problem is using console.log() or Number.isNaN(), neither of which is defined in the Nashorn engine.
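One way to work around these two gaps is to avoid the missing globals entirely. The following is a minimal sketch rather than part of any template. The helper names logMessage and isNaNValue are hypothetical, and the logging fallback assumes that Nashorn's built-in print function is available when console is not.

```javascript
// Hypothetical helper: logs with console.log where it exists (for example,
// in Node.js) and falls back to print, which the Nashorn shell provides.
function logMessage(msg) {
  if (typeof console !== 'undefined' && typeof console.log === 'function') {
    console.log(msg);
  } else if (typeof print === 'function') {
    print(msg);
  }
}

// Hypothetical stand-in for Number.isNaN: NaN is the only JavaScript value
// that is not strictly equal to itself, so no type coercion is involved.
function isNaNValue(value) {
  return value !== value;
}
```

Because both helpers check for a feature before using it, the same file can be exercised in Node.js and in the Nashorn shell without changes.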

You can test your UDF on the Nashorn engine by using Cloud Shell, which has JDK 11 pre-installed. Launch Nashorn in interactive mode as follows:

jjs

In the Nashorn interactive shell, perform the following steps:

  1. Call load to load your UDF JavaScript file.
  2. Define an input JSON object depending on your pipeline's expected messages.
  3. Use the JSON.stringify function to serialize the input to a JSON string.
  4. Call your UDF function to process the JSON string.
  5. Call JSON.parse to deserialize the output.
  6. Verify the result.

Example:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
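The steps above can also be wrapped in a small helper so that the check is repeatable. This is a sketch under assumptions: runUdf is a hypothetical helper name, and the process function shown here is only a stand-in for the UDF that you would normally load from my_udf.js.

```javascript
// Stand-in for the UDF under test. In the Nashorn shell you would
// typically call load('my_udf.js') instead of defining it inline.
function process(inJson) {
  var obj = JSON.parse(inJson);
  return JSON.stringify(obj);
}

// Hypothetical helper: serializes the input, calls the UDF, and
// deserializes the output. Returns null when the UDF returns null or
// undefined, as a UDF that filters out a record does.
function runUdf(udf, input) {
  var output = udf(JSON.stringify(input));
  if (output === null || output === undefined) {
    return null;
  }
  return JSON.parse(output);
}

// Verify the result by inspecting fields on the parsed object.
var result = runUdf(process, { name: 'user1' });
```

After this runs, result.name can be compared against the expected value, which covers the deserialize and verify steps.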

Write a Python UDF

The following code shows a no-op Python UDF that you can start from:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python UDFs support dependency packages that are standard to Python and Apache Beam. They can't use third-party packages.

Error handling

Typically, when an error occurs during UDF execution, the error is written to a dead-letter location. The details depend on the template. For example, the Pub/Sub Subscription to BigQuery template creates an _error_records table and writes errors there. Runtime UDF errors can occur because of syntax errors or uncaught exceptions. To check for syntax errors, test your UDF locally.

You can programmatically throw an exception for an element that shouldn't be processed. In that case, the element is written to the dead-letter location, if the template supports one. For an example that shows this approach, see Route events.

Example use cases

This section describes some common patterns for UDFs, based on real-world use cases.

Enrich events

Use a UDF to enrich events with new fields for more contextual information.

Example:

function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Transform events

Use a UDF to transform the entire event format depending on what your destination expects.

The following example reverts a Cloud Logging log entry (LogEntry) to the original log string when available. (Depending on the log source, the original log string is sometimes populated in the textPayload field.) You might use this pattern to send the raw logs in their original format, instead of sending the entire LogEntry from Cloud Logging.

function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
  return JSON.stringify(data);
}

Redact or remove event data

Use a UDF to redact or remove a part of the event.

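The following example redacts the field named sensitiveField by replacing its value, and removes the field named redundantField entirely. It also normalizes the source field to lowercase, defaulting to "unknown" when the field is missing.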

function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields (deleting a missing property is a no-op)
  delete data.redundantField;

  return JSON.stringify(data);
}

Route events

Use a UDF to route events to separate destinations in the downstream sink.

The following example, based on the Pub/Sub to Splunk template, routes each event to the correct Splunk index. It calls splunkIndexLookup, a local function that you define, to map events to indexes.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  };
  return JSON.stringify(obj);
}
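The splunkIndexLookup function isn't defined in the example above. The following is a minimal sketch of what such a function might look like. The severity field, the index names, and the fallback index are all assumptions for illustration, so substitute the fields and indexes that exist in your environment.

```javascript
// Hypothetical mapping from an event's severity to a Splunk index name.
var SEVERITY_TO_INDEX = {
  ERROR: 'app_errors',
  WARNING: 'app_warnings'
};

// Hypothetical fallback index for events that match no mapping.
var DEFAULT_INDEX = 'main';

// Returns the index name for an event, or DEFAULT_INDEX when the event
// has no severity or the severity is not in the mapping.
function splunkIndexLookup(obj) {
  var severity = obj && obj.severity;
  if (typeof severity === 'string' &&
      Object.prototype.hasOwnProperty.call(SEVERITY_TO_INDEX, severity)) {
    return SEVERITY_TO_INDEX[severity];
  }
  return DEFAULT_INDEX;
}
```

Keeping the mapping in a plain object makes the routing rules easy to review, and the hasOwnProperty check keeps inherited keys such as toString from being treated as index names.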

The next example routes unrecognized events to the dead-letter queue, assuming the template supports a dead-letter queue. (For example, see the Pub/Sub to JDBC template.) You might use this pattern to filter out unexpected entries before writing to the destination.

function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the dead-letter queue
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);
}

Filter events

Use a UDF to filter undesired or unrecognized events from the output.

The following example drops events where data.severity equals "DEBUG".

function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity === "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

What's next