Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen

Einige von Google bereitgestellte Dataflow-Vorlagen unterstützen benutzerdefinierte Funktionen (User-Defined Functions, UDFs). Mit UDFs können Sie die Funktionalität einer Vorlage erweitern, ohne den Vorlagencode zu ändern.

Überblick

Zum Erstellen einer UDF schreiben Sie je nach Vorlage eine JavaScript-Funktion oder eine Python-Funktion. Sie speichern die UDF-Codedatei in Cloud Storage und geben den Speicherort als Vorlagenparameter an. Für jedes Eingabeelement ruft die Vorlage Ihre Funktion auf. Die Funktion transformiert das Element oder führt andere benutzerdefinierte Logik aus und gibt das Ergebnis an die Vorlage zurück.

Sie können eine UDF beispielsweise für Folgendes verwenden:

  • Formatieren Sie die Eingabedaten so, dass sie mit einem Zielschema übereinstimmen.
  • Entfernen Sie sensible Daten.
  • Einige Elemente aus der Ausgabe filtern.

Die Eingabe für die UDF-Funktion ist ein einzelnes Datenelement, das als JSON-String serialisiert wird. Die Funktion gibt einen serialisierten JSON-String als Ausgabe zurück. Das Datenformat hängt von der Vorlage ab. In der Vorlage Pub/Sub-Abo für BigQuery ist die Eingabe beispielsweise die Pub/Sub-Nachrichtendaten, die als JSON-Objekt serialisiert sind, und die Ausgabe ist ein serialisiertes JSON-Objekt, das eine BigQuery-Tabellenzeile darstellt. Weitere Informationen finden Sie in der Dokumentation zu den einzelnen Vorlagen.

Vorlage mit einer UDF ausführen

Zum Ausführen einer Vorlage mit einer UDF geben Sie den Cloud Storage-Speicherort der JavaScript-Datei und den Namen der Funktion als Vorlagenparameter an.

Mit einigen von Google bereitgestellten Vorlagen können Sie die UDF auch direkt in der Google Cloud Console erstellen:

  1. Rufen Sie in der Google Cloud Console die Seite "Dataflow" auf.

    Zur Seite "Dataflow"

  2. Klicken Sie auf Job aus Vorlage erstellen.

  3. Wählen Sie die von Google bereitgestellte Vorlage aus, die Sie ausführen möchten.

  4. Maximieren Sie Optionale Parameter. Wenn die Vorlage UDFs unterstützt, hat sie einen Parameter für den Cloud Storage-Speicherort der UDF und einen weiteren Parameter für den Funktionsnamen.

  5. Klicken Sie neben dem Vorlagenparameter auf UDF erstellen.

  6. Im Bereich Benutzerdefinierte Funktion (UDF) auswählen oder erstellen:

    1. Geben Sie einen Dateinamen ein. Beispiel: my_udf.js.
    2. Wählen Sie einen Cloud Storage-Ordner aus. Beispiel: gs://your-bucket/your-folder.
    3. Verwenden Sie den Inline-Code-Editor, um die Funktion zu schreiben. Der Editor ist bereits mit Boilerplate-Code ausgefüllt, den Sie als Ausgangspunkt verwenden können.
    4. Klicken Sie auf UDF erstellen.

      Die Google Cloud Console speichert die UDF-Datei und füllt den Cloud Storage-Speicherort aus.

    5. Geben Sie den Namen der Funktion in das entsprechende Feld ein.

JavaScript-UDF schreiben

Der folgende Code zeigt eine managementfreie JavaScript-UDF, von der aus Sie beginnen können:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

Der JavaScript-Code wird auf der Nashorn-JavaScript-Engine ausgeführt. Wir empfehlen Ihnen, Ihre UDF auf der Nashorn-Engine zu testen, bevor Sie sie bereitstellen. Die Nashorn-Engine stimmt nicht genau mit der Node.js-Implementierung von JavaScript überein. Ein häufiges Problem besteht in der Verwendung von console.log() oder Number.isNaN(). Beide sind nicht in der Nashorn-Engine definiert.

Sie können Ihre UDF auf der Nashorn-Engine mit Cloud Shell testen, in der JDK 11 vorinstalliert ist. Starten Sie Nashorn so im interaktiven Modus:

jjs

