Créer des fonctions définies par l'utilisateur pour les modèles Dataflow

Certains modèles Dataflow fournis par Google sont compatibles avec les fonctions définies par l'utilisateur (user-defined functions, UDF). Les UDF vous permettent d'étendre les fonctionnalités d'un modèle sans modifier le code du modèle.

Présentation

Pour créer une UDF, vous devez écrire une fonction JavaScript ou Python, selon le modèle. Vous stockez le fichier de code de la fonction définie par l'utilisateur dans Cloud Storage et spécifiez l'emplacement en tant que paramètre de modèle. Pour chaque élément d'entrée, le modèle appelle votre fonction. La fonction transforme l'élément ou exécute une autre logique personnalisée, puis renvoie le résultat au modèle.

Par exemple, vous pouvez utiliser une fonction définie par l'utilisateur pour:

  • Reformater les données d'entrée pour qu'elles correspondent à un schéma cible.
  • Masquer les données sensibles.
  • Filtrer certains éléments de la sortie.

L'entrée de la fonction définie par l'utilisateur est constituée d'un élément de données unique, sérialisé en tant que chaîne JSON. La fonction renvoie en sortie une chaîne JSON sérialisée. Le format des données dépend du modèle. Par exemple, dans le modèle Abonnement Pub/Sub vers BigQuery, l'entrée est constituée par les données du message Pub/Sub sérialisées en tant qu'objet JSON, et la sortie est un objet JSON sérialisé représentant une ligne de table BigQuery. Pour en savoir plus, consultez la documentation de chaque modèle.

Exécuter un modèle avec une fonction définie par l'utilisateur

Pour exécuter un modèle avec une fonction définie par l'utilisateur, vous devez spécifier l'emplacement Cloud Storage du fichier JavaScript et le nom de la fonction comme paramètres du modèle.

Avec certains modèles fournis par Google, vous pouvez également créer la fonction définie par l'utilisateur directement dans la console Google Cloud, comme suit:

  1. Accédez à la page Dataflow dans la console Google Cloud.

    Accéder à la page Dataflow

  2. Cliquez sur Create job from template(Créer une tâche à partir d'un modèle).

  3. Sélectionnez le modèle fourni par Google que vous souhaitez exécuter.

  4. Développez la section Paramètres facultatifs. Si le modèle est compatible avec les fonctions définies par l'utilisateur, il possède un paramètre pour l'emplacement Cloud Storage de l'UDF et un autre paramètre pour le nom de la fonction.

  5. À côté du paramètre de modèle, cliquez sur Créer une UDF.

  6. Dans le panneau Select or Create a User-Defined Function (UDF):

    1. Saisissez un nom de fichier. Exemple : my_udf.js
    2. Sélectionnez un dossier Cloud Storage. Exemple : gs://your-bucket/your-folder
    3. Utilisez l'éditeur de code intégré pour écrire la fonction. L'éditeur est prérempli avec du code récurrent que vous pouvez utiliser comme point de départ.
    4. Cliquez sur Create UDF (Créer une fonction définie par l'utilisateur).

      La console Google Cloud enregistre le fichier de fonction définie par l'utilisateur et renseigne l'emplacement Cloud Storage.

    5. Saisissez le nom de votre fonction dans le champ correspondant.

Écrire une fonction JavaScript définie par l'utilisateur

Le code suivant montre une UDF JavaScript no-op à partir de laquelle vous pouvez démarrer:

/*
 * @param {string} inJson input JSON message (stringified)
 * @return {?string} outJson output JSON message (stringified)
 */
function process(inJson) {
  const obj = JSON.parse(inJson);

  // Example data transformations:
  // Add a field: obj.newField = 1;
  // Modify a field: obj.existingField = '';
  // Filter a record: return null;

  return JSON.stringify(obj);
}

Le code JavaScript s'exécute sur le moteur JavaScript Nashorn. Nous vous recommandons de tester votre fonction définie par l'utilisateur sur le moteur Nashorn avant de la déployer. Le moteur Nashorn ne correspond pas exactement à la mise en œuvre Node.js de JavaScript. Un problème courant est l'utilisation de console.log() ou Number.isNaN(), qui ne sont pas définis dans le moteur Nashorn.

Vous pouvez tester votre fonction définie par l'utilisateur sur le moteur Nashorn en utilisant Cloud Shell, sur lequel JDK 11 est préinstallé. Lancez Nashorn en mode interactif, comme suit:

jjs

Dans l'interface système interactive Nashorn, procédez comme suit:

  1. Appelez load pour charger votre fichier JavaScript UDF.
  2. Définissez un objet JSON d'entrée en fonction des messages attendus de votre pipeline.
  3. Utilisez la fonction JSON.stringify pour sérialiser l'entrée sur une chaîne JSON.
  4. Appelez votre fonction définie par l'utilisateur pour traiter la chaîne JSON.
  5. Appelez JSON.parse pour désérialiser le résultat.
  6. Vérifiez le résultat.

Exemple :

> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)

Écrire une UDF Python

Le code suivant montre une UDF Python no-op à partir de laquelle vous pouvez démarrer:

import json
def process(value):
  # Load the JSON string into a dictionary.
  data = json.loads(value)

  # Transform the data in some way.
  data['new_field'] = 'new_value'

  # Serialize the data back to JSON.
  return json.dumps(data)

Les fonctions Python définies par l'utilisateur sont compatibles avec les packages de dépendances standards pour Python et Apache Beam. Ils ne peuvent pas utiliser de packages tiers.

Gestion des exceptions

En règle générale, lorsqu'une erreur se produit pendant l'exécution d'une fonction définie par l'utilisateur, elle est écrite dans un emplacement de lettre morte. Les détails dépendent du modèle. Par exemple, le modèle Abonnement Pub/Sub vers BigQuery crée une table _error_records et y écrit les erreurs. Des erreurs UDF de l'environnement d'exécution peuvent se produire en raison d'erreurs de syntaxe ou d'exceptions non interceptées. Pour rechercher les erreurs de syntaxe, testez votre fonction définie par l'utilisateur localement.

Vous pouvez générer par programmation une exception pour un élément qui ne doit pas être traité. Dans ce cas, l'élément est écrit dans l'emplacement des lettres mortes, si le modèle le permet. Pour obtenir un exemple illustrant cette approche, consultez la section Événements de routage.

Exemples de cas d'utilisation

Cette section décrit certains modèles courants de fonctions définies par l'utilisateur, en fonction de cas d'utilisation réels.

Enrichir des événements

Enrichissez vos événements avec de nouveaux champs pour obtenir des informations plus contextuelles à l'aide d'une fonction définie par l'utilisateur.

Exemple :

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Add new field to track data source
  data.source = "source1";
  return JSON.stringify(data);
}

