Modello Da Pub/Sub a Elasticsearch

Il modello Da Pub/Sub a Elasticsearch è una pipeline di inserimento flussi che legge i messaggi di una sottoscrizione Pub/Sub, esegue una funzione definita dall'utente (UDF) e li scrive in Elasticsearch come documenti. Il modello Dataflow utilizza la funzionalità stream di dati di Elasticsearch per archiviare i dati di serie temporali su più indici, fornendo al contempo un'unica risorsa denominata per le richieste. I flussi di dati sono particolarmente adatti per log, metriche, tracce e altri dati generati continuamente e archiviati in Pub/Sub.

Il modello crea uno stream di dati denominato logs-gcp.DATASET-NAMESPACE, dove:

  • DATASET è il valore del parametro del modello dataset oppure pubsub se non specificato.
  • NAMESPACE è il valore del parametro del modello namespace oppure default se non specificato.

Requisiti della pipeline

  • Deve esistere la sottoscrizione Pub/Sub di origine e i messaggi devono essere codificati in un formato JSON valido.
  • Un host Elasticsearch raggiungibile pubblicamente su un'istanza Google Cloud o su Elastic Cloud con Elasticsearch versione 7.0 o successive. Per ulteriori dettagli, vedi Google Cloud Integration for Elastic.
  • Un argomento Pub/Sub per l'output degli errori.

Parametri del modello

Parametri obbligatori

