Crea funciones definidas por el usuario para plantillas de Dataflow

Algunas plantillas de Dataflow proporcionadas por Google admiten funciones definidas por el usuario (UDF). Las UDF te permiten extender la funcionalidad de una plantilla sin modificar el código de la plantilla.

Descripción general

Para crear una UDF, escribe una función de JavaScript o de Python, según la plantilla. Debes almacenar el archivo de código de la UDF en Cloud Storage y especificar la ubicación como un parámetro de plantilla. Para cada elemento de entrada, la plantilla llama a tu función. La función transforma el elemento o realiza otra lógica personalizada y muestra el resultado en la plantilla.

Por ejemplo, puedes usar una UDF para lo siguiente:

  • Vuelve a formatear los datos de entrada para que coincidan con un esquema de destino.
  • Oculta datos sensibles.
  • Filtra algunos elementos del resultado.

La entrada a la función UDF es un elemento de datos único, serializado como una cadena JSON. La función muestra una cadena JSON serializada como un resultado. El formato de los datos depende de la plantilla. Por ejemplo, en la plantilla de suscripción de Pub/Sub a BigQuery, la entrada son los datos del mensaje de Pub/Sub serializados como un objeto JSON y el resultado es un objeto JSON serializado que representa una fila de la tabla de BigQuery. Para obtener más información, consulta la documentación de cada plantilla.

Ejecuta una plantilla con una UDF

Para ejecutar una plantilla con una UDF, especifica la ubicación de Cloud Storage del archivo JavaScript y el nombre de la función como parámetros de plantilla.

Con algunas plantillas proporcionadas por Google, también puedes crear la UDF directamente en la consola de Google Cloud, de la siguiente manera:

  1. Ve a la página de Dataflow en la consola de Google Cloud.

    Ir a la página de Dataflow

  2. Haz clic en Crear trabajo a partir de una plantilla

  3. Selecciona la plantilla que proporciona Google que deseas ejecutar.

  4. Expande Parámetros opcionales. Si la plantilla admite UDF, tiene un parámetro para la ubicación de Cloud Storage de la UDF y otro parámetro para el nombre de la función.

  5. Junto al parámetro de plantilla, haz clic en Crear UDF.

  6. En el panel Selecciona o crea una función definida por el usuario (UDF), haz lo siguiente:

    1. Ingresa un nombre de archivo. Ejemplo: my_udf.js.
    2. Selecciona una carpeta de Cloud Storage. Ejemplo: gs://your-bucket/your-folder.
    3. Usa el editor de código intercalado para escribir la función. El editor ya cuenta con un código estándar que puedes usar como punto de partida.
    4. Haz clic en Crear UDF.

      La consola de Google Cloud guarda el archivo de UDF y propaga la ubicación de Cloud Storage.

    5. Ingresa el nombre de la función en el campo correspondiente.

Escribe una UDF de JavaScript

En el siguiente código, se muestra una UDF no-ops de JavaScript desde la que puedes comenzar:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

El código JavaScript se ejecuta en el motor de JavaScript Nashorn. Te recomendamos probar la UDF en el motor de Nashorn antes de implementarla. El motor de Nashorn no coincide con exactitud con la implementación de Node.js de JavaScript. Un error común es usar console.log() o Number.isNaN(), ninguno de los cuales se define en el motor de Nashorn.

Puedes probar tu UDF en el motor de Nashorn mediante Cloud Shell, que tiene preinstalado JDK 11. Inicia Nashorn en modo interactivo de la siguiente manera:

jjs

En la shell interactiva de Nashorn, realiza los siguientes pasos:

  1. Llama a load para cargar tu archivo UDF JavaScript.
  2. Define un objeto JSON de entrada según los mensajes esperados de tu canalización.
  3. Usa la función JSON.stringify para serializar la entrada en una string JSON.
  4. Llama a tu función de UDF para procesar la string JSON.
  5. Llama a JSON.parse para deserializar el resultado.
  6. Verifica el resultado.

Ejemplo:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Escribe una UDF de Python

En el siguiente código, se muestra una UDF no-ops de Python desde la que puedes comenzar:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Las UDF de Python admiten paquetes de dependencia que son estándar de Python y Apache Beam. No pueden usar paquetes de terceros.

Manejo de errores

Por lo general, cuando se produce un error durante la ejecución de la UDF, el error se escribe en una ubicación de mensajes no entregados. Los detalles dependen de la plantilla. Por ejemplo, la plantilla de suscripción de Pub/Sub a BigQuery crea una tabla _error_records y escribe errores allí. Los errores de UDF del entorno de ejecución pueden ocurrir debido a errores de sintaxis o excepciones no detectadas. Para comprobar si hay errores de sintaxis, prueba tu UDF de forma local.

Puedes generar una excepción de manera programática para un elemento que no debe procesarse. En el caso del elemento, se escribe en la ubicación de mensajes no entregados, si la plantilla admite una. Para ver un ejemplo que muestra este enfoque, consulta Eventos de ruta.

Casos de uso de ejemplo

En esta sección, se describen algunos patrones comunes para las UDF, basados en casos de uso reales.

Enriquece eventos

Usa una UDF para enriquecer eventos con campos nuevos para obtener información más contextual.

Ejemplo:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Transforma eventos

Usa una UDF para transformar todo el formato del evento según lo que espere tu destino.

En el siguiente ejemplo, se revierte una entrada de registro de Cloud Logging (LogEntry) a la cadena de registro original cuando está disponible. (Según la fuente del archivo de registro, la string de registro original se propaga a veces en el campo textPayload). Puedes usar este patrón para enviar los registros sin procesar en su formato original, en lugar de enviar todo el LogEntry desde Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Oculta o quita datos de eventos

Usa una UDF para ocultar o quitar una parte del evento.

En el siguiente ejemplo, se oculta el nombre del campo sensitiveField mediante el reemplazo de su valor y se quita el campo llamado redundantField por completo.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Eventos de ruta

Usa una UDF para enrutar eventos a fin de separar destinos en el receptor descendente.

En el siguiente ejemplo, basado en la plantilla Pub/Sub a Splunk, se enruta cada evento al índice de Splunk correcto. Llama a una función local definida por el usuario para asignar eventos a los índices.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}

En el siguiente ejemplo, se enrutan los eventos no reconocidos a la cola de mensajes no entregados, si la plantilla admite una cola de mensajes no entregados. (Por ejemplo, consulta la plantilla Pub/Sub a JDBC). Puedes usar este patrón para filtrar las entradas inesperadas antes de escribir en el destino.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filtrar eventos

Usa una UDF para filtrar desde el resultado los eventos no deseados o no reconocidos.

En el siguiente ejemplo, se descartan los eventos en los que data.severity es igual a "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

¿Qué sigue?