Modello Cloud Storage to Elasticsearch

Il modello da Cloud Storage a Elasticsearch è una pipeline batch che legge i dati dai file CSV archiviati in un bucket Cloud Storage e li scrive in Elasticsearch come documenti JSON.

Requisiti della pipeline

  • Il bucket Cloud Storage deve esistere.
  • Deve esistere un host Elasticsearch su un'istanza Google Cloud o su Elasticsearch Cloud accessibile da Dataflow.
  • Deve esistere una tabella BigQuery per l'output di errore.

Schema CSV

Se i file CSV contengono intestazioni, imposta il parametro del modello containsHeaders su true.

In caso contrario, crea un file schema JSON che descriva i dati. Specifica l'URI Cloud Storage del file dello schema nel parametro jsonSchemaPath del modello. L'esempio seguente mostra uno schema JSON:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

In alternativa, puoi fornire una funzione definita dall'utente;utente (UDF) che analizza il testo CSV e genera documenti Elasticsearch.

Parametri del modello

Parametri obbligatori

  • deadletterTable : la tabella BigQuery per i messaggi non recapitabili a cui inviare gli inserimenti non riusciti. (esempio: tuo-progetto:tuo-set-di-dati.nome-tabella).
  • inputFileSpec : il pattern di file Cloud Storage per la ricerca dei file CSV. Esempio: gs://mybucket/test-*.csv.
  • connectionUrl : l'URL di Elasticsearch nel formato https://hostname:[port]. Se utilizzi Elastic Cloud, specifica il CloudID. (esempio: https://elasticsearch-host:9200).
  • apiKey : la chiave API codificata in Base64 da utilizzare per l'autenticazione.
  • index : l'indice Elasticsearch a cui vengono inviate le richieste, ad esempio my-index. (esempio: my-index).