Parametri facoltativi

  • dataset : il tipo di log inviati tramite Pub/Sub, per il quale è disponibile una dashboard pronta all'uso. I tipi di log noti sono audit, vpcflow e firewall. Il valore predefinito "pubsub".
  • spazio dei nomi : un raggruppamento arbitrario, ad esempio un ambiente (dev, prod o qa), un team o una business unit strategica. Valore predefinito: 'default'.
  • elasticsearchTemplateVersion : identificatore della versione del modello Dataflow, di solito definito da Google Cloud. Il valore predefinito è 1.0.0.
  • javascriptTextTransformGcsPath : il pattern del percorso di Cloud Storage per il codice JavaScript contenente le funzioni definite dall'utente. (Esempio: gs://your-bucket/your-function.js.
  • javascriptTextTransformFunctionName : il nome della funzione da chiamare dal file JavaScript. Utilizza solo lettere, numeri e trattini bassi. ad esempio: "Transform" o "Transform_udf1".
  • javascriptTextTransformReloadIntervalMinutes : definisci l'intervallo con cui i worker possono controllare le modifiche alla funzione JavaScript definita dall'utente per ricaricare i file. Il valore predefinito è 0.
  • elasticsearchUsername : il nome utente Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • elasticsearchPassword : la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • batchSize : la dimensione del batch in numero di documenti. Valore predefinito: "1000".
  • batchSizeBytes : dimensione batch in byte utilizzata per l'inserimento in gruppo di messaggi in elasticsearch. Valore predefinito: "5242880 (5mb)".
  • maxRetryAttempts : il numero massimo di tentativi, deve essere maggiore di 0. Valore predefinito: "nessun nuovo tentativo".
  • maxRetryDuration : durata massima per i nuovi tentativi in millisecondi, deve essere maggiore di 0. Valore predefinito: "nessun nuovo tentativo".
  • propertyAsIndex : una proprietà nel documento da indicizzare il cui valore specifica i metadati "_index" da includere nel documento in una richiesta collettiva (ha la precedenza su una funzione definita dall'utente "_index"). Valore predefinito: nessuno.
  • javaScriptIndexFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati "_index" da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno.
  • javaScriptIndexFnName : nome della funzione JavaScript UDF nome della funzione che specificherà i metadati _index da includere con il documento nella richiesta in blocco. Valore predefinito: nessuno.
  • propertyAsId : una proprietà nel documento da indicizzare il cui valore specifica i metadati "_id" da includere nel documento in una richiesta collettiva (ha la precedenza su una funzione definita dall'utente "_id"). Valore predefinito: nessuno.
  • javaScriptIdFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specificherà i metadati "_id" da includere nel documento nella richiesta in blocco.Valore predefinito: nessuno.
  • javaScriptIdFnName : nome della funzione JavaScript UDF per la funzione che specificherà i metadati _id da includere con il documento nella richiesta in blocco. Valore predefinito: nessuno.
  • javaScriptTypeFnGcsPath : il percorso Cloud Storage dell'origine della funzione della funzione JavaScript definita dall'utente per la funzione che specificherà i metadati "_type" da includere nel documento nella richiesta in blocco. Valore predefinito: nessuno.
  • javaScriptTypeFnName : nome della funzione JavaScript UDF nome della funzione che specificherà i metadati "_type" da includere con il documento nella richiesta in blocco. Valore predefinito: nessuno.
  • javaScriptIsDeleteFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente che determinerà se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore di stringa "true" o "false". Valore predefinito: nessuno.
  • javaScriptIsDeleteFnName : nome della funzione JavaScript della funzione definita dall'utente nome della funzione che determina se il documento deve essere eliminato anziché inserito o aggiornato. La funzione deve restituire il valore di stringa "true" o "false". Valore predefinito: nessuno.
  • usePartialUpdate : indica se utilizzare aggiornamenti parziali (aggiornare anziché creare o indicizzare, consentendo documenti parziali) con richieste Elasticsearch. Valore predefinito: "false".
  • bulkInsertMethod : indica se utilizzare "INDEX" (indice, consente l'upsert) o "CREATE" (creazione, errori su _id duplicati) con le richieste collettive Elasticsearch. Valore predefinito: "CREATE".
  • trustSelfSignedCerts : indica se considerare attendibile il certificato autofirmato o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Abilita questo valore su True per bypassare la convalida sul certificato SSL. (il valore predefinito è False).
  • disableCertificateValidation : se impostato su "true", considera attendibile il certificato SSL autofirmato. Un'istanza di Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su "true". Valore predefinito: false.
  • apiKeyKMSEncryptionKey : la chiave Cloud KMS per decriptare la chiave API. Questo parametro deve essere fornito se apiKeySource è impostato su KMS. Se viene fornito questo parametro, la stringa apiKey deve essere passata in modalità criptata. Cripta i parametri utilizzando l'endpoint di crittografia dell'API KMS. Il formato della chiave deve essere projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Vedi: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (ad esempio: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : ID secret di Secret Manager per apiKey. Questo parametro deve essere fornito se apiKeySource è impostato su SECRET_MANAGER. Il formato deve essere projects/{project}/secrets/{secret}/versions/{secret_version}. (ad esempio: projects/tuo-ID-progetto/secret/tuo-secret/versioni/tua-versione-secret).
  • apiKeySource : origine della chiave API. Uno tra PLAINTEXT, KMS o SECRET_MANAGER. Questo parametro è obbligatorio se viene utilizzato Secret Manager o KMS. Se apiKeySource è impostato su KMS, è necessario specificare apiKeyKMSEncryptionKey e l'apiKey criptata. Se apiKeySource è impostato su SECRET_MANAGER, è necessario specificare apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, è necessario specificare apiKey. Il valore predefinito è PLAINTEXT.

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente in più punti della pipeline, descritta di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Funzione di trasformazione del testo

Trasforma il messaggio Pub/Sub in un documento Elasticsearch.

Parametri del modello:

  • javascriptTextTransformGcsPath: l'URI Cloud Storage del file JavaScript.
  • javascriptTextTransformFunctionName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il campo dei dati del messaggio Pub/Sub, serializzato come stringa JSON.
  • Output: un documento JSON stringato da inserire in Elasticsearch.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisci la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the Pub/Sub to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: argomento Pub/Sub per l'output di errore
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • CONNECTION_URL: il tuo URL Elasticsearch
  • DATASET: il tuo tipo di log
  • NAMESPACE: lo spazio dei nomi per il set di dati
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • ERROR_OUTPUT_TOPIC: argomento Pub/Sub per l'output di errore
  • SUBSCRIPTION_NAME: nome della sottoscrizione Pub/Sub
  • CONNECTION_URL: il tuo URL Elasticsearch
  • DATASET: il tuo tipo di log
  • NAMESPACE: lo spazio dei nomi per il set di dati
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione

Passaggi successivi