Il modello da Cloud Storage a Elasticsearch è una pipeline batch che legge i dati dai file CSV archiviati in un bucket Cloud Storage e li scrive in Elasticsearch come documenti JSON.
Requisiti della pipeline
- Il bucket Cloud Storage deve esistere.
- Deve esistere un host Elasticsearch su un'istanza Google Cloud o su Elasticsearch Cloud accessibile da Dataflow.
- Deve esistere una tabella BigQuery per l'output dell'errore.
Schema CSV
Se i file CSV contengono intestazioni, imposta il parametro del modello containsHeaders
su true
.
In caso contrario, crea un file di schema JSON che descriva i dati. Specifica l'URI Cloud Storage del file di schema nel parametro del modello jsonSchemaPath
. L'esempio seguente mostra uno schema JSON:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
In alternativa, puoi fornire una funzione definita dall'utente;utente (UDF) che analizza il testo del file CSV e restituisce i documenti Elasticsearch.
Parametri del modello
Parametro | Descrizione |
---|---|
inputFileSpec |
Il pattern dei file di Cloud Storage per cercare i file CSV. Esempio: gs://mybucket/test-*.csv . |
connectionUrl |
URL Elasticsearch nel formato https://hostname:[port] oppure specifica CloudID se utilizzi Elastic Cloud. |
apiKey |
Chiave API codificata in Base64 utilizzata per l'autenticazione. |
index |
L'indice Elasticsearch verso il quale verranno inviate le richieste, ad esempio my-index . |
deadletterTable |
La tabella BigQuery a cui inviare gli inserti non riusciti. Esempio: <your-project>:<your-dataset>.<your-table-name> . |
containsHeaders |
(Facoltativo) Valore booleano per indicare se le intestazioni sono incluse nel file CSV. Valore predefinito false . |
delimiter |
(Facoltativo) Il delimitatore utilizzato nel file CSV. Esempio: , |
csvFormat |
(Facoltativo) Il formato CSV secondo il formato CSV Apache Commons. Valore predefinito: Default . |
jsonSchemaPath |
(Facoltativo) Il percorso dello schema JSON. Valore predefinito: null . |
largeNumFiles |
(Facoltativo) Imposta il valore su true se il numero di file è pari a decine di migliaia. Valore predefinito: false . |
javascriptTextTransformGcsPath |
(Facoltativo)
L'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js .
|
javascriptTextTransformFunctionName |
(Facoltativo)
Il nome della funzione definita dall'utente di JavaScript che vuoi utilizzare.
Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ } , il nome della funzione è
myTransform . Per esempi di funzioni JavaScript definite, consulta gli esempi di funzioni definite dall'utente.
|
batchSize |
(Facoltativo) Dimensioni del batch in numero di documenti. Valore predefinito: 1000 . |
batchSizeBytes |
(Facoltativo) Dimensioni del batch in numero di byte. Valore predefinito: 5242880 (5 MB). |
maxRetryAttempts |
(Facoltativo) Il numero massimo di tentativi. Deve essere maggiore di 0. Valore predefinito: nessun nuovo tentativo. |
maxRetryDuration |
(Facoltativo) Durata massima nuovo tentativo in millisecondi, deve essere maggiore di 0. Valore predefinito: nessun nuovo tentativo. |
csvFileEncoding |
(Facoltativo) Codifica dei file CSV. |
propertyAsIndex |
(Facoltativo) Una proprietà del documento da indicizzare il cui valore specifica i metadati _index da includere nel documento nella richiesta collettiva (ha la precedenza su una funzione definita dall'utente _index ). Valore predefinito: nessuno. |
propertyAsId |
(Facoltativo) Una proprietà del documento da indicizzare il cui valore specifica i metadati _id da includere nel documento nella richiesta collettiva (ha la precedenza su una funzione definita dall'utente _id ). Valore predefinito: nessuno. |
javaScriptIndexFnGcsPath |
(Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _index da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno. |
javaScriptIndexFnName |
(Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _index da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno. |
javaScriptIdFnGcsPath |
(Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _id da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno. |
javaScriptIdFnName |
(Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _id da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno. |
javaScriptTypeFnGcsPath |
(Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _type da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno. |
javaScriptTypeFnName |
(Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _type da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno. |
javaScriptIsDeleteFnGcsPath |
(Facoltativo) Percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per la funzione che determinerà se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore stringa "true" o "false" . Valore predefinito: nessuno. |
javaScriptIsDeleteFnName |
(Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che determinerà se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore stringa "true" o "false" . Valore predefinito: nessuno. |
usePartialUpdate |
(Facoltativo) Indica se utilizzare aggiornamenti parziali (aggiornare anziché creare o indicizzare, consentendo documenti parziali) con le richieste Elasticsearch. Valore predefinito: false . |
bulkInsertMethod |
(Facoltativo) Indica se utilizzare INDEX (indice, consente upsert) o CREATE (creazione, errori su _id duplicati) con richieste collettive Elasticsearch. Valore predefinito: CREATE . |
Funzioni definite dall'utente
Questo modello supporta le funzioni definite dall'utente in più punti della pipeline, descritta di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.
Funzione di trasformazione del testo
Trasforma i dati CSV in un documento Elasticsearch.
Parametri del modello:
javascriptTextTransformGcsPath
: l'URI Cloud Storage del file JavaScript.javascriptTextTransformFunctionName
: il nome della funzione JavaScript.
Specifica della funzione:
- Input: una singola riga di un file CSV di input.
- Output: un documento JSON stringato da inserire in Elasticsearch.
Funzione di indice
Restituisce l'indice a cui appartiene il documento.
Parametri del modello:
javaScriptIndexFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIndexFnName
: il nome della funzione JavaScript.
Specifica della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_index
del documento.
Funzione ID documento
Restituisce l'ID documento.
Parametri del modello:
javaScriptIdFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIdFnName
: il nome della funzione JavaScript.
Specifica della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_id
del documento.
Funzione di eliminazione documenti
Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento
collettivo su INDEX
e fornisci una
funzione ID documento.
Parametri del modello:
javaScriptIsDeleteFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIsDeleteFnName
: il nome della funzione JavaScript.
Specifica della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: restituisci la stringa
"true"
per eliminare il documento o"false"
per eseguire l'upsert del documento.
Funzione di tipo mappatura
Restituisce il tipo di mappatura del documento.
Parametri del modello:
javaScriptTypeFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptTypeFnName
: il nome della funzione JavaScript.
Specifica della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_type
del documento.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.
- Nel menu a discesa Modello Dataflow, seleziona the Cloud Storage to Elasticsearch template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella padre senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, come
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella padre con data all'interno del bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
INPUT_FILE_SPEC
: il pattern del tuo file di Cloud Storage.CONNECTION_URL
: il tuo URL Elasticsearch.APIKEY
: la tua chiave API codificata in Base64 per l'autenticazione.INDEX
: il tuo indice Elasticsearch.DEADLETTER_TABLE
: la tua tabella BigQuery.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella padre senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, come
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella padre con data all'interno del bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
INPUT_FILE_SPEC
: il pattern del tuo file di Cloud Storage.CONNECTION_URL
: il tuo URL Elasticsearch.APIKEY
: la tua chiave API codificata in Base64 per l'autenticazione.INDEX
: il tuo indice Elasticsearch.DEADLETTER_TABLE
: la tua tabella BigQuery.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.