Événements de transformation

Utilisez une fonction définie par l'utilisateur pour transformer l'intégralité du format de l'événement en fonction de votre destination.

L'exemple suivant rétablit une entrée de journal Cloud Logging (LogEntry) sur la chaîne de journal d'origine lorsqu'elle est disponible. (Selon la source du journal, la chaîne de journal d'origine est parfois renseignée dans le champ textPayload.) Vous pouvez utiliser ce modèle pour envoyer les journaux bruts dans leur format d'origine, au lieu d'envoyer l'intégralité de LogEntry à partir de Cloud Logging.

 function process(inJson) {
  const data = JSON.parse(inJson);

  if (data.textPayload) {
    return data.textPayload; // Return string value, and skip JSON.stringify
  }
 return JSON.stringify(obj);
}

Masquer ou supprimer des données d'événement

Utilisez une fonction définie par l'utilisateur pour masquer ou supprimer une partie de l'événement.

L'exemple suivant masque le nom de champ sensitiveField en remplaçant sa valeur et supprime complètement le champ nommé redundantField.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Normalize existing field values
  data.source = (data.source && data.source.toLowerCase()) || "unknown";

  // Redact existing field values
  if (data.sensitiveField) {
    data.sensitiveField = "REDACTED";
  }

  // Remove existing fields
  if (data.redundantField) {
    delete(data.redundantField);
  }

  return JSON.stringify(data);
}

Événements de routage

Utilisez une fonction définie par l'utilisateur pour acheminer les événements vers des destinations distinctes dans le récepteur en aval.

L'exemple suivant, basé sur le modèle Pub/Sub vers Splunk, achemine chaque événement vers l'index Splunk approprié. Elle appelle une fonction locale définie par l'utilisateur pour mapper les événements aux index.

function process(inJson) {
  const obj = JSON.parse(inJson);

  // Set index programmatically for data segregation in Splunk
  obj._metadata = {
    index: splunkIndexLookup(obj)
  }
  return JSON.stringify(obj);
}

L'exemple suivant achemine les événements non reconnus vers la file d'attente de lettres mortes, en supposant que le modèle accepte une file d'attente de lettres mortes. (Par exemple, consultez le modèle Pub/Sub vers JDBC.) Vous pouvez utiliser ce modèle pour filtrer les entrées inattendues avant d'écrire dans la destination.

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Route unrecognized events to the deadletter topic
  if (!data.hasOwnProperty('severity')) {
    throw new Error("Unrecognized event. eventId='" + data.Id + "'");
  }

  return JSON.stringify(data);

Filtrer les événements

Utilisez une fonction définie par l'utilisateur pour filtrer les événements indésirables ou non reconnus de la sortie.

L'exemple suivant supprime les événements pour lesquels data.severity correspond à "DEBUG".

 function process(inJson) {
  const data = JSON.parse(inJson);

  // Drop events with certain field values
  if (data.severity == "DEBUG") {
    return null;
  }

  return JSON.stringify(data);
}

Étapes suivantes