Algunas plantillas de Dataflow proporcionadas por Google admiten funciones definidas por el usuario (UDF). Las funciones definidas por el usuario te permiten ampliar la funcionalidad de una plantilla sin modificar su código.
Información general
Para crear una FDU, escribe una función de JavaScript o de Python, según la plantilla. Almacena el archivo de código de la UDF en Cloud Storage y especifica la ubicación como parámetro de plantilla. En cada elemento de entrada, la plantilla llama a tu función. La función transforma el elemento o realiza otra lógica personalizada y devuelve el resultado a la plantilla.
Por ejemplo, puedes usar una función definida por el usuario para lo siguiente:
- Cambia el formato de los datos de entrada para que coincidan con un esquema de destino.
- Oculta los datos sensibles.
- Filtrar algunos elementos de la salida.
La entrada de la función de FDU es un único elemento de datos, serializado como una cadena JSON. La función devuelve una cadena JSON serializada como salida. El formato de los datos depende de la plantilla. Por ejemplo, en la plantilla Suscripción de Pub/Sub a BigQuery, la entrada son los datos de mensajes de Pub/Sub serializados como un objeto JSON y la salida es un objeto JSON serializado que representa una fila de una tabla de BigQuery. Para obtener más información, consulta la documentación de cada plantilla.
Ejecutar una plantilla con una función definida por el usuario
Para ejecutar una plantilla con una UDF, especifica la ubicación en Cloud Storage del archivo JavaScript y el nombre de la función como parámetros de la plantilla.
Con algunas plantillas proporcionadas por Google, también puedes crear la función definida por el usuario directamente en laGoogle Cloud consola, de la siguiente manera:
Ve a la página Dataflow de la Google Cloud consola.
Haz clic en add_boxCrear tarea a partir de plantilla.
Selecciona la plantilla proporcionada por Google que quieras ejecutar.
Despliega Parámetros opcionales. Si la plantilla admite UDFs, tiene un parámetro para la ubicación de Cloud Storage de la UDF y otro para el nombre de la función.
Junto al parámetro de plantilla, haz clic en Crear función definida por el usuario.
En el panel Seleccionar o crear una función definida por el usuario (UDF), haga lo siguiente:
- Introduce un nombre de archivo. Ejemplo:
my_udf.js
- Selecciona una carpeta de Cloud Storage.
Ejemplo:
gs://your-bucket/your-folder
- Usa el editor de código insertado para escribir la función. El editor ya incluye una plantilla de código que puedes usar como punto de partida.
Haz clic en Crear UDF.
La consola Google Cloud guarda el archivo de función definida por el usuario y rellena la ubicación de Cloud Storage.
Introduce el nombre de la función en el campo correspondiente.
- Introduce un nombre de archivo. Ejemplo:
Escribir una función definida por el usuario de JavaScript
El siguiente código muestra una UDF de JavaScript que no hace nada y que puedes usar como punto de partida:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
El código JavaScript se ejecuta en el motor de JavaScript Nashorn. Te recomendamos que pruebes tu función definida por el usuario en el motor Nashorn antes de implementarla. El motor Nashorn
no coincide exactamente con la implementación de JavaScript de Node.js. Un problema habitual es usar console.log()
o Number.isNaN()
, ninguno de los cuales está definido en el motor Nashorn.
Puedes probar tu UDF en el motor Nashorn con Cloud Shell, que tiene JDK 11 preinstalado. Inicia Nashorn en modo interactivo de la siguiente manera:
jjs --language=es6
En el shell interactivo de Nashorn, sigue estos pasos:
- Llama a
load
para cargar tu archivo JavaScript de la función definida por el usuario. - Define un objeto JSON de entrada en función de los mensajes esperados de tu canalización.
- Usa la función
JSON.stringify
para serializar la entrada en una cadena JSON. - Llama a tu función UDF para procesar la cadena JSON.
- Llama a
JSON.parse
para deserializar el resultado. - Verifica el resultado.
Ejemplo:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Escribir una UDF de Python
En el siguiente código se muestra una UDF de Python que no hace nada y que puedes usar como punto de partida:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Las UDFs de Python admiten paquetes de dependencias que son estándar en Python y Apache Beam. No pueden usar paquetes de terceros.
Gestión de errores
Normalmente, cuando se produce un error durante la ejecución de una UDF, el error se escribe en una ubicación de mensajes fallidos. Los detalles dependen de la plantilla. Por ejemplo, la plantilla Suscripción de Pub/Sub a BigQuery crea una tabla _error_records
y escribe los errores en ella. Los errores de UDF en tiempo de ejecución se pueden producir por errores de sintaxis o excepciones no detectadas. Para comprobar si hay errores de sintaxis, prueba tu función definida por el usuario de forma local.
Puedes generar una excepción de forma programática para un elemento que no se debería procesar. En ese caso, el elemento se escribe en la ubicación de mensajes fallidos, si la plantilla admite una. Para ver un ejemplo de este enfoque, consulta Eventos de ruta.
Ejemplos de casos prácticos
En esta sección se describen algunos patrones habituales de las UDFs, basados en casos prácticos reales.
Enriquecer eventos
Usa una función definida por el usuario para enriquecer los eventos con campos nuevos y obtener más información contextual.
Ejemplo:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Transformar eventos
Usa una función definida por el usuario para transformar todo el formato del evento en función de lo que espere el destino.
En el siguiente ejemplo, se revierte una entrada de registro de Cloud Logging
(LogEntry
) a la cadena de registro original
cuando está disponible. (En función de la fuente del registro, la cadena de registro original se incluye a veces en el campo textPayload
). Puedes usar este patrón para enviar los registros sin procesar en su formato original, en lugar de enviar todo el LogEntry
de Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Ocultar o eliminar datos de eventos
Usa una función definida por el usuario para ocultar o eliminar una parte del evento.
En el siguiente ejemplo, se oculta el nombre del campo sensitiveField
sustituyendo su valor y se elimina por completo el campo redundantField
.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Eventos de ruta
Usa una UDF para enrutar eventos a destinos independientes en el receptor de nivel inferior.
En el siguiente ejemplo, basado en la plantilla Pub/Sub a Splunk, se dirige cada evento al índice de Splunk correcto. Llama a una función local definida por el usuario para asignar eventos a índices.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
En el siguiente ejemplo se enrutan los eventos no reconocidos a la cola de mensajes fallidos, suponiendo que la plantilla admita una cola de mensajes fallidos. Por ejemplo, consulta la plantilla Pub/Sub a JDBC. Puedes usar este patrón para filtrar las entradas inesperadas antes de escribir en el destino.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filtrar eventos
Usa una función definida por el usuario para filtrar los eventos no deseados o no reconocidos de la salida.
En el siguiente ejemplo, se eliminan los eventos en los que data.severity
es igual a "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Siguientes pasos
- Plantillas proporcionadas por Google
- Crear y ejecutar una plantilla Flex
- Ejecutar plantillas clásicas
- Ampliar una plantilla de Dataflow con funciones definidas por el usuario (entrada de blog)
- UDFs de ejemplo (GitHub)