Modèle Cloud Storage vers Elasticsearch

Le modèle Cloud Storage vers Elasticsearch est un pipeline par lots qui lit les données de fichiers CSV stockés dans un bucket Cloud Storage et les écrit dans Elasticsearch sous forme de documents JSON.

Conditions requises pour ce pipeline

  • Le bucket Cloud Storage doit exister.
  • Un hôte Elasticsearch sur une instance Google Cloud ou sur Elasticsearch Cloud accessible à partir de Dataflow doit exister.
  • Une table BigQuery pour la sortie d'erreur doit exister.

Schéma CSV

Si les fichiers CSV contiennent des en-têtes, définissez le paramètre de modèle containsHeaders sur true.

Sinon, créez un fichier de schéma JSON décrivant les données. Spécifiez l'URI Cloud Storage du fichier de schéma dans le paramètre de modèle jsonSchemaPath. L'exemple suivant illustre un schéma JSON :

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

Vous pouvez également fournir une fonction définie par l'utilisateur (UDF, user-defined function) qui analyse le texte des fichiers CSV et génère des documents Elasticsearch.

Paramètres de modèle

Paramètres obligatoires

  • deadletterTable : table de lettres mortes BigQuery à laquelle envoyer les insertions ayant échoué. (par exemple, your-project:your-dataset.your-table-name).
  • inputFileSpec : modèle de fichier Cloud Storage permettant de rechercher des fichiers CSV. Exemple : gs://mybucket/test-*.csv.
  • connectionUrl : URL Elasticsearch au format https://hostname:[port]. Si vous utilisez Elastic Cloud, spécifiez le CloudID. (Exemple : https://elasticsearch-host:9200).
  • apiKey : clé API encodée en base64 à utiliser pour l'authentification.
  • index : index Elasticsearch vers lequel les requêtes sont envoyées, tel que my-index. (par exemple, my-index).

