Le modèle Pub/Sub vers Elasticsearch est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub, exécute une fonction définie par l'utilisateur et les écrit dans Elasticsearch sous forme de documents. Le modèle Dataflow utilise la fonctionnalité de flux de données d'Elasticsearch pour stocker les données de séries temporelles sur plusieurs index tout en vous attribuant une seule ressource nommée pour les requêtes. Les flux de données sont parfaitement adaptés aux journaux, aux métriques, aux traces et aux autres données générées en continu stockées dans Pub/Sub.
Le modèle crée un flux de données nommé logs-gcp.DATASET-NAMESPACE
, où :
- DATASET correspond à la valeur du paramètre de modèle
dataset
, ou àpubsub
s'il n'est pas spécifié. - NAMESPACE correspond à la valeur du paramètre de modèle
namespace
, ou àdefault
s'il n'est pas spécifié.
Conditions requises pour ce pipeline
- L'abonnement Pub/Sub doit exister et les messages doivent être encodés dans un format JSON valide.
- Hôte Elasticsearch accessible au public sur une instance Google Cloud ou sur Elastic Cloud avec Elasticsearch version 7.0 ou ultérieure. Pour en savoir plus, consultez la section Google Cloud Integration for Elastic (Intégration de Google Cloud pour Elastic).
- Sujet Pub/Sub pour le résultat de l'erreur.
Paramètres de modèle
Paramètres obligatoires
- inputSubscription: abonnement Pub/Sub à partir duquel consommer l'entrée. Exemple :
projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>
- errorOutputTopic: sujet de sortie Pub/Sub pour la publication d'enregistrements ayant échoué, au format
projects/<PROJECT_ID>/topics/<TOPIC_NAME>
. - connectionUrl: URL Elasticsearch au format
https://hostname:[port]
. Si vous utilisez Elastic Cloud, spécifiez le CloudID. Par exemple,https://elasticsearch-host:9200
. - apiKey: clé API encodée en base64 à utiliser pour l'authentification.
Paramètres facultatifs
- dataset: type de journaux envoyés à l'aide de Pub/Sub pour lesquels nous disposons d'un tableau de bord prêt à l'emploi. Les valeurs de types de journaux connues sont
audit
,vpcflow
etfirewall
. La valeur par défaut est:pubsub
. - namespace: regroupement arbitraire, tel qu'un environnement (développement, production ou QA), une équipe ou une unité commerciale stratégique. La valeur par défaut est
default
. - elasticsearchTemplateVersion: identifiant de version du modèle Dataflow, généralement défini par Google Cloud. La valeur par défaut est 1.0.0.
- javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :
gs://my-bucket/my-udfs/my_file.js
- javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par lUDF;utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples. - javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est
0
, l'actualisation de l'UDF est désactivée. La valeur par défaut est0
. - elasticsearchUsername: nom d'utilisateur Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de
apiKey
est ignorée. - elasticsearchPassword: mot de passe Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de
apiKey
est ignorée. - batchSize: taille de lot en nombre de documents. La valeur par défaut est
1000
. - batchSizeBytes: taille du lot en nombre d'octets. La valeur par défaut est
5242880
(5 Mo). - maxRetryAttempts: nombre maximal de nouvelles tentatives. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est
no retries
. - maxRetryDuration: durée maximale de la nouvelle tentative en millisecondes. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est
no retries
. - propertyAsIndex: propriété du document indexé dont la valeur spécifie les métadonnées
_index
à inclure dans le document des requêtes groupées. A priorité sur une fonction définie par l'utilisateur_index
. La valeur par défaut estnone
. - javaScriptIndexFnGcsPath: chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées
_index
à inclure dans le document des requêtes groupées. La valeur par défaut estnone
. - javaScriptIndexFnName: nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées
_index
à inclure avec le document dans les requêtes groupées. La valeur par défaut estnone
. - propertyAsId: propriété du document indexé dont la valeur spécifie les métadonnées
_id
à inclure dans le document des requêtes groupées. A priorité sur une fonction définie par l'utilisateur_id
. La valeur par défaut estnone
. - javaScriptIdFnGcsPath: chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées
_id
à inclure dans le document des requêtes groupées. La valeur par défaut estnone
. - javaScriptIdFnName: nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées
_id
à inclure avec le document dans les requêtes groupées. La valeur par défaut estnone
. - javaScriptTypeFnGcsPath: chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées
_type
à inclure dans les documents des requêtes groupées. La valeur par défaut estnone
. - javaScriptTypeFnName: nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées
_type
à inclure avec le document dans les requêtes groupées. La valeur par défaut estnone
. - javaScriptIsDeleteFnGcsPath: chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui détermine si le document doit être supprimé au lieu d'être inséré ou mis à jour. La fonction renvoie une valeur de chaîne de
true
oufalse
. La valeur par défaut estnone
. - javaScriptIsDeleteFnName: nom de la fonction JavaScript définie par l'utilisateur qui détermine si le document doit être supprimé au lieu d'être inséré ou mis à jour. La fonction renvoie une valeur de chaîne de
true
oufalse
. La valeur par défaut estnone
. - usePartialUpdate: Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. La valeur par défaut est
false
. - bulkInsertMethod: indique s'il faut utiliser
INDEX
(index, upsert autorisé) ouCREATE
(création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. La valeur par défaut estCREATE
. - trustSelfSignedCerts: indique si le certificat autosigné doit être approuvé ou non. Une instance Elasticsearch installée peut avoir un certificat autosigné. Activez cette option sur "True" pour contourner la validation du certificat SSL. (par défaut:
false
) - disableCertificateValidation: si la valeur est
true
, approuve le certificat SSL autosigné. Une instance Elasticsearch peut avoir un certificat SSL autosigné. Pour contourner la validation du certificat, définissez ce paramètre surtrue
. La valeur par défaut estfalse
. - apiKeyKMSEncryptionKey: clé Cloud KMS permettant de déchiffrer la clé API. Ce paramètre est obligatoire si
apiKeySource
est défini surKMS
. Si ce paramètre est fourni, transmettez une chaîneapiKey
chiffrée. Chiffrez les paramètres à l'aide du point de terminaison de chiffrement de l'API KMS. Pour la clé, utilisez le formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Voir: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (par exemple,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
). - apiKeySecretId: ID du secret Secret Manager pour l'apiKey. Si
apiKeySource
est défini surSECRET_MANAGER
, fournissez ce paramètre. Utilisez le formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret`. - apiKeySource: source de la clé API. Les valeurs autorisées sont
PLAINTEXT
,KMS
ouSECRET_MANAGER
. Ce paramètre est obligatoire lorsque vous utilisez Secret Manager ou KMS. SiapiKeySource
est défini surKMS
,apiKeyKMSEncryptionKey
et une clé API chiffrée doivent être fournies. SiapiKeySource
est défini surSECRET_MANAGER
,apiKeySecretId
doit être fourni. SiapiKeySource
est défini surPLAINTEXT
,apiKey
doit être fourni. La valeur par défaut est PLAINTEXT. - socketTimeout: si défini, remplace le délai avant expiration maximal par défaut et le délai avant expiration du socket par défaut (30 000 ms) dans le RestClient Elastic.
Fonctions définies par l'utilisateur
Ce modèle accepte les fonctions définies par l'utilisateur (UDF) en plusieurs points du pipeline, décrits ci-dessous. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Fonction de transformation de texte
Transforme le message Pub/Sub en un document Elasticsearch.
Paramètres de modèle :
javascriptTextTransformGcsPath
: URI Cloud Storage du fichier JavaScript.javascriptTextTransformFunctionName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
- Résultat : document JSON sous forme de chaînes à insérer dans Elasticsearch.
Fonction d'index
Renvoie l'index auquel le document appartient.
Paramètres de modèle :
javaScriptIndexFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIndexFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_index
du document.
Fonction d'ID de document
Renvoie l'ID du document.
Paramètres de modèle :
javaScriptIdFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIdFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_id
du document.
Fonction de suppression de document
Spécifie si un document doit être supprimé. Pour utiliser cette fonction, définissez le mode d'insertion groupée sur INDEX
et spécifiez une fonction d'ID de document.
Paramètres de modèle :
javaScriptIsDeleteFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIsDeleteFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : renvoie la chaîne
"true"
pour supprimer le document, ou la chaîne"false"
pour appliquer une opération upsert sur le document.
Fonction de type de mappage
Renvoie le type de mappage du document.
Paramètres de modèle :
javaScriptTypeFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptTypeFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_type
du document.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Elasticsearch template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \ --parameters \ inputSubscription=SUBSCRIPTION_NAME,\ connectionUrl=CONNECTION_URL,\ dataset=DATASET,\ namespace=NAMESPACE,\ apiKey=APIKEY,\ errorOutputTopic=ERROR_OUTPUT_TOPIC
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: sujet Pub/Sub pour le résultat d'erreurSUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubCONNECTION_URL
: URL ElasticsearchDATASET
: type de journalNAMESPACE
: espace de noms pour un ensemble de donnéesAPIKEY
: clé API encodée en base64 pour l'authentification
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputSubscription": "SUBSCRIPTION_NAME", "connectionUrl": "CONNECTION_URL", "dataset": "DATASET", "namespace": "NAMESPACE", "apiKey": "APIKEY", "errorOutputTopic": "ERROR_OUTPUT_TOPIC" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
ERROR_OUTPUT_TOPIC
: sujet Pub/Sub pour le résultat d'erreurSUBSCRIPTION_NAME
: nom de votre abonnement Pub/SubCONNECTION_URL
: URL ElasticsearchDATASET
: type de journalNAMESPACE
: espace de noms pour un ensemble de donnéesAPIKEY
: clé API encodée en base64 pour l'authentification
Étape suivante
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.