Creazione di funzioni definite dall'utente per i modelli Dataflow

Alcuni modelli Dataflow forniti da Google supportano le funzioni definite dall'utente. Le funzioni definite dall'utente consentono di estendere la funzionalità di un modello senza modificare il codice del modello.

Panoramica

Per creare una funzione definita dall'utente, scrivi una funzione JavaScript o Python, a seconda del modello. Puoi archiviare il file di codice della funzione definita dall'utente in Cloud Storage e specificare la posizione come parametro di modello. Per ogni elemento di input, il modello chiama la funzione. La funzione trasforma l'elemento o esegue un'altra logica personalizzata e restituisce il risultato al modello.

Ad esempio, potresti utilizzare una funzione definita dall'utente per:

  • Riformatta i dati di input in modo che corrispondano a uno schema di destinazione.
  • Oscura i dati sensibili.
  • Filtra alcuni elementi dall'output.

L'input della funzione funzione definita dall'utente è un singolo elemento di dati, serializzato come stringa JSON. La funzione restituisce una stringa JSON serializzata come output. Il formato dei dati dipende dal modello. Ad esempio, nel modello di sottoscrizione Pub/Sub a BigQuery, l'input è i dati del messaggio Pub/Sub serializzati come un oggetto JSON e l'output è un oggetto JSON serializzato che rappresenta una riga di tabella BigQuery. Per ulteriori informazioni, consulta la documentazione relativa a ciascun modello.

Esegui un modello con una funzione definita dall'utente

Per eseguire un modello con una funzione definita dall'utente, devi specificare il percorso Cloud Storage del file JavaScript e il nome della funzione come parametri del modello.

Con alcuni modelli forniti da Google, puoi anche creare la funzione definita dall'utente direttamente nella console Google Cloud, come segue:

  1. Vai alla pagina Dataflow nella console Google Cloud.

    Vai alla pagina Dataflow

  2. Fai clic su Crea job da modello.

  3. Seleziona il modello fornito da Google che vuoi eseguire.

  4. Espandi Parametri facoltativi. Se il modello supporta le funzioni definite dall'utente, ha un parametro per la località Cloud Storage della funzione definita dall'utente e un altro parametro per il nome della funzione.

  5. Accanto al parametro del modello, fai clic su Crea funzione definita dall'utente.

  6. Nel riquadro Seleziona o crea una funzione definita dall'utente:

    1. Inserisci un nome file. Esempio: my_udf.js.
    2. Seleziona una cartella di Cloud Storage. Esempio: gs://your-bucket/your-folder.
    3. Utilizza l'editor di codice incorporato per scrivere la funzione. L'editor è precompilato con codice boilerplate che puoi utilizzare come punto di partenza.
    4. Fai clic su Crea funzione definita dall'utente.

      La console Google Cloud salva il file UDF e compila il percorso di Cloud Storage.

    5. Inserisci il nome della funzione nel campo corrispondente.

Scrivi una funzione JavaScript definita dall'utente

Il seguente codice mostra una funzione JavaScript no-op da cui puoi iniziare:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

Il codice JavaScript viene eseguito sul motore JavaScript Nashorn. Ti consigliamo di testare la funzione definita dall'utente sul motore Nashorn prima di eseguirne il deployment. Il motore Nashorn non corrisponde esattamente all'implementazione Node.js di JavaScript. Un problema comune è l'utilizzo di console.log() o Number.isNaN(), nessuno dei due è definito nel motore Nashorn.

Puoi testare la funzione definita dall'utente sul motore Nashorn utilizzando Cloud Shell, in cui JDK 11 è preinstallato. Avvia Nashorn in modalità interattiva come segue:

jjs

Nella shell interattiva di Nashorn, esegui questi passaggi:

  1. Chiama load per caricare il file JavaScript della funzione definita dall'utente.
  2. Definisci un oggetto JSON di input in base ai messaggi previsti della pipeline.
  3. Usa la funzione JSON.stringify per serializzare l'input in una stringa JSON.
  4. Chiama la funzione UDF per elaborare la stringa JSON.
  5. Chiama JSON.parse per deserializzare l'output.
  6. Verifica il risultato.

Esempio:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Scrivi una funzione definita dall'utente Python

Il seguente codice mostra una funzione definita dall'utente Python no-op da cui puoi iniziare:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Le funzioni definite dall'utente Python supportano pacchetti di dipendenze standard per Python e Apache Beam. Non possono utilizzare pacchetti di terze parti.

Gestione degli errori

In genere, quando si verifica un errore durante l'esecuzione della funzione definita dall'utente, l'errore viene scritto in una posizione a messaggi non recapitabili. I dettagli dipendono dal modello. Ad esempio, il modello di sottoscrizione Pub/Sub a BigQuery crea una tabella _error_records e scrive gli errori lì. Gli errori delle funzioni definite dall'utente di runtime possono verificarsi a causa di errori di sintassi o di eccezioni non rilevate. Per verificare la presenza di errori di sintassi, testa la funzione definita dall'utente localmente.

Puoi generare in modo programmatico un'eccezione per un elemento che non deve essere elaborato. Nel caso in cui l'elemento venga scritto nella posizione dei messaggi non recapitabili, se il modello ne supporta uno. Per un esempio che mostra questo approccio, consulta Eventi di routing.

Esempi di casi d'uso

Questa sezione descrive alcuni modelli comuni per le funzioni definite dall'utente, basati su casi d'uso reali.

Arricchisci gli eventi

Utilizza una funzione definita dall'utente per arricchire gli eventi con nuovi campi per fornire informazioni più contestuali.

Esempio:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Trasforma gli eventi

Utilizza una funzione definita dall'utente per trasformare l'intero formato dell'evento in base a ciò che la destinazione si aspetta.

L'esempio seguente ripristina una voce di log di Cloud Logging (LogEntry) alla stringa di log originale, se disponibile. A seconda dell'origine log, la stringa di log originale a volte viene compilata nel campo textPayload. Potresti utilizzare questo pattern per inviare i log non elaborati nel formato originale, invece di inviare l'intero LogEntry da Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Oscurare o rimuovere i dati sugli eventi

Utilizza una funzione definita dall'utente per oscurare o rimuovere una parte dell'evento.

L'esempio seguente oscura il nome del campo sensitiveField sostituendo il relativo valore e rimuove completamente il campo denominato redundantField.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Eventi del percorso

Utilizza una funzione definita dall'utente per indirizzare gli eventi a destinazioni separate nel sink downstream.

L'esempio seguente, basato sul modello Pub/Sub a Splunk, instrada ogni evento all'indice Splunk corretto. Richiama una funzione locale definita dall'utente per mappare gli eventi agli indici.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}

L'esempio successivo indirizza gli eventi non riconosciuti alla coda dei messaggi non recapitabili, supponendo che il modello supporti una coda di messaggi non recapitabili. Ad esempio, vedi il modello Da Pub/Sub a JDBC. Puoi utilizzare questo pattern per filtrare le voci impreviste prima di scrivere nella destinazione.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filtra eventi

Utilizza una funzione definita dall'utente per filtrare gli eventi indesiderati o non riconosciuti dall'output.

Nell'esempio seguente vengono eliminati gli eventi in cui data.severity è uguale a "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

Passaggi successivi