Paramètres facultatifs

  • inputFormat : format de fichier d'entrée. La valeur par défaut est "CSV".
  • containsHeaders : booléen indiquant si les fichiers CSV d'entrée contiennent un enregistrement d'en-tête (vrai/faux). Uniquement requis pour la lecture de fichiers CSV. La valeur par défaut est "false".
  • séparateur : séparateur de colonne des fichiers texte d'entrée. Par défaut : utilisez le délimiteur fourni dans csvFormat (exemple : ,).
  • csvFormat : spécification du format CSV à utiliser pour l'analyse des enregistrements. La valeur par défaut est "default". Pour en savoir plus, consultez la page https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Doit correspondre exactement aux noms de format trouvés à l'adresse suivante : https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath : chemin d'accès au schéma JSON. La valeur par défaut est "null". (Exemple : gs://path/to/schema).
  • largeNumFiles : valeur définie sur "true" si le nombre de fichiers est de l'ordre de plusieurs dizaines de milliers. La valeur par défaut est "false".
  • csvFileEncoding : format d'encodage des caractères du fichier CSV. Les valeurs autorisées sont US-ASCII, ISO-8859-1, UTF-8 et UTF-16. La valeur par défaut est UTF8.
  • logDetailedCsvConversionErrors : définissez cette valeur sur "true" pour activer la journalisation détaillée des erreurs en cas d'échec de l'analyse CSV. Notez que cela peut exposer des données sensibles dans les journaux (par exemple, si le fichier CSV contient des mots de passe). Valeur par défaut : "false".
  • elasticsearchUsername : Nom d'utilisateur Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
  • elasticsearchPassword : Mot de passe Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
  • batchSize : Taille de lot en nombre de documents. La valeur par défaut est 1000.
  • batchSizeBytes : taille du lot en nombre d'octets. La valeur par défaut est "5242880" (5 Mo).
  • maxRetryAttempts : nombre maximal de nouvelles tentatives. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est aucune nouvelle tentative.
  • maxRetryDuration : durée maximale de la nouvelle tentative en millisecondes. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est aucune nouvelle tentative.
  • propertyAsIndex : propriété du document indexé dont la valeur spécifie les métadonnées _index à inclure dans le document des requêtes groupées. A priorité sur une fonction définie par l'utilisateur _index. La valeur par défaut est "NONE".
  • javaScriptIndexFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _index à inclure dans le document des requêtes groupées. La valeur par défaut est "NONE".
  • javaScriptIndexFnName : nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées _index à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE".
  • propertyAsId : propriété du document indexé dont la valeur spécifie les métadonnées _id à inclure dans le document des requêtes groupées. A priorité sur une fonction définie par l'utilisateur _id. La valeur par défaut est "NONE".
  • javaScriptIdFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _id à inclure dans le document des requêtes groupées. La valeur par défaut est "NONE".
  • javaScriptIdFnName : nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées _id à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE".
  • javaScriptTypeFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées _type à inclure dans les documents des requêtes groupées. Valeur par défaut = none
  • javaScriptTypeFnName : nom de la fonction JavaScript UDF définie par l'utilisateur qui spécifie les métadonnées _type à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE".
  • javaScriptIsDeleteFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui détermine si le document doit être supprimé au lieu d'être inséré ou mis à jour. La fonction renvoie une valeur de chaîne de true ou false. La valeur par défaut est "NONE".
  • javaScriptIsDeleteFnName : nom de la fonction JavaScript définie par l'utilisateur qui détermine si le document doit être supprimé au lieu d'être inséré ou mis à jour. La fonction renvoie une valeur de chaîne de true ou false. La valeur par défaut est "NONE".
  • usePartialUpdate : Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. La valeur par défaut est "false".
  • bulkInsertMethod : Indique s'il faut utiliser INDEX (index, upsert autorisé) ou CREATE (création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. La valeur par défaut est "CREATE".
  • trustSelfSignedCerts : indique si le certificat autosigné doit être approuvé ou non. Une instance Elasticsearch installée peut avoir un certificat autosigné. Activez cette option sur "True" pour contourner la validation du certificat SSL. La valeur par défaut est false.
  • disableCertificateValidation : si la valeur est "true", approuve le certificat SSL autosigné. Une instance Elasticsearch peut avoir un certificat SSL autosigné. Pour contourner la validation du certificat, définissez ce paramètre sur "true". Valeur par défaut : "false".
  • apiKeyKMSEncryptionKey : clé Cloud KMS permettant de déchiffrer la clé API. Ce paramètre doit être spécifié si apiKeySource est défini sur KMS. Si ce paramètre est fourni, la chaîne apiKey doit être transmise sous forme chiffrée. Chiffrez les paramètres à l'aide du point de terminaison de chiffrement de l'API KMS. La clé doit être au format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Voir : https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Exemple: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : ID du secret fourni par Secret Manager pour l'apiKey. Ce paramètre doit être spécifié si apiKeySource est défini sur SECRET_MANAGER. Doit être au format projects/{project}/secrets/{secret}/versions/{secret_version}. (par exemple, projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret).
  • apiKeySource : source de la clé API. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager ou KMS est utilisé. Si apiKeySource est défini sur KMS, apiKeyKMSEncryptionKey et l'apiKey chiffrée doivent être fournis. Si apiKeySource est défini sur SECRET_MANAGER, apiKeySecretId doit être fourni. Si apiKeySource est défini sur PLAINTEXT, apiKey doit être fourni. La valeur par défaut est PLAINTEXT.
  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. (Exemple: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.

Fonctions définies par l'utilisateur

Ce modèle accepte les fonctions définies par l'utilisateur (UDF) en plusieurs points du pipeline, décrits ci-dessous. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Fonction de transformation de texte

Transforme les données CSV en un document Elasticsearch.

Paramètres de modèle :

  • javascriptTextTransformGcsPath : URI Cloud Storage du fichier JavaScript.
  • javascriptTextTransformFunctionName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : une ligne provenant d'un fichier CSV d'entrée.
  • Résultat : document JSON sous forme de chaînes à insérer dans Elasticsearch.

Fonction d'index

Renvoie l'index auquel le document appartient.

Paramètres de modèle :

  • javaScriptIndexFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIndexFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _index du document.

Fonction d'ID de document

Renvoie l'ID du document.

Paramètres de modèle :

  • javaScriptIdFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIdFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _id du document.

Fonction de suppression de document

Spécifie si un document doit être supprimé. Pour utiliser cette fonction, définissez le mode d'insertion groupée sur INDEX et spécifiez une fonction d'ID de document.

Paramètres de modèle :

  • javaScriptIsDeleteFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptIsDeleteFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : renvoie la chaîne "true" pour supprimer le document, ou la chaîne "false" pour appliquer une opération upsert sur le document.

Fonction de type de mappage

Renvoie le type de mappage du document.

Paramètres de modèle :

  • javaScriptTypeFnGcsPath : URI Cloud Storage du fichier JavaScript.
  • javaScriptTypeFnName : nom de la fonction JavaScript.

Spécification de la fonction :

  • Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
  • Résultat : valeur du champ de métadonnées _type du document.

Exécuter le modèle

Console

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Storage to Elasticsearch template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

gcloud

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • INPUT_FILE_SPEC : modèle de votre fichier Cloud Storage.
  • CONNECTION_URL : URL Elasticsearch.
  • APIKEY : clé API encodée en base64 pour l'authentification.
  • INDEX : votre index Elasticsearch.
  • DEADLETTER_TABLE : votre table BigQuery.

API

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • INPUT_FILE_SPEC : modèle de votre fichier Cloud Storage.
  • CONNECTION_URL : URL Elasticsearch.
  • APIKEY : clé API encodée en base64 pour l'authentification.
  • INDEX : votre index Elasticsearch.
  • DEADLETTER_TABLE : votre table BigQuery.

Étapes suivantes