Certains modèles Dataflow fournis par Google sont compatibles avec les fonctions définies par l'utilisateur (user-defined functions, UDF). Les UDF vous permettent d'étendre les fonctionnalités d'un modèle sans modifier le code du modèle.
Présentation
Pour créer une UDF, vous devez écrire une fonction JavaScript ou Python, selon le modèle. Vous stockez le fichier de code de la fonction définie par l'utilisateur dans Cloud Storage et spécifiez l'emplacement en tant que paramètre de modèle. Pour chaque élément d'entrée, le modèle appelle votre fonction. La fonction transforme l'élément ou exécute une autre logique personnalisée, puis renvoie le résultat au modèle.
Par exemple, vous pouvez utiliser une fonction définie par l'utilisateur pour:
- Reformater les données d'entrée pour qu'elles correspondent à un schéma cible.
- Masquer les données sensibles.
- Filtrer certains éléments de la sortie.
L'entrée de la fonction définie par l'utilisateur est constituée d'un élément de données unique, sérialisé en tant que chaîne JSON. La fonction renvoie en sortie une chaîne JSON sérialisée. Le format des données dépend du modèle. Par exemple, dans le modèle Abonnement Pub/Sub vers BigQuery, l'entrée est constituée par les données du message Pub/Sub sérialisées en tant qu'objet JSON, et la sortie est un objet JSON sérialisé représentant une ligne de table BigQuery. Pour en savoir plus, consultez la documentation de chaque modèle.
Exécuter un modèle avec une fonction définie par l'utilisateur
Pour exécuter un modèle avec une fonction définie par l'utilisateur, vous devez spécifier l'emplacement Cloud Storage du fichier JavaScript et le nom de la fonction en tant que paramètres de modèle.
Avec certains modèles fournis par Google, vous pouvez également créer la fonction définie par l'utilisateur directement dans la console Google Cloud, comme suit:
Accédez à la page Dataflow dans la console Google Cloud.
Cliquez sur add_boxCreate job from template(Créer une tâche à partir d'un modèle).
Sélectionnez le modèle fourni par Google que vous souhaitez exécuter.
Développez la section Paramètres facultatifs. Si le modèle est compatible avec les fonctions définies par l'utilisateur, il possède un paramètre pour l'emplacement Cloud Storage de l'UDF et un autre paramètre pour le nom de la fonction.
À côté du paramètre de modèle, cliquez sur Créer une UDF.
Dans le panneau Sélectionner ou créer une fonction définie par l'utilisateur (UDF) :
- Saisissez un nom de fichier. Exemple :
my_udf.js
. - Sélectionnez un dossier Cloud Storage.
Exemple :
gs://your-bucket/your-folder
. - Écrivez la fonction à l'aide de l'éditeur de code intégré. L'éditeur est prérempli avec du code récurrent que vous pouvez utiliser comme point de départ.
Cliquez sur Créer une UDF.
La console Google Cloud enregistre le fichier UDF et renseigne l'emplacement Cloud Storage.
Saisissez le nom de votre fonction dans le champ correspondant.
- Saisissez un nom de fichier. Exemple :
Écrire une fonction JavaScript définie par l'utilisateur
Le code suivant montre une UDF JavaScript no-op à partir de laquelle vous pouvez démarrer:
/*
* @param {string} inJson input JSON message (stringified)
* @return {?string} outJson output JSON message (stringified)
*/
function process(inJson) {
const obj = JSON.parse(inJson);
// Example data transformations:
// Add a field: obj.newField = 1;
// Modify a field: obj.existingField = '';
// Filter a record: return null;
return JSON.stringify(obj);
}
Le code JavaScript s'exécute sur le moteur JavaScript Nashorn. Nous vous recommandons de tester votre fonction définie par l'utilisateur sur le moteur Nashorn avant de la déployer. Le moteur Nashorn ne correspond pas exactement à l'implémentation Node.js de JavaScript. Un problème courant est l'utilisation de console.log()
ou Number.isNaN()
, qui ne sont pas définis dans le moteur Nashorn.
Vous pouvez tester votre fonction définie par l'utilisateur sur le moteur Nashorn en utilisant Cloud Shell, sur lequel JDK 11 est préinstallé. Lancez Nashorn en mode interactif comme suit :
jjs --language=es6
Dans le shell interactif Nashorn, procédez comme suit :
- Appelez
load
pour charger votre fichier JavaScript des fonctions définies par l'utilisateur. - Définissez un objet JSON d'entrée en fonction des messages attendus par votre pipeline.
- Utilisez la fonction
JSON.stringify
pour sérialiser l'entrée en une chaîne JSON. - Appelez votre fonction définie par l'utilisateur pour traiter la chaîne JSON.
- Appelez
JSON.parse
pour désérialiser la sortie. - Vérifiez le résultat.
Exemple :
> load('my_udf.js')
> var input = {"name":"user1"}
> var output = process(JSON.stringify(input))
> print(output)
Écrire une UDF Python
Le code suivant montre une UDF Python no-op à partir de laquelle vous pouvez démarrer:
import json
def process(value):
# Load the JSON string into a dictionary.
data = json.loads(value)
# Transform the data in some way.
data['new_field'] = 'new_value'
# Serialize the data back to JSON.
return json.dumps(data)
Les fonctions Python définies par l'utilisateur sont compatibles avec les packages de dépendances standards pour Python et Apache Beam. Ils ne peuvent pas utiliser de packages tiers.
Gestion des exceptions
En règle générale, lorsqu'une erreur se produit lors de l'exécution de la fonction définie par l'utilisateur, elle est écrite dans un emplacement de lettres mortes. Les détails dépendent du modèle. Par exemple, le modèle Abonnement Pub/Sub vers BigQuery crée une table _error_records
et y écrit les erreurs. Des erreurs UDF de l'environnement d'exécution peuvent se produire en raison d'erreurs de syntaxe ou d'exceptions non interceptées. Pour rechercher les erreurs de syntaxe, testez votre fonction définie par l'utilisateur localement.
Vous pouvez générer une exception de manière automatisée pour un élément qui ne doit pas être traité. Dans ce cas, l'élément est écrit à l'emplacement de lettres mortes, si le modèle le permet. Pour obtenir un exemple illustrant cette approche, consultez la section Événements de routage.
Exemples de cas d'utilisation
Cette section décrit certains modèles courants de fonctions définies par l'utilisateur, basés sur des cas d'utilisation réels.
Enrichir des événements
Utilisez une fonction définie par l'utilisateur pour enrichir les événements avec de nouveaux champs afin d'obtenir plus d'informations contextuelles.
Exemple :
function process(inJson) {
const data = JSON.parse(inJson);
// Add new field to track data source
data.source = "source1";
return JSON.stringify(data);
}
Événements de transformation
Utilisez une fonction définie par l'utilisateur pour transformer l'intégralité du format d'événement en fonction de ce que votre destination attend.
L'exemple suivant rétablit une entrée de journal Cloud Logging (LogEntry
) sur la chaîne de journal d'origine lorsqu'elle est disponible. (Selon la source du journal, la chaîne de journal d'origine est parfois renseignée dans le champ textPayload
.) Vous pouvez utiliser ce modèle pour envoyer les journaux bruts dans leur format d'origine, au lieu d'envoyer l'intégralité du fichier LogEntry
depuis Cloud Logging.
function process(inJson) {
const data = JSON.parse(inJson);
if (data.textPayload) {
return data.textPayload; // Return string value, and skip JSON.stringify
}
return JSON.stringify(obj);
}
Masquer ou supprimer des données d'événement
Utilisez une fonction définie par l'utilisateur pour masquer ou supprimer une partie de l'événement.
L'exemple suivant masque le nom de champ sensitiveField
en remplaçant sa valeur, et supprime entièrement le champ nommé redundantField
.
function process(inJson) {
const data = JSON.parse(inJson);
// Normalize existing field values
data.source = (data.source && data.source.toLowerCase()) || "unknown";
// Redact existing field values
if (data.sensitiveField) {
data.sensitiveField = "REDACTED";
}
// Remove existing fields
if (data.redundantField) {
delete(data.redundantField);
}
return JSON.stringify(data);
}
Événements de routage
Utilisez une fonction définie par l'utilisateur pour acheminer des événements vers des destinations distinctes dans le récepteur en aval.
L'exemple suivant, basé sur le modèle Pub/Sub vers Splunk, achemine chaque événement vers l'index Splunk approprié. Il appelle une fonction locale définie par l'utilisateur pour mapper les événements sur des index.
function process(inJson) {
const obj = JSON.parse(inJson);
// Set index programmatically for data segregation in Splunk
obj._metadata = {
index: splunkIndexLookup(obj)
}
return JSON.stringify(obj);
}
L'exemple suivant achemine les événements non reconnus vers la file d'attente de lettres mortes, en supposant que le modèle est compatible avec une file d'attente de lettres mortes. (Par exemple, consultez le modèle Pub/Sub vers JDBC.) Vous pouvez utiliser ce modèle pour filtrer les entrées inattendues avant d'écrire dans la destination.
function process(inJson) {
const data = JSON.parse(inJson);
// Route unrecognized events to the deadletter topic
if (!data.hasOwnProperty('severity')) {
throw new Error("Unrecognized event. eventId='" + data.Id + "'");
}
return JSON.stringify(data);
Filtrer les événements
Utilisez une fonction définie par l'utilisateur pour filtrer les événements indésirables ou non reconnus à partir de la sortie.
L'exemple suivant ignore les événements pour lesquels data.severity
est égal à "DEBUG"
.
function process(inJson) {
const data = JSON.parse(inJson);
// Drop events with certain field values
if (data.severity == "DEBUG") {
return null;
}
return JSON.stringify(data);
}
Étapes suivantes
- Modèles fournis par Google
- Créer et exécuter un modèle Flex
- Exécuter des modèles classiques
- Étendre votre modèle Dataflow avec des fonctions définies par l'utilisateur (article de blog)
- Exemples de fonctions définies par l'utilisateur (GitHub)