Le modèle BigQuery vers Elasticsearch est un pipeline par lots qui ingère les données d'une table BigQuery dans Elasticsearch sous forme de documents. Le modèle peut lire la table entière ou lire des enregistrements spécifiques indiqués par une requête fournie.
Conditions requises pour ce pipeline
- La table BigQuery source doit exister.
- Un hôte Elasticsearch sur une instance Google Cloud ou sur Elastic Cloud avec Elasticsearch version 7.0 ou ultérieure. Doit être accessible à partir des machines de nœud de calcul Dataflow.
Paramètres de modèle
Paramètres obligatoires
- connectionUrl : URL Elasticsearch au format https://hostname:[port]. Si vous utilisez Elastic Cloud, spécifiez le CloudID. (Exemple : https://elasticsearch-host:9200).
- apiKey : clé API encodée en base64 à utiliser pour l'authentification.
- index : index Elasticsearch auquel les requêtes sont émises, tel que
my-index.
(exemple: my-index).
Paramètres facultatifs
- inputTableSpec : table BigQuery à lire. Format :
projectId:datasetId.tablename
. Si vous spécifiezinputTableSpec
, le modèle lit les données directement à partir de l'espace de stockage BigQuery à l'aide de l'API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Pour en savoir plus sur les limites de l'API Storage Read, consultez la page https://cloud.google.com/bigquery/docs/reference/storage#limitations. Vous devez spécifierinputTableSpec
ouquery
. Si vous définissez les deux paramètres, le modèle utilise le paramètrequery
. (Exemple: bigquery-project:dataset.input_table). - outputDeadletterTable : table BigQuery pour les messages qui n'ont pas pu atteindre la table de sortie, au format <ID_PROJET>:<NOM_ENSEMBLE_DE_DONNÉES>.<TABLE_LETTRES_MORTES>. Si une table n'existe pas, elle est créée lors de l'exécution du pipeline. Si aucune valeur n'est spécifiée,
<outputTableSpec>_error_records
est utilisé. (Par exemple : your-project-id:your-dataset.your-table-name). - query : requête SQL à utiliser pour lire les données à partir de BigQuery. Si l'ensemble de données BigQuery se trouve dans un projet différent de celui de la tâche Dataflow, spécifiez le nom complet de l'ensemble de données dans la requête SQL, par exemple : <ID_PROJET>.<NOM_ENSEMBLE_DE_DONNÉES>.<NOM_TABLE>. Par défaut, le paramètre
query
utilise GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), sauf siuseLegacySql
esttrue
. Vous devez spécifierinputTableSpec
ouquery
. Si vous définissez les deux paramètres, le modèle utilise le paramètrequery
. (Exemple : sélectionnez * dans sampledb.sample_table). - useLegacySql : Défini sur la valeur "true" pour utiliser l'ancien SQL. Ce paramètre s'applique uniquement lorsque vous utilisez le paramètre
query
. La valeur par défaut est "false". - queryLocation : requis lors de la lecture à partir d'une vue autorisée sans l'autorisation de la table sous-jacente. (Par exemple : US).
- elasticsearchUsername : Nom d'utilisateur Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
- elasticsearchPassword : Mot de passe Elasticsearch avec lequel s'authentifier. Si spécifié, la valeur de "apiKey" est ignorée.
- batchSize : Taille de lot en nombre de documents. La valeur par défaut est 1000.
- batchSizeBytes : taille du lot en octets. La valeur par défaut est 5242880 (5 Mo).
- maxRetryAttempts : nombre maximal de nouvelles tentatives à effectuer. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est aucune nouvelle tentative.
- maxRetryDuration : durée maximale de nouvelle tentative en millisecondes. Cette valeur doit être supérieure à zéro (0). La valeur par défaut est aucune nouvelle tentative.
- propertyAsIndex : propriété du document indexée dont la valeur spécifie les métadonnées
_index
à inclure avec le document dans les requêtes groupées. Prioritaire sur une UDF (fonction définie par l'utilisateur)_index
. La valeur par défaut est "NONE". - javaScriptIndexFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées
_index
à inclure dans le document des requêtes groupées. La valeur par défaut est "NONE". - javaScriptIndexFnName : nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées
_index
à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE". - propertyAsId : propriété du document indexée dont la valeur spécifie les métadonnées
_id
à inclure avec le document dans les requêtes groupées. Prioritaire sur une UDF (fonction définie par l'utilisateur)_id
. La valeur par défaut est "NONE". - javaScriptIdFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour la fonction qui spécifie les métadonnées
_id
à inclure dans le document des requêtes groupées. La valeur par défaut est "NONE". - javaScriptIdFnName : nom de la fonction JavaScript définie par l'utilisateur qui spécifie les métadonnées
_id
à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE". - javaScriptTypeFnGcsPath : chemin d'accès Cloud Storage à la source JavaScript définie par l'utilisateur pour une fonction qui spécifie les métadonnées
_type
à inclure dans les documents des requêtes groupées. Valeur par défaut = none - javaScriptTypeFnName : nom de la fonction JavaScript UDF définie par l'utilisateur qui spécifie les métadonnées
_type
à inclure avec le document dans les requêtes groupées. La valeur par défaut est "NONE". - javaScriptIsDeleteFnGcsPath : chemin d'accès Cloud Storage à la source de la fonction JavaScript définie par l'utilisateur pour la fonction qui détermine s'il convient de supprimer le document plutôt que de l'insérer ou de le mettre à jour. La fonction renvoie une valeur de chaîne
true
oufalse
. La valeur par défaut est "NONE". - javaScriptIsDeleteFnName : nom de la fonction JavaScript définie par l'utilisateur qui détermine si le document doit être supprimé ou mis à jour. La fonction renvoie une valeur de chaîne
true
oufalse
. La valeur par défaut est "NONE". - usePartialUpdate : Indique si les requêtes partielles doivent être utilisées (mises à jour plutôt que créées ou indexées, et autoriser les documents partiels) avec des requêtes Elasticsearch. La valeur par défaut est "false".
- bulkInsertMethod : Indique s'il faut utiliser
INDEX
(index, upsert autorisé) ouCREATE
(création, erreurs sur l'identifiant dupliqué) avec les requêtes groupées Elasticsearch. La valeur par défaut est CREATE. - trustSelfSignedCerts : indique si le certificat autosigné doit être approuvé ou non. Une instance Elasticsearch installée peut avoir un certificat autosigné. Activez cette option sur "True" pour contourner la validation du certificat SSL. La valeur par défaut est false.
- disableCertificateValidation : si la valeur est "true", approuve le certificat SSL autosigné. Une instance Elasticsearch peut avoir un certificat SSL autosigné. Pour contourner la validation du certificat, définissez ce paramètre sur "true". Valeur par défaut : "false".
- apiKeyKMSEncryptionKey : clé Cloud KMS permettant de déchiffrer la clé API. Ce paramètre doit être spécifié si apiKeySource est défini sur KMS. Si ce paramètre est fourni, la chaîne apiKey doit être transmise sous forme chiffrée. Chiffrez les paramètres à l'aide du point de terminaison de chiffrement de l'API KMS. La clé doit être au format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Voir : https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Exemple: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
- apiKeySecretId : ID du secret fourni par Secret Manager pour l'apiKey. Ce paramètre doit être spécifié si apiKeySource est défini sur SECRET_MANAGER. Doit être au format projects/{project}/secrets/{secret}/versions/{secret_version}. (par exemple, projects/votre-id-projet/secrets/votre-secret/versions/votre-version-secret).
- apiKeySource : source de la clé API. Valeurs possibles : PLAINTEXT, KMS ou SECRET_MANAGER. Ce paramètre doit être spécifié si Secret Manager ou KMS est utilisé. Si apiKeySource est défini sur KMS, apiKeyKMSEncryptionKey et l'apiKey chiffrée doivent être fournis. Si apiKeySource est défini sur SECRET_MANAGER, apiKeySecretId doit être fourni. Si apiKeySource est défini sur PLAINTEXT, apiKey doit être fourni. La valeur par défaut est PLAINTEXT.
- javascriptTextTransformGcsPath : URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. (Exemple: gs://my-bucket/my-udfs/my_file.js).
- javascriptTextTransformFunctionName : nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est
myTransform(inJson) { /*...do stuff...*/ }
, le nom de la fonction estmyTransform
. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
Fonctions définies par l'utilisateur
Ce modèle accepte les fonctions définies par l'utilisateur (UDF) en plusieurs points du pipeline, décrits ci-dessous. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.
Fonction d'index
Renvoie l'index auquel le document appartient.
Paramètres de modèle :
javaScriptIndexFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIndexFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_index
du document.
Fonction d'ID de document
Renvoie l'ID du document.
Paramètres de modèle :
javaScriptIdFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIdFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_id
du document.
Fonction de suppression de document
Spécifie si un document doit être supprimé. Pour utiliser cette fonction, définissez le mode d'insertion groupée sur INDEX
et spécifiez une fonction d'ID de document.
Paramètres de modèle :
javaScriptIsDeleteFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptIsDeleteFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : renvoie la chaîne
"true"
pour supprimer le document, ou la chaîne"false"
pour appliquer une opération upsert sur le document.
Fonction de type de mappage
Renvoie le type de mappage du document.
Paramètres de modèle :
javaScriptTypeFnGcsPath
: URI Cloud Storage du fichier JavaScript.javaScriptTypeFnName
: nom de la fonction JavaScript.
Spécification de la fonction :
- Entrée : document Elasticsearch, sérialisé en tant que chaîne JSON.
- Résultat : valeur du champ de métadonnées
_type
du document.
Exécuter le modèle
Console
- Accédez à la page Dataflow Créer un job à partir d'un modèle. Accéder à la page Créer un job à partir d'un modèle
- Dans le champ Nom du job, saisissez un nom de job unique.
- Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est
us-central1
.Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.
- Dans le menu déroulant Modèle Dataflow, sélectionnez the BigQuery to Elasticsearch template.
- Dans les champs fournis, saisissez vos valeurs de paramètres.
- Cliquez sur Run Job (Exécuter la tâche).
gcloud
Dans le shell ou le terminal, exécutez le modèle :
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixREGION_NAME
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: nom de votre table BigQuery.CONNECTION_URL
: URL Elasticsearch.APIKEY
: clé API encodée en base64 pour l'authentification.INDEX
: votre index Elasticsearch.
API
Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
Remplacez les éléments suivants :
PROJECT_ID
: ID du projet Google Cloud dans lequel vous souhaitez exécuter le job DataflowJOB_NAME
: nom de job unique de votre choixLOCATION
: région dans laquelle vous souhaitez déployer votre job Dataflow, par exempleus-central1
VERSION
: version du modèle que vous souhaitez utiliserVous pouvez utiliser les valeurs suivantes :
latest
pour utiliser la dernière version du modèle, disponible dans le dossier parent non daté du bucket gs://dataflow-templates-REGION_NAME/latest/- Le nom de la version, par exemple
2023-09-12-00_RC00
, pour utiliser une version spécifique du modèle, qui est imbriqué dans le dossier parent daté respectif dans le bucket : gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: nom de votre table BigQuery.CONNECTION_URL
: URL Elasticsearch.APIKEY
: clé API encodée en base64 pour l'authentification.INDEX
: votre index Elasticsearch.
Étapes suivantes
- Apprenez-en plus sur les modèles Dataflow.
- Consultez la liste des modèles fournis par Google.