Il modello da Cloud Storage a Elasticsearch è una pipeline batch che legge i dati dai file CSV archiviati in un bucket Cloud Storage e li scrive in Elasticsearch come documenti JSON.
Requisiti della pipeline
- Il bucket Cloud Storage deve esistere.
- Deve esistere un host Elasticsearch su un'istanza Google Cloud o su Elasticsearch Cloud accessibile da Dataflow.
- Deve esistere una tabella BigQuery per l'output di errore.
Schema CSV
Se i file CSV contengono intestazioni, imposta il parametro del modello containsHeaders
su true
.
In caso contrario, crea un file schema JSON che descriva i dati. Specifica l'URI Cloud Storage del file dello schema nel parametro jsonSchemaPath
del modello. L'esempio seguente mostra uno schema JSON:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
In alternativa, puoi fornire una funzione definita dall'utente;utente (UDF) che analizzi il testo CSV e generi documenti Elasticsearch.
Parametri del modello
Parametri obbligatori
- deadletterTable: la tabella BigQuery per i messaggi non recapitabili a cui inviare gli inserimenti non riusciti. Ad esempio,
your-project:your-dataset.your-table-name
. - inputFileSpec: il pattern di file Cloud Storage per la ricerca dei file CSV. Ad esempio,
gs://mybucket/test-*.csv
. - connectionUrl: l'URL di Elasticsearch nel formato
https://hostname:[port]
. Se utilizzi Elastic Cloud, specifica il CloudID. Ad esempio:https://elasticsearch-host:9200
. - apiKey: la chiave API codificata in Base64 da utilizzare per l'autenticazione.
- index: l'indice Elasticsearch a cui vengono inviate le richieste. Ad esempio,
my-index
.
Parametri facoltativi
- inputFormat: il formato del file di input. Il valore predefinito è
CSV
. - containsHeaders: i file CSV di input contengono un record di intestazione (true/false). Obbligatorio solo se si leggono file CSV. Il valore predefinito è false.
- delimiter: il delimitatore di colonna dei file di testo di input. Valore predefinito:
,
, ad esempio,
. - csvFormat: specifica del formato CSV da utilizzare per l'analisi dei record. Il valore predefinito è
Default
. Per ulteriori dettagli, visita la pagina https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Deve corrispondere esattamente ai nomi dei formati disponibili all'indirizzo: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html. - jsonSchemaPath: il percorso allo schema JSON. Il valore predefinito è
null
. Ad esempio:gs://path/to/schema
. - largeNumFiles: impostato su true se il numero di file è compreso tra le decine di migliaia. Il valore predefinito è
false
. - csvFileEncoding: il formato di codifica dei caratteri del file CSV. I valori consentiti sono
US-ASCII
,ISO-8859-1
,UTF-8
eUTF-16
. Il valore predefinito è UTF-8. - logDetailedCsvConversionErrors: impostato su
true
per attivare la registrazione dettagliata degli errori quando l'analisi del file CSV non va a buon fine. Tieni presente che questa operazione potrebbe esporre dati sensibili nei log (ad esempio se il file CSV contiene password). Valore predefinito:false
. - elasticsearchUsername: il nome utente di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di
apiKey
viene ignorato. - elasticsearchPassword: la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di
apiKey
viene ignorato. - batchSize: le dimensioni del batch in numero di documenti. Il valore predefinito è
1000
. - batchSizeBytes: le dimensioni del batch in numero di byte. Il valore predefinito è
5242880
(5 MB). - maxRetryAttempts: il numero massimo di nuovi tentativi. Deve essere maggiore di zero. Il valore predefinito è
no retries
. - maxRetryDuration: la durata massima dei nuovi tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è
no retries
. - propertyAsIndex: la proprietà nel documento sottoposto a indicizzazione il cui valore specifica i metadati
_index
da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF_index
. Il valore predefinito ènone
. - javaScriptIndexFnGcsPath: il percorso Cloud Storage dell'origine della funzione JavaScript UDF per una funzione che specifica i metadati
_index
da includere con il documento nelle richieste collettive. Il valore predefinito ènone
. - javaScriptIndexFnName: il nome della funzione JavaScript UDF che specifica i metadati
_index
da includere con il documento nelle richieste collettive. Il valore predefinito ènone
. - propertyAsId: una proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati
_id
da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF_id
. Il valore predefinito ènone
. - javaScriptIdFnGcsPath: il percorso Cloud Storage dell'origine della funzione JavaScript UDF per la funzione che specifica i metadati
_id
da includere con il documento nelle richieste collettive. Il valore predefinito ènone
. - javaScriptIdFnName: il nome della funzione JavaScript UDF che specifica i metadati
_id
da includere con il documento nelle richieste collettive. Il valore predefinito ènone
. - javaScriptTypeFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript UDF per una funzione che specifica i metadati
_type
da includere con i documenti nelle richieste collettive. Il valore predefinito ènone
. - javaScriptTypeFnName: il nome della funzione JavaScript UDF che specifica i metadati
_type
da includere con il documento nelle richieste collettive. Il valore predefinito ènone
. - javaScriptIsDeleteFnGcsPath: il percorso Cloud Storage all'origine della funzione JavaScript definita dall'utente che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa
true
ofalse
. Il valore predefinito ènone
. - javaScriptIsDeleteFnName: il nome della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa
true
ofalse
. Il valore predefinito ènone
. - usePartialUpdate: indica se utilizzare aggiornamenti parziali (aggiornamento anziché creazione o indicizzazione, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è
false
. - bulkInsertMethod: indica se utilizzare
INDEX
(indice, consente gli upsert) oCREATE
(crea, errori su _id duplicati) con le richieste collettive di Elasticsearch. Il valore predefinito èCREATE
. - trustSelfSignedCerts: indica se il certificato autofirmato deve essere considerato attendibile o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questa opzione su true per bypassare la convalida del certificato SSL. (il valore predefinito è
false
). - disableCertificateValidation: se
true
, considera attendibile il certificato SSL autofirmato. Un'istanza Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro sutrue
. Il valore predefinito èfalse
. - apiKeyKMSEncryptionKey: la chiave Cloud KMS per decriptare la chiave API. Questo parametro è obbligatorio se
apiKeySource
è impostato suKMS
. Se viene fornito questo parametro, passa una stringaapiKey
criptata. Crittografa i parametri utilizzando l'endpoint di crittografia dell'API KMS. Per la chiave, utilizza il formatoprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Ad esempio,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: l'ID secret di Secret Manager per l'apiKey. Se
apiKeySource
è impostato suSECRET_MANAGER
, fornisci questo parametro. Utilizza il formatoprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: l'origine della chiave API. I valori consentiti sono
PLAINTEXT
,KMS
eSECRET_MANAGER
. Questo parametro è obbligatorio quando utilizzi Secret Manager o KMS. SeapiKeySource
è impostato suKMS
, devono essere fornitiapiKeyKMSEncryptionKey
e l'apiKey criptato. SeapiKeySource
è impostato suSECRET_MANAGER
, deve essere fornitoapiKeySecretId
. SeapiKeySource
è impostato suPLAINTEXT
, deve essere fornitoapiKey
. Valore predefinito: PLAINTEXT. - socketTimeout: se impostato, sovrascrive il timeout massimo per i tentativi e il timeout del socket predefiniti (30000 ms) in Elastic RestClient.
- javascriptTextTransformGcsPath: l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente (UDF) da utilizzare. Ad esempio,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è
myTransform(inJson) { /*...do stuff...*/ }
, il nome della funzione èmyTransform
. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
Funzioni definite dall'utente
Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.
Funzione di trasformazione del testo
Trasforma i dati CSV in un documento Elasticsearch.
Parametri del modello:
javascriptTextTransformGcsPath
: l'URI Cloud Storage del file JavaScript.javascriptTextTransformFunctionName
: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: una singola riga di un file CSV di input.
- Output: un documento JSON con stringa da inserire in Elasticsearch.
Funzione di indice
Restituisce l'indice a cui appartiene il documento.
Parametri del modello:
javaScriptIndexFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIndexFnName
: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_index
del documento.
Funzione ID documento
Restituisce l'ID documento.
Parametri del modello:
javaScriptIdFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIdFnName
: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_id
del documento.
Funzione di eliminazione dei documenti
Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX
e fornisci una funzione ID documento.
Parametri del modello:
javaScriptIsDeleteFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptIsDeleteFnName
: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: restituisce la stringa
"true"
per eliminare il documento o"false"
per eseguire l'upsert del documento.
Funzione di tipo di mappatura
Restituisce il tipo di mappatura del documento.
Parametri del modello:
javaScriptTypeFnGcsPath
: l'URI Cloud Storage del file JavaScript.javaScriptTypeFnName
: il nome della funzione JavaScript.
Specifiche della funzione:
- Input: il documento Elasticsearch, serializzato come stringa JSON.
- Output: il valore del campo dei metadati
_type
del documento.
Esegui il modello
Console
- Vai alla pagina Crea job da modello di Dataflow. Vai a Crea job da modello
- Nel campo Nome job, inserisci un nome univoco per il job.
- (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è
us-central1
.Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.
- Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Storage to Elasticsearch template.
- Nei campi dei parametri forniti, inserisci i valori dei parametri.
- Fai clic su Esegui job.
gcloud
Nella shell o nel terminale, esegui il modello:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
REGION_NAME
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
INPUT_FILE_SPEC
: il pattern dei file di Cloud Storage.CONNECTION_URL
: il tuo URL Elasticsearch.APIKEY
: la chiave API codificata in base64 per l'autenticazione.INDEX
: l'indice Elasticsearch.DEADLETTER_TABLE
: la tua tabella BigQuery.
API
Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
Sostituisci quanto segue:
PROJECT_ID
: l'ID del progetto Google Cloud in cui vuoi eseguire il job DataflowJOB_NAME
: un nome di job univoco a tua sceltaVERSION
: la versione del modello che vuoi utilizzarePuoi utilizzare i seguenti valori:
latest
per utilizzare la versione più recente del modello, disponibile nella cartella principale senza data del bucket: gs://dataflow-templates-REGION_NAME/latest/- il nome della versione, ad esempio
2023-09-12-00_RC00
, per utilizzare una versione specifica del modello, che si trova nidificata nella rispettiva cartella principale datata nel bucket: gs://dataflow-templates-REGION_NAME/
LOCATION
: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempious-central1
INPUT_FILE_SPEC
: il pattern dei file di Cloud Storage.CONNECTION_URL
: il tuo URL Elasticsearch.APIKEY
: la chiave API codificata in base64 per l'autenticazione.INDEX
: l'indice Elasticsearch.DEADLETTER_TABLE
: la tua tabella BigQuery.
Passaggi successivi
- Scopri di più sui modelli Dataflow.
- Consulta l'elenco dei modelli forniti da Google.