Führen Sie in der interaktiven Shell die folgenden Schritte aus:

  1. Rufen Sie load auf, um die UDF-JavaScript-Datei zu laden.
  2. Definieren Sie ein JSON-Eingabeobjekt abhängig von den erwarteten Nachrichten Ihrer Pipeline.
  3. Mit der Funktion JSON.stringify können Sie die Eingabe in einem JSON-String serialisieren.
  4. Rufen Sie Ihre UDF-Funktion auf, um den JSON-String zu verarbeiten.
  5. Rufen Sie JSON.parse auf, um die Ausgabe zu deserialisieren.
  6. Prüfen Sie das Ergebnis

Beispiel:

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Python-UDF schreiben

Der folgende Code zeigt eine managementfreie Python-UDF, von der aus Sie beginnen können:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Python-UDFs unterstützen Abhängigkeitspakete, die Standard für Python und Apache Beam sind. Eine Verwendung von Drittanbieterpaketen ist nicht möglich.

Fehlerbehandlung

Wenn ein Fehler während der UDF-Ausführung auftritt, wird der Fehler in der Regel an einen Speicherort für unzustellbare Nachrichten geschrieben. Die Details hängen von der Vorlage ab. Die Vorlage Pub/Sub-Abo für BigQuery erstellt beispielsweise die Tabelle _error_records und schreibt dort Fehler. Laufzeit-UDF-Fehler können aufgrund von Syntaxfehlern oder nicht abgefangenen Ausnahmen auftreten. Testen Sie Ihre UDF lokal auf Syntaxfehler.

Sie können programmatisch eine Ausnahme für ein Element auslösen, das nicht verarbeitet werden soll. In diesem Fall wird das Element an die Position für unzustellbare Nachrichten geschrieben, wenn die Vorlage eine unterstützt. Ein Beispiel für diesen Ansatz finden Sie unter Routenereignisse.

Beispielanwendungsfälle

In diesem Abschnitt werden einige gängige Muster für UDFs beschrieben, die auf realen Anwendungsfällen basieren.

Ereignisse anreichern

Verwenden Sie eine UDF, um Ereignisse mit neuen Feldern anzureichern, um mehr Kontextinformationen zu erhalten.

Beispiel:

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Transformationsereignisse

Verwenden Sie eine UDF, um das gesamte Ereignisformat abhängig davon zu transformieren, was Ihr Ziel erwartet.

Im folgenden Beispiel wird ein Cloud Logging-Logeintrag (LogEntry) auf den ursprünglichen Logstring zurückgesetzt, falls verfügbar. Je nach Logquelle wird der ursprüngliche Logstring manchmal im Feld textPayload ausgefüllt. Sie können dieses Muster verwenden, um die unbearbeiteten Logs in ihrem ursprünglichen Format zu senden, anstatt die gesamte LogEntry aus Cloud Logging zu senden.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Ereignisdaten entfernen oder entfernen

Verwenden Sie eine UDF, um einen Teil des Ereignisses zu entfernen oder zu entfernen.

Im folgenden Beispiel wird der Feldname sensitiveField durch Ersetzen des Werts entfernt und das Feld redundantField wird vollständig entfernt.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Routen-Ereignisse

Verwenden Sie eine UDF, um Ereignisse an separate Ziele in der nachgelagerten Senke weiterzuleiten.

Das folgende Beispiel basiert auf der Vorlage Pub/Sub zu Splunk und leitet jedes Ereignis an den richtigen Splunk-Index weiter. Sie ruft eine benutzerdefinierte lokale Funktion auf, um Ereignisse zu Indexen zuzuordnen.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}

Im nächsten Beispiel werden nicht erkannte Ereignisse an die Warteschlange für unzustellbare Nachrichten weitergeleitet, vorausgesetzt, die Vorlage unterstützt eine Warteschlange für unzustellbare Nachrichten. Weitere Informationen finden Sie beispielsweise in der Vorlage Pub/Sub zu JDBC. Mit diesem Muster können Sie unerwartete Einträge herausfiltern, bevor Sie in das Ziel schreiben.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Ereignisse filtern

Verwenden Sie eine UDF, um unerwünschte oder nicht erkannte Ereignisse aus der Ausgabe zu filtern.

Im folgenden Beispiel werden Ereignisse gelöscht, bei denen data.severity gleich "DEBUG" ist.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

Nächste Schritte