Modèle Pub/Sub vers Elasticsearch

Le modèle Pub/Sub vers Elasticsearch est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub, exécute une fonction définie par l'utilisateur et les écrit dans Elasticsearch sous forme de documents. Le modèle Dataflow utilise la fonctionnalité de flux de données d'Elasticsearch pour stocker les données de séries temporelles sur plusieurs index tout en vous attribuant une seule ressource nommée pour les requêtes. Les flux de données sont parfaitement adaptés aux journaux, aux métriques, aux traces et aux autres données générées en continu stockées dans Pub/Sub.

Le modèle crée un flux de données nommé logs-gcp.DATASET-NAMESPACE, où :

  • DATASET correspond à la valeur du paramètre de modèle dataset, ou à pubsub s'il n'est pas spécifié.
  • NAMESPACE correspond à la valeur du paramètre de modèle namespace, ou à default s'il n'est pas spécifié.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
  • Hôte Elasticsearch accessible au public sur une instance Google Cloud ou sur Elastic Cloud avec Elasticsearch version 7.0 ou ultérieure. Pour en savoir plus, consultez la section Google Cloud Integration for Elastic (Intégration de Google Cloud pour Elastic).
  • Sujet Pub/Sub pour le résultat de l'erreur.

Paramètres de modèle

Paramètres obligatoires

