Alcuni modelli Dataflow forniti da Google supportano funzioni definite dall'utente. Le funzioni definite dall'utente consentono di estendere la funzionalità un modello senza modificare il codice del modello.
Panoramica
Per creare una UDF, scrivi una funzione JavaScript o Python, a seconda del modello. Archivia il file di codice UDF in Cloud Storage e specifica la location come parametro del modello. Per ogni elemento di input, il modello chiama la tua funzione. La funzione trasforma l'elemento o esegue altre applicazioni personalizzate e restituisce il risultato al modello.
Ad esempio, puoi utilizzare una UDF per:
- Riformatta i dati di input in modo che corrispondano a uno schema target.
- Oscura i dati sensibili.
- Filtra alcuni elementi dall'output.
L'input della funzione funzione definita dall'utente è un singolo elemento di dati, serializzato come JSON stringa. La funzione restituisce come output una stringa JSON serializzata. Il formato dei dati dipende dal modello. Ad esempio, nel modello Pub/Sub Subscription to BigQuery l'input è costituito dai dati del messaggio Pub/Sub serializzati come oggetto JSON e l'output è un oggetto JSON serializzato che rappresenta una riga della tabella BigQuery. Per ulteriori informazioni, consulta la documentazione di ogni modello.
Esegui un modello con una FDU
Per eseguire un modello con una funzione UDF, specifica la posizione Cloud Storage del file JavaScript e il nome della funzione come parametri del modello.
Con alcuni modelli forniti da Google, puoi anche creare la funzione definita dall'utente direttamente nella Console Google Cloud, come segue:
Vai alla pagina Dataflow nella console Google Cloud.
Fai clic su add_boxCrea job da modello.
Seleziona il modello fornito da Google che vuoi eseguire.
Espandi Parametri facoltativi. Se il modello supporta le funzioni definite dall'utente, per la posizione di Cloud Storage della funzione definita dall'utente e di un altro per il nome della funzione.
Accanto al parametro del modello, fai clic su Crea UDF.
Nel riquadro Seleziona o crea una funzione definita dall'utente:
- Inserisci un nome file. Esempio:
my_udf.js
. - Seleziona una cartella di Cloud Storage.
Esempio:
gs://your-bucket/your-folder
. - Utilizza l'editor di codice in linea per scrivere la funzione. L'editore è precompilato con codice boilerplate, che puoi utilizzare come punto di partenza.
Fai clic su Crea funzione definita dall'utente.
La console Google Cloud salva il file della funzione definita dall'utente e compila la posizione di Cloud Storage.
Inserisci il nome della funzione nel campo corrispondente.
- Inserisci un nome file. Esempio:
Scrivi una funzione JavaScript definita dall'utente
Il seguente codice mostra una funzione JavaScript definita dall'utente da cui puoi iniziare:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Il codice JavaScript viene eseguito
Motore JavaScript Nashorn. I nostri suggerimenti
testerai la funzione definita dall'utente sul motore Nashorn prima di eseguirne il deployment. Il motore Nashorn
non corrisponde esattamente all'implementazione Node.js di JavaScript. Un comune
problema è l'utilizzo di console.log()
o Number.isNaN()
, nessuno dei due è
definiti nel motore Nashorn.
Puoi testare la funzione definita dall'utente sul motore Nashorn utilizzando Cloud Shell, JDK 11 preinstallato. Avvia Nashorn in modalità interattiva nel seguente modo:
jjs --language=es6
Nella shell interattiva Nashorn, esegui i seguenti passaggi:
- Chiama
load
per caricare il file JavaScript della UDF. - Definisci un oggetto JSON di input in base ai messaggi previsti dalla pipeline.
- Utilizza la funzione
JSON.stringify
per serializzare l'input in una stringa JSON. - Chiama la funzione UDF per elaborare la stringa JSON.
- Chiama
JSON.parse
per deserializzare l'output. - Verifica il risultato.
Esempio:
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Scrivere una UDF in Python
Il seguente codice mostra una UDF Python no-op da cui puoi iniziare:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Le funzioni definite dall'utente di Python supportano i pacchetti di dipendenze standard di Python Apache Beam. Non possono utilizzare pacchetti di terze parti.
Gestione degli errori
In genere, quando si verifica un errore durante l'esecuzione della funzione definita dall'utente, l'errore viene scritto in una
posizione dei messaggi non recapitabili. I dettagli dipendono dal modello. Ad esempio,
Sottoscrizione Pub/Sub a BigQuery
modello crea una tabella _error_records
e scrive gli errori lì. Funzione definita dall'utente per il runtime
a causa di errori di sintassi o eccezioni non rilevate. Per verificare
errori di sintassi, testa la funzione definita dall'utente localmente.
Puoi generare in modo programmatico un'eccezione per un elemento che non dovrebbe essere elaborati. In questo caso, l'elemento viene scritto nella posizione della posta inutilizzata, se il modello ne supporta una. Per un esempio che mostra questo approccio, consulta Indirizzare gli eventi.
Esempi di casi d'uso
Questa sezione descrive alcuni pattern comuni per le funzioni definite dall'utente, in base a casi d'uso reali.
Arricchire gli eventi
Utilizza una UDF per arricchire gli eventi con nuovi campi per informazioni più contestuali.
Esempio:
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Trasformare gli eventi
Utilizza una UDF per trasformare l'intero formato dell'evento in base alle aspettative della destinazione.
L'esempio seguente ripristina una voce del log di Cloud Logging
(LogEntry
) alla stringa del log originale, se disponibile. (a seconda della sorgente log, la stringa di log originale è
a volte compilate nel campo textPayload
. Puoi usare questo pattern per
inviare i log non elaborati nel formato originale, invece di inviare l'intero
LogEntry
da Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Oscura o rimuovi i dati sugli eventi
Utilizza una funzione definita dall'utente per oscurare o rimuovere una parte dell'evento.
L'esempio seguente oscura il nome del campo sensitiveField
sostituendo il relativo
e rimuove completamente il campo denominato redundantField
.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Eventi di percorso
Utilizza una funzione definita dall'utente per indirizzare gli eventi a destinazioni separate nel sink a valle.
L'esempio seguente, in base al Da Pub/Sub a Splunk del modello, instrada ogni evento all'indice Splunk corretto. Chiama una funzione locale definita dall'utente per mappare gli eventi agli indici.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
L'esempio successivo instrada gli eventi non riconosciuti alla coda dei messaggi non recapitabili, presupponendo che il modello supporta una coda di messaggi non recapitabili. (Ad esempio, vedi il Da Pub/Sub a JDBC template.) Puoi utilizzare questo pattern per filtrare le voci impreviste prima di scrivere nella destinazione.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filtra eventi
Utilizza una UDF per filtrare gli eventi indesiderati o non riconosciuti dall'output.
Nell'esempio seguente vengono eliminati gli eventi in cui data.severity
è uguale a "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Passaggi successivi
- Modelli forniti da Google
- Creazione ed esecuzione di un modello flessibile
- Esecuzione di modelli classici
- Estendi il tuo modello Dataflow con le funzioni definite dall'utente (post del blog)
- UDF di esempio (GitHub)