Parametri facoltativi

  • inputFormat : formato del file di input. Il valore predefinito è CSV.
  • containsHeaders : i file CSV di input contengono un record di intestazione (true/false). Obbligatorio solo se si leggono file CSV. Il valore predefinito è false.
  • delimiter : il delimitatore di colonna dei file di testo di input. Valore predefinito: utilizza il delimitatore specificato in csvFormat (ad es. ,).
  • csvFormat : specifica del formato CSV da utilizzare per l'analisi dei record. Il valore predefinito è: Predefinito. Per ulteriori dettagli, visita la pagina https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html. Deve corrispondere esattamente ai nomi dei formati disponibili all'indirizzo: https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
  • jsonSchemaPath : il percorso allo schema JSON. Il valore predefinito è null. (esempio: gs://path/to/schema).
  • largeNumFiles : impostato su true se il numero di file è compreso tra le decine di migliaia. Il valore predefinito è false.
  • csvFileEncoding : il formato di codifica dei caratteri del file CSV. I valori consentiti sono US-ASCII, ISO-8859-1, UTF-8 e UTF-16. Il valore predefinito è UTF-8.
  • logDetailedCsvConversionErrors : impostato su true per attivare la registrazione dettagliata degli errori quando l'analisi del file CSV non va a buon fine. Tieni presente che questa operazione potrebbe esporre dati sensibili nei log (ad esempio se il file CSV contiene password). Valore predefinito: false.
  • elasticsearchUsername : il nome utente di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • elasticsearchPassword : la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • batchSize : le dimensioni del batch in numero di documenti. Il valore predefinito è 1000.
  • batchSizeBytes : le dimensioni del batch in numero di byte. Il valore predefinito è 5242880 (5 MB).
  • maxRetryAttempts : il numero massimo di nuovi tentativi. Deve essere maggiore di zero. Il valore predefinito è: nessun nuovo tentativo.
  • maxRetryDuration : la durata massima dei nuovi tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è: nessun nuovo tentativo.
  • propertyAsIndex : la proprietà nel documento sottoposto a indicizzazione il cui valore specifica i metadati _index da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _index. Il valore predefinito è: nessuno.
  • javaScriptIndexFnGcsPath : il percorso Cloud Storage all'origine della funzione JavaScript UDF per una funzione che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptIndexFnName : il nome della funzione JavaScript UDF che specifica i metadati _index da includere con il documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • propertyAsId : una proprietà del documento sottoposto a indicizzazione il cui valore specifica i metadati _id da includere con il documento nelle richieste collettive. Ha la precedenza su una UDF _id. Il valore predefinito è: nessuno.
  • javaScriptIdFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript UDF per la funzione che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptIdFnName : il nome della funzione JavaScript UDF che specifica i metadati _id da includere con il documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptTypeFnGcsPath : il percorso Cloud Storage all'origine della funzione JavaScript UDF per una funzione che specifica i metadati _type da includere con i documenti nelle richieste collettive. Valore predefinito: nessuno.
  • javaScriptTypeFnName : il nome della funzione JavaScript UDF che specifica i metadati _type da includere con il documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptIsDeleteFnGcsPath : il percorso Cloud Storage all'origine della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è: nessuno.
  • javaScriptIsDeleteFnName : il nome della funzione JavaScript UDF che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore di stringa true o false. Il valore predefinito è: nessuno.
  • usePartialUpdate : indica se utilizzare aggiornamenti parziali (aggiornamento anziché creazione o indicizzazione, consentendo documenti parziali) con le richieste Elasticsearch. Il valore predefinito è false.
  • bulkInsertMethod : indica se utilizzare INDEX (indice, consente gli upsert) o CREATE (crea, errori su _id duplicati) con le richieste collettive di Elasticsearch. Il valore predefinito è: CREA.
  • trustSelfSignedCerts : indica se il certificato autofirmato deve essere considerato attendibile o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Imposta questa opzione su True per bypassare la convalida del certificato SSL. (il valore predefinito è False).
  • disableCertificateValidation : se "true", considera attendibile il certificato SSL autofirmato. Un'istanza Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su "true". Valore predefinito: false.
  • apiKeyKMSEncryptionKey : la chiave Cloud KMS per decriptare la chiave API. Questo parametro deve essere fornito se apiKeySource è impostato su KMS. Se viene fornito questo parametro, la stringa apiKey deve essere passata in modalità criptata. Crittografa i parametri utilizzando l'endpoint di crittografia dell'API KMS. La chiave deve essere nel formato projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Consulta: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (esempio: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : l'ID secret di Secret Manager per l'apiKey. Questo parametro deve essere fornito se apiKeySource è impostato su SECRET_MANAGER. Deve essere nel formato projects/{project}/secrets/{secret}/versions/{secret_version}. (esempio: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
  • apiKeySource : origine della chiave API. Uno dei valori PLAINTEXT, KMS o SECRET_MANAGER. Questo parametro deve essere fornito se viene utilizzato Secret Manager o KMS. Se apiKeySource è impostato su KMS, devi fornire apiKeyKMSEncryptionKey e l'apiKey criptato. Se apiKeySource è impostato su SECRET_MANAGER, devi fornire apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, è necessario fornire apiKey. Valore predefinito: PLAINTEXT.
  • socketTimeout : se impostato, sovrascrive il timeout massimo per i tentativi e il timeout del socket predefiniti (30000 ms) in Elastic RestClient.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione definita dall'utente (UDF) JavaScript da utilizzare. (ad es. gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente (UDF) JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript definite dall'utente, consulta Esempi di funzioni UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente (UDF) in diversi punti della pipeline, descritti di seguito. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma i dati CSV in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: una singola riga di un file CSV di input.
  • Output: un documento JSON con stringa da inserire in Elasticsearch.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione dei documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisce la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo di mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifiche della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Cloud Storage to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern dei file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la chiave API codificata in base64 per l'autenticazione.
  • INDEX: l'indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • INPUT_FILE_SPEC: il pattern dei file di Cloud Storage.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la chiave API codificata in base64 per l'autenticazione.
  • INDEX: l'indice Elasticsearch.
  • DEADLETTER_TABLE: la tua tabella BigQuery.

Passaggi successivi