Le modèle Cloud Storage vers Elasticsearch est un pipeline par lots qui lit les données de fichiers CSV stockés dans un bucket Cloud Storage et les écrit dans Elasticsearch sous forme de documents JSON.
Conditions requises pour ce pipeline
- Le bucket Cloud Storage doit exister.
- Un hôte Elasticsearch sur une instance Google Cloud ou sur Elasticsearch Cloud accessible à partir de Dataflow doit exister.
- Une table BigQuery pour la sortie d'erreur doit exister.
Schéma CSV
Si les fichiers CSV contiennent des en-têtes, définissez le paramètre de modèle containsHeaders
sur true
.
Sinon, créez un fichier de schéma JSON décrivant les données. Spécifiez l'URI Cloud Storage du fichier de schéma dans le paramètre de modèle jsonSchemaPath
. L'exemple suivant illustre un schéma JSON :
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Vous pouvez également fournir une fonction définie par l'utilisateur (UDF, user-defined function) qui analyse le texte des fichiers CSV et génère des documents Elasticsearch.
Paramètres de modèle
Paramètres | Description |
---|---|
inputFileSpec |
Modèle de fichier Cloud Storage permettant de rechercher des fichiers CSV. Exemple : gs://mybucket/test-*.csv . |
connectionUrl |
URL Elasticsearch au format https://hostname:[port] , ou spécifiez CloudID si vous utilisez Elastic Cloud. |
apiKey |
Clé API encodée en base64 utilisée pour l'authentification. |
index |
Index Elasticsearch vers lequel les requêtes seront émises, tel que my-index . |
deadletterTable |
Table de lettres mortes BigQuery à laquelle envoyer les insertions ayant échoué. Exemple : <your-project>:<your-dataset>.<your-table-name> . |
containsHeaders |
(Facultatif) Valeur booléenne indiquant si les en-têtes sont inclus dans le fichier CSV. false par défaut. |
delimiter |
(Facultatif) Délimiteur utilisé par le fichier CSV. Exemple : , |
csvFormat |
(Facultatif) Format CSV conforme au format CSV Apache Commons. Valeur par défaut : Default |
jsonSchemaPath |
(Facultatif) Chemin d'accès au schéma JSON. Valeur par défaut : null |
largeNumFiles |
(Facultatif) Définie sur "true" si le nombre de fichiers est de l'ordre de plusieurs dizaines de milliers. Valeur par défaut : false |
javascriptTextTransformGcsPath |
(Facultatif)
URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Exemple :gs://my-bucket/my-udfs/my_file.js
|
javascriptTextTransformFunctionName |
(Facultatif)
Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.
Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ } , le nom de la fonction est myTransform . Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.
|
batchSize |
(Facultatif) Taille de lot en nombre de documents. Valeur par défaut : 1000 |
batchSizeBytes |
(Facultatif) Taille de lot en octets. Valeur par défaut : 5242880 (5 Mo). |
maxRetryAttempts |
(Facultatif) Nombre maximal de nouvelles tentatives, doit être supérieur 0. Par défaut : aucune nouvelle tentative. |
maxRetryDuration |
(Facultatif) Durée maximale de la nouvelle tentative en millisecondes, doit être supérieure à 0. Par défaut : aucune nouvelle tentative. |
csvFileEncoding |
(Facultatif) Encodage des fichiers CSV. |
propertyAsIndex |
(Facultatif) Propriété du document indexée dont la valeur spécifie les métadonnées _index à inclure dans le document de la requête groupée (prioritaire sur une UDF _index ). Valeur par défaut = none |
propertyAsId |
(Facultatif) Propriété du document indexée dont la valeur spécifie les métadonnées _id à inclure dans le document de la requête groupée (prioritaire sur une UDF _id ). Valeur par défaut = none |
javaScriptIndexFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _index à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptIndexFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _index à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptIdFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _id à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptIdFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _id à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptTypeFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _type à inclure dans le document de la requête groupée. Valeur par défaut = none |
javaScriptTypeFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées _type à inclure dans le document de requête groupée. Valeur par défaut = none |
javaScriptIsDeleteFnGcsPath |
(Facultatif) Chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false" . Valeur par défaut = none |
javaScriptIsDeleteFnName |
(Facultatif) Nom de la fonction JavaScript définie par l'utilisateur pour la fonction qui déterminera si le document doit être supprimé plutôt que d'être inséré ou mis à jour. La fonction doit renvoyer une valeur de chaîne "true" ou "false" . Valeur par défaut = none |
usePartialUpdate |
(Facultatif) Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. Valeur par défaut : false |
bulkInsertMethod |
(Facultatif) Indique s'il faut utiliser INDEX (index, upsert autorisé) ou CREATE (création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. Valeur par défaut : CREATE |
Fonctions définies par l'utilisateur
Ce modèle accepte les fonctions définies par l'utilisateur (UDF) en plusieurs points du pipeline, décrits ci-dessous. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Fonction de transformation de texte
Transforme les données CSV en un document Elasticsearch.
Paramètres de modèle :
javascriptTextTransformGcsPath
: URI Cloud Storage du fichier JavaScript.javascriptTextTransformFunctionName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : une ligne provenant d'un fichier CSV d'entrée.
- Résultat : document JSON sous forme de chaînes à insérer dans Elasticsearch.
Fonction d'index
Renvoie l'index auquel le document appartient.
Paramètres de modèle :
javaScriptIndexFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIndexFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_index
du document.
Fonction d'ID de document
Renvoie l'ID du document.
Paramètres de modèle :
javaScriptIdFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIdFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_id
du document.
Fonction de suppression de document
Spécifie si un document doit être supprimé. Pour utiliser cette fonction, définissez le mode d'insertion groupée sur INDEX
et spécifiez une fonction d'ID de document.
Paramètres de modèle :
javaScriptIsDeleteFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIsDeleteFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : renvoie la chaîne
"true"
pour supprimer le document, ou la chaîne"false"
pour appliquer une opération upsert sur le document.
Fonction de type de mappage
Renvoie le type de mappage du document.
Paramètres de modèle :
javaScriptTypeFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptTypeFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_type
du document.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Storage to Elasticsearch template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
REGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
INPUT_FILE_SPEC
: modèle de votre fichier Cloud Storage.CONNECTION_URL
: URL Elasticsearch.APIKEY
: clé API encodée en base64 pour l'authentification.INDEX
: votre index Elasticsearch.DEADLETTER_TABLE
: votre table BigQuery.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixVERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
LOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
INPUT_FILE_SPEC
: modèle de votre fichier Cloud Storage.CONNECTION_URL
: URL Elasticsearch.APIKEY
: clé API encodée en base64 pour l'authentification.INDEX
: votre index Elasticsearch.DEADLETTER_TABLE
: votre table BigQuery.
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.