Cloud Storage to Elasticsearch template

The Cloud Storage to Elasticsearch template is a batch pipeline that reads data from CSV files stored in a Cloud Storage bucket and writes the data into Elasticsearch as JSON documents.

Pipeline requirements

  • The Cloud Storage bucket must exist.
  • An Elasticsearch host that is accessible from Dataflow must exist, either on a Google Cloud instance or on Elasticsearch Cloud.
  • A BigQuery table for error output must exist.

CSV schema

If the CSV files contain headers, set the containsHeaders template parameter to true.

Otherwise, create a JSON schema file that describes the data. Specify the Cloud Storage URI of the schema file in the jsonSchemaPath template parameter. The following example shows a JSON schema:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
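To make the role of the schema concrete, the following sketch pairs the values of one CSV row with the schema entries above. The rowToDocument helper is hypothetical and only illustrates the mapping; it is not the template's own parser, and its naive split does not handle quoted fields.

```javascript
// The example schema from above: one entry per CSV column, in column order.
var schema = [
  { name: "id", type: "text" },
  { name: "age", type: "integer" }
];

// Hypothetical helper (not part of the template): pair each CSV value with
// the schema entry at the same position and convert integer columns.
function rowToDocument(line, delimiter) {
  var values = line.split(delimiter);
  var doc = {};
  for (var i = 0; i < schema.length; i++) {
    var field = schema[i];
    doc[field.name] = field.type === "integer"
      ? parseInt(values[i], 10)
      : values[i];
  }
  return doc;
}

// Produces an object with id "abc123" (string) and age 42 (number).
var exampleDocument = rowToDocument("abc123,42", ",");
```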

Alternatively, you can provide a user-defined function (UDF) that parses the CSV text and outputs Elasticsearch documents.

Template parameters

Parameter Description
inputFileSpec The Cloud Storage file pattern to search for CSV files. Example: gs://mybucket/test-*.csv.
connectionUrl The Elasticsearch URL in the format https://hostname:[port]. If you use Elastic Cloud, specify the CloudID instead.
apiKey The Base64-encoded API key used for authentication.
index The Elasticsearch index that requests are issued against. Example: my-index.
deadletterTable The BigQuery Deadletter table to send failed inserts to. Example: <your-project>:<your-dataset>.<your-table-name>.
containsHeaders (Optional) Boolean to denote whether headers are included in the CSV. Default: false.
delimiter (Optional) The delimiter that the CSV uses. Example: ,
csvFormat (Optional) The CSV format according to Apache Commons CSV format. Default: Default.
jsonSchemaPath (Optional) The Cloud Storage path to the JSON schema file. Default: null.
largeNumFiles (Optional) Set to true if the number of files is in the tens of thousands. Default: false.
javascriptTextTransformGcsPath (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is function myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples.
batchSize (Optional) Batch size in number of documents. Default: 1000.
batchSizeBytes (Optional) Batch size in number of bytes. Default: 5242880 (5 MB).
maxRetryAttempts (Optional) The maximum number of retry attempts. Must be greater than 0. Default: no retries.
maxRetryDuration (Optional) The maximum retry duration in milliseconds. Must be greater than 0. Default: no retries.
csvFileEncoding (Optional) CSV file encoding.
propertyAsIndex (Optional) A property in the document being indexed whose value will specify _index metadata to be included with document in bulk request (takes precedence over an _index UDF). Default: none.
propertyAsId (Optional) A property in the document being indexed whose value will specify _id metadata to be included with document in bulk request (takes precedence over an _id UDF). Default: none.
javaScriptIndexFnGcsPath (Optional) The Cloud Storage path to the JavaScript UDF source for a function that will specify _index metadata to be included with document in bulk request. Default: none.
javaScriptIndexFnName (Optional) UDF JavaScript function name for function that will specify _index metadata to be included with document in bulk request. Default: none.
javaScriptIdFnGcsPath (Optional) The Cloud Storage path to the JavaScript UDF source for a function that will specify _id metadata to be included with document in bulk request. Default: none.
javaScriptIdFnName (Optional) UDF JavaScript function name for function that will specify _id metadata to be included with document in bulk request. Default: none.
javaScriptTypeFnGcsPath (Optional) The Cloud Storage path to the JavaScript UDF source for a function that will specify _type metadata to be included with document in bulk request. Default: none.
javaScriptTypeFnName (Optional) UDF JavaScript function name for function that will specify _type metadata to be included with document in bulk request. Default: none.
javaScriptIsDeleteFnGcsPath (Optional) The Cloud Storage path to JavaScript UDF source for function that will determine if document should be deleted rather than inserted or updated. The function should return string value "true" or "false". Default: none.
javaScriptIsDeleteFnName (Optional) UDF JavaScript function name for function that will determine if document should be deleted rather than inserted or updated. The function should return string value "true" or "false". Default: none.
usePartialUpdate (Optional) Whether to use partial updates (update rather than create or index, allowing partial docs) with Elasticsearch requests. Default: false.
bulkInsertMethod (Optional) Whether to use INDEX (index, allows upserts) or CREATE (create, errors on duplicate _id) with Elasticsearch bulk requests. Default: CREATE.
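The apiKey parameter expects a Base64-encoded value. As a minimal sketch, assuming the key was created with the Elasticsearch create API key API (which returns an id and an api_key), the encoded form is the Base64 encoding of the two values joined by a colon. The encodeApiKey helper and the sample values are hypothetical, and the snippet assumes a Node.js environment for Buffer.

```javascript
// Hypothetical helper: join the key ID and secret with a colon and
// Base64-encode the UTF-8 bytes. Assumes Node.js (Buffer is a Node global).
function encodeApiKey(id, apiKey) {
  return Buffer.from(id + ":" + apiKey, "utf8").toString("base64");
}

// Placeholder values for illustration only; never hard-code real credentials.
var encodedKey = encodeApiKey("id", "key");
```

If the create API key response already includes an encoded field, that value can be used directly instead.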

User-defined functions

This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.

Text transform function

Transforms the CSV data into an Elasticsearch document.

Template parameters:

  • javascriptTextTransformGcsPath: the Cloud Storage URI of the JavaScript file.
  • javascriptTextTransformFunctionName: the name of the JavaScript function.

Function specification:

  • Input: a single line from an input CSV file.
  • Output: a stringified JSON document to insert into Elasticsearch.
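A text transform function matching this specification might look like the following sketch. The function name transformCsvLine and the two-column layout (id, age) are assumptions for illustration; the naive split does not handle quoted fields or embedded delimiters.

```javascript
/**
 * Hypothetical text transform UDF.
 * Assumes each CSV line has two comma-separated columns, id and age,
 * with no quoted fields.
 */
function transformCsvLine(line) {
  var values = line.split(",");
  var doc = {
    id: values[0],
    age: parseInt(values[1], 10)
  };
  // The template expects a JSON string, not an object.
  return JSON.stringify(doc);
}
```

With this file uploaded to Cloud Storage, javascriptTextTransformFunctionName would be set to transformCsvLine.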

Index function

Returns the index to which the document belongs.

Template parameters:

  • javaScriptIndexFnGcsPath: the Cloud Storage URI of the JavaScript file.
  • javaScriptIndexFnName: the name of the JavaScript function.

Function specification:

  • Input: the Elasticsearch document, serialized as a JSON string.
  • Output: the value of the document's _index metadata field.
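As an illustration, the sketch below routes documents to an index derived from a field. The function name getIndexName, the region field, and the customers- prefix are all hypothetical.

```javascript
/**
 * Hypothetical index UDF.
 * Assumes documents may carry a "region" field; documents without one
 * go to a default index.
 */
function getIndexName(docJson) {
  var doc = JSON.parse(docJson);
  return doc.region ? "customers-" + doc.region : "customers-default";
}
```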

Document ID function

Returns the document ID.

Template parameters:

  • javaScriptIdFnGcsPath: the Cloud Storage URI of the JavaScript file.
  • javaScriptIdFnName: the name of the JavaScript function.

Function specification:

  • Input: the Elasticsearch document, serialized as a JSON string.
  • Output: the value of the document's _id metadata field.
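A document ID function can be as small as the following sketch, which assumes each document has an id field (a hypothetical name) and returns it as a string.

```javascript
/**
 * Hypothetical document ID UDF.
 * Assumes every document has an "id" field that is unique within the index.
 */
function getDocumentId(docJson) {
  var doc = JSON.parse(docJson);
  return String(doc.id);
}
```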

Document deletion function

Specifies whether to delete a document. To use this function, set the bulkInsertMethod parameter to INDEX and provide a document ID function.

Template parameters:

  • javaScriptIsDeleteFnGcsPath: the Cloud Storage URI of the JavaScript file.
  • javaScriptIsDeleteFnName: the name of the JavaScript function.

Function specification:

  • Input: the Elasticsearch document, serialized as a JSON string.
  • Output: the string "true" to delete the document, or the string "false" to upsert the document.
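The following sketch shows one possible deletion function. The function name isDelete and the deleted flag are assumptions; the important detail is that the return value is a string rather than a boolean.

```javascript
/**
 * Hypothetical deletion UDF.
 * Assumes documents that should be removed carry "deleted": true.
 */
function isDelete(docJson) {
  var doc = JSON.parse(docJson);
  // Return the strings "true" or "false", not boolean values.
  return doc.deleted === true ? "true" : "false";
}
```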

Mapping type function

Returns the document's mapping type.

Template parameters:

  • javaScriptTypeFnGcsPath: the Cloud Storage URI of the JavaScript file.
  • javaScriptTypeFnName: the name of the JavaScript function.

Function specification:

  • Input: the Elasticsearch document, serialized as a JSON string.
  • Output: the value of the document's _type metadata field.
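A mapping type function follows the same pattern as the other metadata functions. In this sketch the function name getMappingType and the type field are hypothetical, and _doc is used as the fallback. Mapping types were deprecated in Elasticsearch 7 and removed in version 8, so this function is mainly relevant for older clusters.

```javascript
/**
 * Hypothetical mapping type UDF.
 * Assumes documents may carry a "type" field; otherwise falls back to "_doc".
 */
function getMappingType(docJson) {
  var doc = JSON.parse(docJson);
  return doc.type ? String(doc.type) : "_doc";
}
```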

Run the template

Console

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Cloud Storage to Elasticsearch template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

gcloud

In your shell or terminal, run the template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • REGION_NAME: the region where you want to deploy your Dataflow job—for example, us-central1
  • INPUT_FILE_SPEC: your Cloud Storage file pattern.
  • CONNECTION_URL: your Elasticsearch URL.
  • APIKEY: your base64 encoded API key for authentication.
  • INDEX: your Elasticsearch index.
  • DEADLETTER_TABLE: your BigQuery table.

API

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.locations.flexTemplates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch"
   }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest for the most recent version
  • LOCATION: the region where you want to deploy your Dataflow job—for example, us-central1
  • INPUT_FILE_SPEC: your Cloud Storage file pattern.
  • CONNECTION_URL: your Elasticsearch URL.
  • APIKEY: your base64 encoded API key for authentication.
  • INDEX: your Elasticsearch index.
  • DEADLETTER_TABLE: your BigQuery table.
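The placeholder substitution described above can be sketched in code. The buildLaunchRequest helper below is hypothetical: it only assembles the URL and JSON body shown in the request and does not send anything. All argument values are placeholders, and sending the request still requires an HTTP client and valid credentials.

```javascript
// Hypothetical helper: assembles the URL and JSON body for the launch request.
// It does not perform the HTTP call or handle authorization.
function buildLaunchRequest(projectId, location, version, jobName, parameters) {
  var url = "https://dataflow.googleapis.com/v1b3/projects/" + projectId +
    "/locations/" + location + "/flexTemplates:launch";
  var body = {
    launch_parameter: {
      jobName: jobName,
      parameters: parameters,
      containerSpecGcsPath: "gs://dataflow-templates-" + location + "/" +
        version + "/flex/GCS_to_Elasticsearch"
    }
  };
  return { url: url, body: JSON.stringify(body) };
}

// Placeholder values for illustration only.
var launchRequest = buildLaunchRequest(
  "my-project", "us-central1", "latest", "gcs-to-es-job",
  {
    inputFileSpec: "gs://mybucket/test-*.csv",
    connectionUrl: "https://hostname:9200",
    apiKey: "APIKEY",
    index: "my-index",
    deadletterTable: "my-project:my_dataset.my_table"
  }
);
```

Building the body with JSON.stringify avoids hand-written JSON mistakes such as trailing commas.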

What's next