Algunas plantillas de Dataflow proporcionadas por Google admiten funciones definidas por el usuario (UDF). Las UDF te permiten extender la funcionalidad de una plantilla sin modificar el código de la plantilla.
Descripción general
Para crear una UDF, escribe una función de JavaScript o de Python, según la plantilla. Almacena el archivo de código de la UDF en Cloud Storage y especifica la ubicación como parámetro de plantilla. Para cada elemento de entrada, la plantilla llama a tu función. La función transforma el elemento o realiza otra lógica personalizada y muestra el resultado en la plantilla.
Por ejemplo, puedes usar una UDF para lo siguiente:
- Vuelve a formatear los datos de entrada para que coincidan con un esquema de destino.
- Oculta datos sensibles.
- Filtra algunos elementos del resultado.
La entrada a la función UDF es un elemento de datos único, serializado como una cadena JSON. La función muestra una cadena JSON serializada como un resultado. El formato de los datos depende de la plantilla. Por ejemplo, en la plantilla de suscripción de Pub/Sub a BigQuery, la entrada son los datos del mensaje de Pub/Sub serializados como un objeto JSON y el resultado es un objeto JSON serializado que representa una fila de la tabla de BigQuery. Para obtener más información, consulta la documentación de cada plantilla.
Ejecuta una plantilla con una UDF
Para ejecutar una plantilla con una UDF, especifica la ubicación de Cloud Storage del archivo JavaScript y el nombre de la función como parámetros de plantilla.
Con algunas plantillas proporcionadas por Google, también puedes crear la UDF directamente en la consola de Google Cloud, de la siguiente manera:
Ve a la página de Dataflow en la consola de Google Cloud.
Haz clic en add_boxCrear trabajo a partir de una plantilla
Selecciona la plantilla que proporciona Google que deseas ejecutar.
Expande Parámetros opcionales. Si la plantilla admite UDF, tiene un parámetro para la ubicación de Cloud Storage de la UDF y otro parámetro para el nombre de la función.
Junto al parámetro de plantilla, haz clic en Crear UDF.
En el panel Selecciona o crea una función definida por el usuario (UDF), haz lo siguiente:
- Ingresa un nombre de archivo. Ejemplo:
my_udf.js
. - Selecciona una carpeta de Cloud Storage.
Ejemplo:
gs://your-bucket/your-folder
. - Usa el editor de código intercalado para escribir la función. El editor ya cuenta con un código estándar que puedes usar como punto de partida.
Haz clic en Crear UDF.
La consola de Google Cloud guarda el archivo de UDF y propaga la ubicación de Cloud Storage.
Ingresa el nombre de la función en el campo correspondiente.
- Ingresa un nombre de archivo. Ejemplo:
Escribe una UDF de JavaScript
En el siguiente código, se muestra una UDF no-ops de JavaScript desde la que puedes comenzar:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
El código JavaScript se ejecuta en el
motor Nashorn de JavaScript. Te recomendamos probar la UDF en el motor de Nashorn antes de implementarla. El motor de Nashorn no coincide con exactitud con la implementación de Node.js de JavaScript. Un error común es usar console.log()
o Number.isNaN()
, ninguno de los cuales se define en el motor de Nashorn.
Puedes probar tu UDF en el motor de Nashorn mediante Cloud Shell, que tiene preinstalado JDK 11. Inicia Nashorn en modo interactivo de la siguiente manera:
jjs --language=es6
En la shell interactiva de Nashorn, realiza los siguientes pasos:
- Llama a
load
para cargar tu archivo UDF JavaScript. - Define un objeto JSON de entrada según los mensajes esperados de tu canalización.
- Usa la función
JSON.stringify
para serializar la entrada en una string JSON. - Llama a tu función de UDF para procesar la string JSON.
- Llama a
JSON.parse
para deserializar el resultado. - Verifica el resultado.
Ejemplo:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Escribe una UDF de Python
En el siguiente código, se muestra una UDF no-ops de Python desde la que puedes comenzar:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Las UDF de Python admiten paquetes de dependencia que son estándar de Python y Apache Beam. No pueden usar paquetes de terceros.
Manejo de errores
Por lo general, cuando se produce un error durante la ejecución de la UDF, el error se escribe en una ubicación de mensajes no entregados. Los detalles dependen de la plantilla. Por ejemplo, la plantilla de
suscripción de Pub/Sub a BigQuery
crea una tabla _error_records
y escribe errores allí. Los errores de UDF del entorno de ejecución pueden ocurrir debido a errores de sintaxis o excepciones no detectadas. Para comprobar si hay errores de sintaxis, prueba tu UDF de forma local.
Puedes generar una excepción de manera programática para un elemento que no debe procesarse. En el caso del elemento, se escribe en la ubicación de mensajes no entregados, si la plantilla admite una. Para ver un ejemplo que muestra este enfoque, consulta Eventos de ruta.
Casos de uso de ejemplo
En esta sección, se describen algunos patrones comunes para las UDF, basados en casos de uso reales.
Enriquece eventos
Usa una UDF para enriquecer eventos con campos nuevos para obtener información más contextual.
Ejemplo:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Transforma eventos
Usa una UDF para transformar todo el formato del evento según lo que espere tu destino.
En el siguiente ejemplo, se revierte una entrada de registro de Cloud Logging (LogEntry
) a la cadena de registro original cuando está disponible. (Según la fuente del archivo de registro, la string de registro original se propaga a veces en el campo textPayload
). Puedes usar este patrón para enviar los registros sin procesar en su formato original, en lugar de enviar todo el LogEntry
desde Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Oculta o quita datos de eventos
Usa una UDF para ocultar o quitar una parte del evento.
En el siguiente ejemplo, se oculta el nombre del campo sensitiveField
mediante el reemplazo de su valor y se quita el campo llamado redundantField
por completo.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Eventos de ruta
Usa una UDF para enrutar eventos a fin de separar destinos en el receptor descendente.
En el siguiente ejemplo, basado en la plantilla Pub/Sub a Splunk, se enruta cada evento al índice de Splunk correcto. Llama a una función local definida por el usuario para asignar eventos a los índices.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
En el siguiente ejemplo, se enrutan los eventos no reconocidos a la cola de mensajes no entregados, si la plantilla admite una cola de mensajes no entregados. (Por ejemplo, consulta la plantilla Pub/Sub a JDBC). Puedes usar este patrón para filtrar las entradas inesperadas antes de escribir en el destino.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filtrar eventos
Usa una UDF para filtrar desde el resultado los eventos no deseados o no reconocidos.
En el siguiente ejemplo, se descartan los eventos en los que data.severity
es igual a "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
¿Qué sigue?
- Plantillas que proporciona Google
- Compila y ejecuta una plantilla flexible
- Ejecuta plantillas clásicas
- Extiende tu plantilla de Dataflow con UDF (entrada de blog)
- UDF de ejemplo (GitHub)