Paramètres facultatifs

  • dataset : Type de journaux envoyés à l'aide de Pub/Sub pour lesquels nous disposons d'un tableau de bord prêt à l'emploi. Les valeurs de types de journaux connues sont audit, vpcflow et firewall. La valeur par défaut est "pubsub".
  • namespace : Regroupement arbitraire, tel qu'un environnement (développement, production ou QA), une équipe ou une unité commerciale stratégique. default: default
  • elasticsearchTemplateVersion : identifiant de version du modèle Dataflow, généralement défini par Google Cloud. La valeur par défaut est 1.0.0.
  • javascriptTextTransformGcsPath : modèle de chemin d'accès Cloud Storage pour le code JavaScript contenant vos fonctions définies par l'utilisateur. (par exemple : gs://votre-bucket/votre-fonction.js).
  • javascriptTextTransformFunctionName : nom de la fonction à appeler à partir de votre fichier JavaScript. N'utilisez que des lettres, des chiffres et des traits de soulignement. (exemple : "transform" ou "transform_udf1").
  • javascriptTextTransformReloadIntervalMinutes : définissez l'intervalle que les nœuds de calcul peuvent vérifier pour les modifications des UDF JavaScript pour actualiser les fichiers. La valeur par défaut est 0.
  • elasticsearchUsername : Nom d'utilisateur Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
  • elasticsearchPassword : Mot de passe Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
  • batchSize : Taille de lot en nombre de documents. Valeur par défaut : 1000
  • batchSizeBytes : taille du lot en octets utilisée pour l'insertion par lots de messages dans elasticsearch. Valeur par défaut : '5242880 (5mb)'.
  • maxRetryAttempts : Nombre maximal de nouvelles tentatives, doit être supérieur à 0. Par défaut : aucune nouvelle tentative.
  • maxRetryDuration : Durée maximale de la nouvelle tentative en millisecondes, doit être supérieure à 0. Par défaut : aucune nouvelle tentative.
  • propertyAsIndex : propriété du document indexée dont la valeur spécifie les métadonnées "_index" à inclure dans le document de la requête groupée (prioritaire sur une UDF "_index"). Valeur par défaut = none
  • javaScriptIndexFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées "_id" à inclure dans le document de la requête groupée. Par défaut : aucun.
  • javaScriptIndexFnName : Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées à inclure dans le document de requête groupée. Valeur par défaut = none
  • propertyAsId : propriété du document indexée dont la valeur spécifie les métadonnées "_id" à inclure dans le document de la requête groupée (prioritaire sur une UDF "_id"). Valeur par défaut = none
  • javaScriptIdFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées "_id" à inclure dans le document de la requête groupée. Par défaut: aucun.
  • javaScriptIdFnName : Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées à inclure dans le document de requête groupée. Valeur par défaut = none
  • javaScriptTypeFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées "_type" à inclure dans le document de la requête groupée. Par défaut : aucun.
  • javaScriptTypeFnName : nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées "_type" à inclure dans le document de requête groupée. Valeur par défaut = none
  • javaScriptIsDeleteFnGcsPath : Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false". Valeur par défaut = none
  • javaScriptIsDeleteFnName : Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false". Valeur par défaut = none
  • usePartialUpdate : Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. Valeur par défaut : false
  • bulkInsertMethod : indique s'il faut utiliser "INDEX" (index, upsert autorisé) ou "CREATE" (création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. La valeur par défaut est "CREATE".
  • trustSelfSignedCerts : indique si le certificat autosigné doit être approuvé ou non. Une instance Elasticsearch installée peut avoir un certificat autosigné. Activez cette option sur "True" pour contourner la validation du certificat SSL. La valeur par défaut est false.
  • disableCertificateValidation : si la valeur est "true", approuve le certificat SSL autosigné. Une instance Elasticsearch peut avoir un certificat SSL autosigné. Pour contourner la validation du certificat, définissez ce paramètre sur "true". Valeur par défaut : "false".
  • apiKeyKMSEncryptionKey : clé Cloud KMS permettant de déchiffrer la clé API. Ce paramètre doit être spécifié si apiKeySource est défini sur KMS. Si ce paramètre est fourni, la chaîne apiKey doit être transmise sous forme chiffrée. Chiffrez les paramètres à l'aide du point de terminaison de chiffrement de l'API KMS. La clé doit être au format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Voir : https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Exemple: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : ID du secret fourni par Secret Manager pour l'apiKey. Ce paramètre doit être spécifié si apiKeySource est défini sur SECRET_MANAGER. Doit être au format projects/{project}/secrets/{secret}/versions/{secret_version}. (par exemple, projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret).
  • apiKeySource : source de la clé API. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager ou KMS est utilisé. Si apiKeySource est défini sur KMS, apiKeyKMSEncryptionKey et l'apiKey chiffrée doivent être fournis. Si apiKeySource est défini sur SECRET_MANAGER, apiKeySecretId doit être fourni. Si apiKeySource est défini sur PLAINTEXT, apiKey doit être fourni. La valeur par défaut est PLAINTEXT.

Fonctions définies par l'utilisateur

Ce modèle accepte les fonctions définies par l'utilisateur (UDF) en plusieurs points du pipeline, décrits ci-dessous. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Fonction de transformation de texte

Transforme le message Pub/Sub en un document Elasticsearch.

Paramètres de modèle :

  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier JavaScript.
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
  • Résultat : document JSON sous forme de chaînes à insérer dans Elasticsearch.

Fonction d'index

Renvoie l'index auquel le document appartient.

Paramètres de modèle :

  • javaScriptIndexFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIndexFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _index du document.

Fonction d'ID de document

Renvoie l'ID du document.

Paramètres de modèle :

  • javaScriptIdFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIdFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _id du document.

Fonction de suppression de document

Spécifie si un document doit être supprimé. Pour utiliser cette fonction, définissez le mode d'insertion groupée sur INDEX et spécifiez une fonction d'ID de document.

Paramètres de modèle :

  • javaScriptIsDeleteFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIsDeleteFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : renvoie la chaîne "true" pour supprimer le document, ou la chaîne "false" pour appliquer une opération upsert sur le document.

Fonction de type de mappage

Renvoie le type de mappage du document.

Paramètres de modèle :

  • javaScriptTypeFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptTypeFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _type du document.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Elasticsearch template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • ERROR_OUTPUT_TOPIC : sujet Pub/Sub pour le résultat d'erreur
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • CONNECTION_URL : URL Elasticsearch
  • DATASET : type de journal
  • NAMESPACE : espace de noms pour un ensemble de données
  • APIKEY : clé API encodée en base64 pour l'authentification

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • ERROR_OUTPUT_TOPIC : sujet Pub/Sub pour le résultat d'erreur
  • SUBSCRIPTION_NAME : nom de votre abonnement Pub/Sub
  • CONNECTION_URL : URL Elasticsearch
  • DATASET : type de journal
  • NAMESPACE : espace de noms pour un ensemble de données
  • APIKEY : clé API encodée en base64 pour l'authentification

Étapes suivantes