Modello Da Cloud Storage a Elasticsearch

Il modello da Cloud Storage a Elasticsearch è una pipeline batch che legge i dati dai file CSV archiviati in un bucket Cloud Storage e li scrive in Elasticsearch come documenti JSON.

Requisiti della pipeline

  • Il bucket Cloud Storage deve esistere.
  • Deve esistere un host Elasticsearch su un'istanza Google Cloud o su Elasticsearch Cloud accessibile da Dataflow.
  • Deve esistere una tabella BigQuery per l'output dell'errore.

Schema CSV

Se i file CSV contengono intestazioni, imposta il parametro del modello containsHeaders su true.

In caso contrario, crea un file di schema JSON che descriva i dati. Specifica l'URI Cloud Storage del file di schema nel parametro del modello jsonSchemaPath. L'esempio seguente mostra uno schema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

In alternativa, puoi fornire una funzione definita dall'utente;utente (UDF) che analizza il testo del file CSV e restituisce i documenti Elasticsearch.

Parametri del modello

Parametro Descrizione
inputFileSpec Il pattern dei file di Cloud Storage per cercare i file CSV. Esempio: gs://mybucket/test-*.csv.
connectionUrl URL Elasticsearch nel formato https://hostname:[port] oppure specifica CloudID se utilizzi Elastic Cloud.
apiKey Chiave API codificata in Base64 utilizzata per l'autenticazione.
index L'indice Elasticsearch verso il quale verranno inviate le richieste, ad esempio my-index.
deadletterTable La tabella BigQuery a cui inviare gli inserti non riusciti. Esempio: <your-project>:<your-dataset>.<your-table-name>.
containsHeaders (Facoltativo) Valore booleano per indicare se le intestazioni sono incluse nel file CSV. Valore predefinito false.
delimiter (Facoltativo) Il delimitatore utilizzato nel file CSV. Esempio: ,
csvFormat (Facoltativo) Il formato CSV secondo il formato CSV Apache Commons. Valore predefinito: Default.
jsonSchemaPath (Facoltativo) Il percorso dello schema JSON. Valore predefinito: null.
largeNumFiles (Facoltativo) Imposta il valore su true se il numero di file è pari a decine di migliaia. Valore predefinito: false.
javascriptTextTransformGcsPath (Facoltativo) L'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente che vuoi utilizzare. Ad esempio: gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Facoltativo) Il nome della funzione definita dall'utente di JavaScript che vuoi utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite, consulta gli esempi di funzioni definite dall'utente.
batchSize (Facoltativo) Dimensioni del batch in numero di documenti. Valore predefinito: 1000.
batchSizeBytes (Facoltativo) Dimensioni del batch in numero di byte. Valore predefinito: 5242880 (5 MB).
maxRetryAttempts (Facoltativo) Il numero massimo di tentativi. Deve essere maggiore di 0. Valore predefinito: nessun nuovo tentativo.
maxRetryDuration (Facoltativo) Durata massima nuovo tentativo in millisecondi, deve essere maggiore di 0. Valore predefinito: nessun nuovo tentativo.
csvFileEncoding (Facoltativo) Codifica dei file CSV.
propertyAsIndex (Facoltativo) Una proprietà del documento da indicizzare il cui valore specifica i metadati _index da includere nel documento nella richiesta collettiva (ha la precedenza su una funzione definita dall'utente _index). Valore predefinito: nessuno.
propertyAsId (Facoltativo) Una proprietà del documento da indicizzare il cui valore specifica i metadati _id da includere nel documento nella richiesta collettiva (ha la precedenza su una funzione definita dall'utente _id). Valore predefinito: nessuno.
javaScriptIndexFnGcsPath (Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _index da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno.
javaScriptIndexFnName (Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _index da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno.
javaScriptIdFnGcsPath (Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _id da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno.
javaScriptIdFnName (Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _id da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno.
javaScriptTypeFnGcsPath (Facoltativo) Il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati _type da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno.
javaScriptTypeFnName (Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _type da includere nel documento nella richiesta collettiva. Valore predefinito: nessuno.
javaScriptIsDeleteFnGcsPath (Facoltativo) Percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per la funzione che determinerà se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore stringa "true" o "false". Valore predefinito: nessuno.
javaScriptIsDeleteFnName (Facoltativo) Nome della funzione JavaScript della funzione definita dall'utente che determinerà se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore stringa "true" o "false". Valore predefinito: nessuno.
usePartialUpdate (Facoltativo) Indica se utilizzare aggiornamenti parziali (aggiornare anziché creare o indicizzare, consentendo documenti parziali) con le richieste Elasticsearch. Valore predefinito: false.
bulkInsertMethod (Facoltativo) Indica se utilizzare INDEX (indice, consente upsert) o CREATE (creazione, errori su _id duplicati) con richieste collettive Elasticsearch. Valore predefinito: CREATE.

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente in più punti della pipeline, descritta di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma i dati CSV in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: una singola riga di un file CSV di input.
  • Output: un documento JSON stringato da inserire in Elasticsearch.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisci la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Cloud Storage to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern del tuo file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione.
  • INDEX: il tuo indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern del tuo file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione.
  • INDEX: il tuo indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.

Passaggi successivi