The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.
Pipeline requirements
- The Cloud Storage bucket must exist.
- An Elasticsearch host on a Google Cloud instance or on Elastic Cloud that is accessible from Dataflow must exist.
- A BigQuery table for error output must exist.
CSV schema
If the CSV files contain headers, set the `containsHeaders` template parameter to `true`.
Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the `jsonSchemaPath` template parameter. The following example shows a JSON schema:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
Alternatively, you can provide a user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents.
Template parameters
Required parameters
- deadletterTable: The BigQuery dead-letter table to send failed inserts to. For example, `your-project:your-dataset.your-table-name`.
- inputFileSpec: The Cloud Storage file pattern to search for CSV files. For example, `gs://mybucket/test-*.csv`.
- connectionUrl: The Elasticsearch URL in the format `https://hostname:[port]`. If using Elastic Cloud, specify the CloudID. For example, `https://elasticsearch-host:9200`.
- apiKey: The Base64-encoded API key to use for authentication.
- index: The Elasticsearch index that the requests are issued to. For example, `my-index`.
Optional parameters
- inputFormat: The input file format. Defaults to `CSV`.
- containsHeaders: Whether the input CSV files contain a header record (`true`/`false`). Only required if reading CSV files. Defaults to `false`.
- delimiter: The column delimiter of the input text files. For example, `,`. Defaults to `,`.
- csvFormat: The CSV format specification to use for parsing records. Defaults to `Default`. See https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html for more details. Must exactly match one of the format names listed at https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html.
- jsonSchemaPath: The path to the JSON schema. For example, `gs://path/to/schema`. Defaults to `null`.
- largeNumFiles: Set to `true` if the number of files is in the tens of thousands. Defaults to `false`.
- csvFileEncoding: The CSV file character encoding format. Allowed values are `US-ASCII`, `ISO-8859-1`, `UTF-8`, and `UTF-16`. Defaults to `UTF-8`.
- logDetailedCsvConversionErrors: Set to `true` to enable detailed error logging when CSV parsing fails. Note that this may expose sensitive data in the logs (for example, if the CSV file contains passwords). Defaults to `false`.
- elasticsearchUsername: The Elasticsearch username to authenticate with. If specified, the value of `apiKey` is ignored.
- elasticsearchPassword: The Elasticsearch password to authenticate with. If specified, the value of `apiKey` is ignored.
- batchSize: The batch size in number of documents. Defaults to `1000`.
- batchSizeBytes: The batch size in number of bytes. Defaults to `5242880` (5 MB).
- maxRetryAttempts: The maximum number of retry attempts. Must be greater than zero. Defaults to no retries.
- maxRetryDuration: The maximum retry duration in milliseconds. Must be greater than zero. Defaults to no retries.
- propertyAsIndex: The property in the document being indexed whose value specifies the `_index` metadata to include with the document in bulk requests. Takes precedence over an `_index` UDF. Defaults to none.
- javaScriptIndexFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies the `_index` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIndexFnName: The name of the UDF JavaScript function that specifies the `_index` metadata to include with the document in bulk requests. Defaults to none.
- propertyAsId: A property in the document being indexed whose value specifies the `_id` metadata to include with the document in bulk requests. Takes precedence over an `_id` UDF. Defaults to none.
- javaScriptIdFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that specifies the `_id` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIdFnName: The name of the UDF JavaScript function that specifies the `_id` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptTypeFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies the `_type` metadata to include with documents in bulk requests. Defaults to none.
- javaScriptTypeFnName: The name of the UDF JavaScript function that specifies the `_type` metadata to include with the document in bulk requests. Defaults to none.
- javaScriptIsDeleteFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of `true` or `false`. Defaults to none.
- javaScriptIsDeleteFnName: The name of the UDF JavaScript function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of `true` or `false`. Defaults to none.
- usePartialUpdate: Whether to use partial updates (update rather than create or index, allowing partial documents) with Elasticsearch requests. Defaults to `false`.
- bulkInsertMethod: Whether to use `INDEX` (index, allows upserts) or `CREATE` (create, errors on duplicate `_id`) with Elasticsearch bulk requests. Defaults to `CREATE`.
- trustSelfSignedCerts: Whether to trust a self-signed certificate. An Elasticsearch instance might have a self-signed certificate; set this to `true` to bypass validation of the SSL certificate. Defaults to `false`.
- disableCertificateValidation: If `true`, trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter to `true`. Defaults to `false`.
- apiKeyKMSEncryptionKey: The Cloud KMS key to decrypt the API key. This parameter is required if `apiKeySource` is set to `KMS`. If this parameter is provided, pass in an encrypted `apiKey` string. Encrypt parameters using the KMS API encrypt endpoint. For the key, use the format `projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>`. See https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt. For example, `projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name`.
- apiKeySecretId: The Secret Manager secret ID for the apiKey. Provide this parameter if `apiKeySource` is set to `SECRET_MANAGER`. Use the format `projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>`. For example, `projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
- apiKeySource: The source of the API key. Allowed values are `PLAINTEXT`, `KMS`, and `SECRET_MANAGER`. This parameter is required when you use Secret Manager or KMS. If `apiKeySource` is set to `KMS`, `apiKeyKMSEncryptionKey` and an encrypted `apiKey` must be provided. If `apiKeySource` is set to `SECRET_MANAGER`, `apiKeySecretId` must be provided. If `apiKeySource` is set to `PLAINTEXT`, `apiKey` must be provided. Defaults to `PLAINTEXT`.
- socketTimeout: If set, overwrites the default max retry timeout and default socket timeout (30000 ms) in the Elastic RestClient.
- javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example, `gs://my-bucket/my-udfs/my_file.js`.
- javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Text transform function
Transforms the CSV data into an Elasticsearch document.
Template parameters:
- javascriptTextTransformGcsPath: the Cloud Storage URI of the JavaScript file.
- javascriptTextTransformFunctionName: the name of the JavaScript function.
Function specification:
- Input: a single line from an input CSV file.
- Output: a stringified JSON document to insert into Elasticsearch.
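For illustration, a minimal text transform UDF might look like the following sketch. The column order (`id`, `age`), the comma delimiter, and the function name are assumptions made up for this example; pass the function name you choose in the javascriptTextTransformFunctionName parameter.

```javascript
/**
 * Hypothetical sketch of a text transform UDF: parses one CSV line with the
 * assumed columns id,age and returns a stringified JSON document.
 */
function csvToElasticsearchDoc(line) {
  var fields = line.split(',');   // assumes the default comma delimiter
  return JSON.stringify({
    id: fields[0],
    age: parseInt(fields[1], 10)
  });
}
```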
Index function
Returns the index to which the document belongs.
Template parameters:
- javaScriptIndexFnGcsPath: the Cloud Storage URI of the JavaScript file.
- javaScriptIndexFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_index` metadata field.
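As a sketch, an index UDF could route documents to different indices based on a field value. The `age` field, the index names, and the function name below are assumptions for this example, not part of the template.

```javascript
/**
 * Hypothetical sketch of an _index UDF: chooses the target index from an
 * assumed "age" field in the document.
 */
function getIndexFn(doc) {
  var parsed = JSON.parse(doc);   // the document arrives as a JSON string
  return parsed.age >= 18 ? 'adults-index' : 'minors-index';
}
```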
Document ID function
Returns the document ID.
Template parameters:
- javaScriptIdFnGcsPath: the Cloud Storage URI of the JavaScript file.
- javaScriptIdFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_id` metadata field.
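A minimal document ID UDF might look like the following sketch, assuming (for illustration only) that each document has an `id` field to reuse as the Elasticsearch `_id`.

```javascript
/**
 * Hypothetical sketch of an _id UDF: uses an assumed "id" field in the
 * document as the Elasticsearch document ID.
 */
function getIdFn(doc) {
  var parsed = JSON.parse(doc);
  return String(parsed.id);
}
```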
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk insert mode to `INDEX` and provide a document ID function.
Template parameters:
- javaScriptIsDeleteFnGcsPath: the Cloud Storage URI of the JavaScript file.
- javaScriptIsDeleteFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string `"true"` to delete the document, or `"false"` to upsert the document.
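For example, a deletion UDF could key off a flag in the document. The `deleted` field and the function name are assumptions for this sketch; the only requirement from the specification above is that the function returns the string `"true"` or `"false"`.

```javascript
/**
 * Hypothetical sketch of a deletion UDF: documents carrying an assumed
 * "deleted" flag are deleted; everything else is upserted.
 */
function isDeleteFn(doc) {
  var parsed = JSON.parse(doc);
  return parsed.deleted === true ? 'true' : 'false';   // must be a string
}
```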
Mapping type function
Returns the document's mapping type.
Template parameters:
- javaScriptTypeFnGcsPath: the Cloud Storage URI of the JavaScript file.
- javaScriptTypeFnName: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's `_type` metadata field.
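A mapping type UDF can be as simple as returning a constant. The function name and the returned type value below are assumptions for this sketch; use whatever mapping type your Elasticsearch version expects.

```javascript
/**
 * Hypothetical sketch of a _type UDF: returns the same mapping type for
 * every document.
 */
function getTypeFn(doc) {
  return '_doc';
}
```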
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is `us-central1`. For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the Cloud Storage to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- REGION_NAME: the region where you want to deploy your Dataflow job. For example, `us-central1`.
- INPUT_FILE_SPEC: your Cloud Storage file pattern.
- CONNECTION_URL: your Elasticsearch URL.
- APIKEY: your base64-encoded API key for authentication.
- INDEX: your Elasticsearch index.
- DEADLETTER_TABLE: your BigQuery table.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
  "launch_parameter": {
    "jobName": "JOB_NAME",
    "parameters": {
      "inputFileSpec": "INPUT_FILE_SPEC",
      "connectionUrl": "CONNECTION_URL",
      "apiKey": "APIKEY",
      "index": "INDEX",
      "deadletterTable": "DEADLETTER_TABLE"
    },
    "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch"
  }
}
Replace the following:
- PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
- JOB_NAME: a unique job name of your choice
- VERSION: the version of the template that you want to use. You can use the following values:
  - `latest` to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like `2023-09-12-00_RC00`, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- LOCATION: the region where you want to deploy your Dataflow job. For example, `us-central1`.
- INPUT_FILE_SPEC: your Cloud Storage file pattern.
- CONNECTION_URL: your Elasticsearch URL.
- APIKEY: your base64-encoded API key for authentication.
- INDEX: your Elasticsearch index.
- DEADLETTER_TABLE: your BigQuery table.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.