Modello da BigQuery a Elasticsearch

Il modello da BigQuery a Elasticsearch è una pipeline batch che importa i dati da una tabella BigQuery in Elasticsearch come documenti. Il modello può leggere l'intera tabella o leggere record specifici utilizzando una query fornita.

Requisiti della pipeline

  • La tabella BigQuery di origine deve esistere.
  • Un host Elasticsearch su un'istanza Google Cloud o su Elastic Cloud con Elasticsearch versione 7.0 o successive. Deve essere accessibile dalle macchine worker Dataflow.

Parametri del modello

Parametri obbligatori

  • connectionUrl : l'URL Elasticsearch nel formato https://hostname:[port]. Se utilizzi Elastic Cloud, specifica il CloudID. Esempio: https://elasticsearch-host:9200.
  • apiKey : la chiave API codificata in Base64 da utilizzare per l'autenticazione.
  • index : l'indice Elasticsearch a cui vengono inviate le richieste, ad esempio my-index. (ad esempio: my-index).

Parametri facoltativi

  • inputTableSpec : la tabella BigQuery da cui leggere. Formato: projectId:datasetId.tablename. Se specifichi inputTableSpec, il modello legge i dati direttamente dallo spazio di archiviazione BigQuery utilizzando l'API BigQuery Storage Read (https://cloud.google.com/bigquery/docs/reference/storage). Per informazioni sulle limitazioni dell'API Storage Read, consulta https://cloud.google.com/bigquery/docs/reference/storage#limitations. Devi specificare inputTableSpec o query. Se imposti entrambi i parametri, il modello utilizza il parametro query. Esempio: bigquery-project:dataset.input_table.
  • outputDeadletterTable : la tabella BigQuery per i messaggi che non sono riusciti a raggiungere la tabella di output, nel formato <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>. Se non esiste una tabella, viene creata durante l'esecuzione della pipeline. Se non specificato, viene utilizzato <outputTableSpec>_error_records. Ad esempio: id-progetto:set-di-dati.nome-tabella.
  • query : la query SQL da utilizzare per leggere i dati da BigQuery. Se il set di dati BigQuery si trova in un progetto diverso dal job Dataflow, specifica il nome completo del set di dati nella query SQL, ad esempio: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. Per impostazione predefinita, il parametro query utilizza GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), a meno che useLegacySql non sia true. Devi specificare inputTableSpec o query. Se imposti entrambi i parametri, il modello utilizza il parametro query. Ad esempio, seleziona * da sampledb.sample_table.
  • useLegacySql : imposta il valore su true per utilizzare SQL precedente. Questo parametro si applica solo quando si utilizza il parametro query. Il valore predefinito è: false.
  • queryLocation : necessaria per la lettura da una visualizzazione autorizzata senza l'autorizzazione della tabella sottostante. (Esempio: Stati Uniti).
  • elasticsearchUsername : il nome utente Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • elasticsearchPassword : la password di Elasticsearch con cui eseguire l'autenticazione. Se specificato, il valore di "apiKey" viene ignorato.
  • batchSize : la dimensione del batch in numero di documenti. Il valore predefinito è 1000.
  • batchSizeBytes : la dimensione del batch in numero di byte. Il valore predefinito è 5242880 (5 MB).
  • maxRetryAttempts : numero massimo di nuovi tentativi. Deve essere maggiore di zero. Il valore predefinito è: nessun nuovo tentativo.
  • maxRetryDuration : la durata massima per i nuovi tentativi in millisecondi. Deve essere maggiore di zero. Il valore predefinito è: nessun nuovo tentativo.
  • propertyAsIndex : la proprietà nel documento indicizzato il cui valore specifica i metadati _index da includere nel documento nelle richieste collettive. Ha la precedenza su una funzione definita dall'utente _index. Il valore predefinito è: nessuno.
  • javaScriptIndexFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _index da includere nel documento nelle richieste in blocco. Il valore predefinito è: nessuno.
  • javaScriptIndexFnName : il nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _index da includere nel documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • propertyAsId : una proprietà nel documento indicizzato il cui valore specifica i metadati _id da includere nel documento nelle richieste collettive. Ha la precedenza su una funzione definita dall'utente _id. Il valore predefinito è: nessuno.
  • javaScriptIdFnGcsPath : il percorso Cloud Storage della sorgente della funzione JavaScript definita dall'utente per la funzione che specifica i metadati _id da includere nel documento nelle richieste in blocco. Il valore predefinito è: nessuno.
  • javaScriptIdFnName : il nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _id da includere nel documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptTypeFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per una funzione che specifica i metadati _type da includere con i documenti nelle richieste collettive. Valore predefinito: nessuno.
  • javaScriptTypeFnName : il nome della funzione JavaScript della funzione definita dall'utente che specifica i metadati _type da includere nel documento nelle richieste collettive. Il valore predefinito è: nessuno.
  • javaScriptIsDeleteFnGcsPath : il percorso Cloud Storage dell'origine della funzione JavaScript definita dall'utente per la funzione che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa di true o false. Il valore predefinito è: nessuno.
  • javaScriptIsDeleteFnName : il nome della funzione JavaScript della funzione definita dall'utente che determina se eliminare il documento anziché inserirlo o aggiornarlo. La funzione restituisce un valore stringa di true o false. Il valore predefinito è: nessuno.
  • usePartialUpdate : indica se utilizzare aggiornamenti parziali (aggiornare anziché creare o indicizzare, consentendo documenti parziali) con richieste Elasticsearch. Il valore predefinito è: false.
  • bulkInsertMethod : indica se utilizzare INDEX (indice, consente upsert) o CREATE (creazione, errori su _id duplicati) con richieste collettive Elasticsearch. Il valore predefinito è CREATE.
  • trustSelfSignedCerts : indica se considerare attendibile il certificato autofirmato o meno. Un'istanza Elasticsearch installata potrebbe avere un certificato autofirmato. Abilita questo valore su True per bypassare la convalida sul certificato SSL. (il valore predefinito è False).
  • disableCertificateValidation : se impostato su "true", considera attendibile il certificato SSL autofirmato. Un'istanza di Elasticsearch potrebbe avere un certificato autofirmato. Per ignorare la convalida del certificato, imposta questo parametro su "true". Valore predefinito: false.
  • apiKeyKMSEncryptionKey : la chiave Cloud KMS per decriptare la chiave API. Questo parametro deve essere fornito se apiKeySource è impostato su KMS. Se viene fornito questo parametro, la stringa apiKey deve essere passata in modalità criptata. Cripta i parametri utilizzando l'endpoint di crittografia dell'API KMS. Il formato della chiave deve essere projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. Vedi: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (ad esempio: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
  • apiKeySecretId : ID secret di Secret Manager per apiKey. Questo parametro deve essere fornito se apiKeySource è impostato su SECRET_MANAGER. Il formato deve essere projects/{project}/secrets/{secret}/versions/{secret_version}. (ad esempio: projects/tuo-ID-progetto/secret/tuo-secret/versioni/tua-versione-secret).
  • apiKeySource : origine della chiave API. Uno tra PLAINTEXT, KMS o SECRET_MANAGER. Questo parametro è obbligatorio se viene utilizzato Secret Manager o KMS. Se apiKeySource è impostato su KMS, è necessario specificare apiKeyKMSEncryptionKey e l'apiKey criptata. Se apiKeySource è impostato su SECRET_MANAGER, è necessario specificare apiKeySecretId. Se apiKeySource è impostato su PLAINTEXT, è necessario specificare apiKey. Il valore predefinito è PLAINTEXT.
  • javascriptTextTransformGcsPath : l'URI Cloud Storage del file .js che definisce la funzione JavaScript definita dall'utente da utilizzare. (Esempio: gs://my-bucket/my-udfs/my_file.js).
  • javascriptTextTransformFunctionName : il nome della funzione definita dall'utente di JavaScript da utilizzare. Ad esempio, se il codice della funzione JavaScript è myTransform(inJson) { /*...do stuff...*/ }, il nome della funzione è myTransform. Per esempi di funzioni JavaScript di esempio, consulta gli esempi di funzioni definite dall'utente (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).

Funzioni definite dall'utente

Questo modello supporta le funzioni definite dall'utente in più punti della pipeline, descritta di seguito. Per ulteriori informazioni, consulta Creare funzioni definite dall'utente per i modelli Dataflow.

Funzione di indice

Restituisce l'indice a cui appartiene il documento.

Parametri del modello:

  • javaScriptIndexFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIndexFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _index del documento.

Funzione ID documento

Restituisce l'ID documento.

Parametri del modello:

  • javaScriptIdFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIdFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _id del documento.

Funzione di eliminazione documenti

Specifica se eliminare un documento. Per utilizzare questa funzione, imposta la modalità di inserimento collettivo su INDEX e fornisci una funzione ID documento.

Parametri del modello:

  • javaScriptIsDeleteFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptIsDeleteFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: restituisci la stringa "true" per eliminare il documento o "false" per eseguire l'upsert del documento.

Funzione di tipo mappatura

Restituisce il tipo di mappatura del documento.

Parametri del modello:

  • javaScriptTypeFnGcsPath: l'URI Cloud Storage del file JavaScript.
  • javaScriptTypeFnName: il nome della funzione JavaScript.

Specifica della funzione:

  • Input: il documento Elasticsearch, serializzato come stringa JSON.
  • Output: il valore del campo dei metadati _type del documento.

Esegui il modello

Console

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, vedi Località Dataflow.

  5. Nel menu a discesa Modello Dataflow, seleziona the BigQuery to Elasticsearch template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

gcloud

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • INPUT_TABLE_SPEC: il nome della tua tabella BigQuery.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione.
  • INDEX: il tuo indice Elasticsearch.

API

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per maggiori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputTableSpec": "INPUT_TABLE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • INPUT_TABLE_SPEC: il nome della tua tabella BigQuery.
  • CONNECTION_URL: il tuo URL Elasticsearch.
  • APIKEY: la tua chiave API codificata in Base64 per l'autenticazione.
  • INDEX: il tuo indice Elasticsearch.

Passaggi successivi