The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. The template can either read the entire table or read specific records using a supplied query.
Pipeline requirements
- The source BigQuery table must exist.
- An Elasticsearch host on a Google Cloud instance or on Elastic Cloud, running Elasticsearch version 7.0 or later. The host must be accessible from the Dataflow worker machines.
Template parameters
Required parameters
- connectionUrl : The Elasticsearch URL in the format https://hostname:[port]. If using Elastic Cloud, specify the CloudID. (Example: https://elasticsearch-host:9200).
- apiKey : The Base64-encoded API key to use for authentication.
- index : The Elasticsearch index that the requests are issued to. (Example: my-index).
Optional parameters
- inputTableSpec : The BigQuery table to read from. Format: projectId:datasetId.tablename. If you specify inputTableSpec, the template reads the data directly from BigQuery storage by using the BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage). For information about limitations in the Storage Read API, see https://cloud.google.com/bigquery/docs/reference/storage#limitations. You must specify either inputTableSpec or query. If you set both parameters, the template uses the query parameter. (Example: bigquery-project:dataset.input_table).
- outputDeadletterTable : The BigQuery table for messages that failed to reach the output table, in the format <PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>. If the table doesn't exist, it is created during pipeline execution. If not specified, <outputTableSpec>_error_records is used. (Example: your-project-id:your-dataset.your-table-name).
- query : The SQL query to use to read data from BigQuery. If the BigQuery dataset is in a different project than the Dataflow job, specify the full dataset name in the SQL query, for example: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. By default, the query parameter uses GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), unless useLegacySql is true. You must specify either inputTableSpec or query. If you set both parameters, the template uses the query parameter. (Example: select * from sampledb.sample_table).
- useLegacySql : Set to true to use legacy SQL. This parameter only applies when using the query parameter. Defaults to: false.
- queryLocation : Needed when reading from an authorized view without the underlying table's permission. (Example: US).
- queryTempDataset : With this option, you can set an existing dataset to create the temporary table to store the results of the query. (Example: temp_dataset).
- elasticsearchUsername : The Elasticsearch username to authenticate with. If specified, the value of 'apiKey' is ignored.
- elasticsearchPassword : The Elasticsearch password to authenticate with. If specified, the value of 'apiKey' is ignored.
- batchSize : The batch size in number of documents. Defaults to: 1000.
- batchSizeBytes : The batch size in number of bytes. Defaults to: 5242880 (5 MB).
- maxRetryAttempts : The maximum number of retry attempts. Must be greater than zero. Defaults to: no retries.
- maxRetryDuration : The maximum retry duration in milliseconds. Must be greater than zero. Defaults to: no retries.
- propertyAsIndex : The property in the document being indexed whose value specifies _index metadata to include with the document in bulk requests. Takes precedence over an _index UDF. Defaults to: none.
- javaScriptIndexFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that specifies _index metadata to include with the document in bulk requests. Defaults to: none.
- javaScriptIndexFnName : The name of the UDF JavaScript function that specifies _index metadata to include with the document in bulk requests. Defaults to: none.
- propertyAsId : A property in the document being indexed whose value specifies _id metadata to include with the document in bulk requests. Takes precedence over an _id UDF. Defaults to: none.
- javaScriptIdFnGcsPath : The Cloud Storage path to the JavaScript UDF source for the function that specifies _id metadata to include with the document in bulk requests. Defaults to: none.
- javaScriptIdFnName : The name of the UDF JavaScript function that specifies the _id metadata to include with the document in bulk requests. Defaults to: none.
- javaScriptTypeFnGcsPath : The Cloud Storage path to the JavaScript UDF source for a function that specifies _type metadata to include with documents in bulk requests. Defaults to: none.
- javaScriptTypeFnName : The name of the UDF JavaScript function that specifies the _type metadata to include with the document in bulk requests. Defaults to: none.
- javaScriptIsDeleteFnGcsPath : The Cloud Storage path to the JavaScript UDF source for the function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of true or false. Defaults to: none.
- javaScriptIsDeleteFnName : The name of the UDF JavaScript function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of true or false. Defaults to: none.
- usePartialUpdate : Whether to use partial updates (update rather than create or index, allowing partial documents) with Elasticsearch requests. Defaults to: false.
- bulkInsertMethod : Whether to use INDEX (index, allows upserts) or CREATE (create, errors on duplicate _id) with Elasticsearch bulk requests. Defaults to: CREATE.
- trustSelfSignedCerts : Whether to trust self-signed certificates. An Elasticsearch instance might have a self-signed certificate. Set this parameter to true to bypass SSL certificate validation. Defaults to: false.
- disableCertificateValidation : If 'true', trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter to 'true'. Default: false.
- apiKeyKMSEncryptionKey : The Cloud KMS key to decrypt the API key. This parameter must be provided if the apiKeySource is set to KMS. If this parameter is provided, apiKey string should be passed in encrypted. Encrypt parameters using the KMS API encrypt endpoint. The Key should be in the format projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name}. See: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt (Example: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name).
- apiKeySecretId : Secret Manager secret ID for the apiKey. This parameter should be provided if the apiKeySource is set to SECRET_MANAGER. Should be in the format projects/{project}/secrets/{secret}/versions/{secret_version}. (Example: projects/your-project-id/secrets/your-secret/versions/your-secret-version).
- apiKeySource : Source of the API key. One of PLAINTEXT, KMS or SECRET_MANAGER. This parameter must be provided if secret manager or KMS is used. If apiKeySource is set to KMS, apiKeyKMSEncryptionKey and encrypted apiKey must be provided. If apiKeySource is set to SECRET_MANAGER, apiKeySecretId must be provided. If apiKeySource is set to PLAINTEXT, apiKey must be provided. Defaults to: PLAINTEXT.
- socketTimeout : If set, overwrites the default max retry timeout and default socket timeout (30000ms) in the Elastic RestClient.
- javascriptTextTransformGcsPath : The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://my-bucket/my-udfs/my_file.js).
- javascriptTextTransformFunctionName : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
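As an illustration of the javascriptTextTransformFunctionName parameter, the following is a minimal sketch of a transform UDF. It assumes the function receives one row serialized as a JSON string and returns the transformed document as a JSON string. The field names first_name, last_name, and full_name are hypothetical and not part of the template.

```javascript
/**
 * Hypothetical transform UDF: adds a full_name field to each document.
 * Assumption: inJson is a single row serialized as a JSON string.
 */
function myTransform(inJson) {
  var doc = JSON.parse(inJson);
  // first_name and last_name are illustrative column names.
  if (doc.first_name && doc.last_name) {
    doc.full_name = doc.first_name + ' ' + doc.last_name;
  }
  // Return the document as a JSON string.
  return JSON.stringify(doc);
}
```

With a file like this uploaded to Cloud Storage, javascriptTextTransformGcsPath would point to the file and javascriptTextTransformFunctionName would be myTransform.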
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Index function
Returns the index to which the document belongs.
Template parameters:
- javaScriptIndexFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIndexFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _index metadata field.
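A minimal sketch of an index function that follows this specification. The function name getIndex, the region field, and the orders- prefix are hypothetical and chosen only for illustration.

```javascript
/**
 * Hypothetical index UDF: routes each document to an index by region.
 * Input: the document as a JSON string. Output: the _index value.
 */
function getIndex(docJson) {
  var doc = JSON.parse(docJson);
  // "region" is an illustrative field; fall back to a default index name.
  var region = doc.region ? String(doc.region).toLowerCase() : 'default';
  return 'orders-' + region;
}
```

To use a function like this, set javaScriptIndexFnName to getIndex and javaScriptIndexFnGcsPath to the Cloud Storage location of the file.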
Document ID function
Returns the document ID.
Template parameters:
- javaScriptIdFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIdFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _id metadata field.
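A minimal sketch of a document ID function under the same input and output contract. The function name getId and the fields customer_id and order_id are hypothetical.

```javascript
/**
 * Hypothetical document ID UDF: builds a composite _id from two fields.
 * Input: the document as a JSON string. Output: the _id value.
 */
function getId(docJson) {
  var doc = JSON.parse(docJson);
  // customer_id and order_id are illustrative column names.
  return doc.customer_id + '-' + doc.order_id;
}
```

A deterministic ID such as this one means that rerunning the pipeline with bulkInsertMethod set to INDEX overwrites existing documents instead of creating duplicates.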
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk insert mode (bulkInsertMethod) to INDEX and provide a document ID function.
Template parameters:
- javaScriptIsDeleteFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptIsDeleteFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string "true" to delete the document, or "false" to upsert the document.
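A minimal sketch of a deletion function that returns the string values described above. The function name isDelete and the is_deleted flag are hypothetical.

```javascript
/**
 * Hypothetical deletion UDF: deletes documents flagged as removed.
 * Input: the document as a JSON string.
 * Output: the string "true" to delete, or "false" to upsert.
 */
function isDelete(docJson) {
  var doc = JSON.parse(docJson);
  // "is_deleted" is an illustrative soft-delete flag in the source row.
  return doc.is_deleted === true ? 'true' : 'false';
}
```

Note that the function returns strings, not booleans, to match the function specification.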
Mapping type function
Returns the document's mapping type.
Template parameters:
- javaScriptTypeFnGcsPath : the Cloud Storage URI of the JavaScript file.
- javaScriptTypeFnName : the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's _type metadata field.
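A minimal sketch of a mapping type function. The function name getType and the doc_type field are hypothetical. The fallback value _doc is an assumption based on the default type name used by Elasticsearch 7.x.

```javascript
/**
 * Hypothetical mapping type UDF.
 * Input: the document as a JSON string. Output: the _type value.
 */
function getType(docJson) {
  var doc = JSON.parse(docJson);
  // "doc_type" is an illustrative field; "_doc" is an assumed fallback.
  return doc.doc_type ? String(doc.doc_type) : '_doc';
}
```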
Run the template
Console
- Go to the Dataflow Create job from template page.
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.
  For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the BigQuery to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \
    --parameters \
inputTableSpec=INPUT_TABLE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job.
- JOB_NAME : a unique job name of your choice.
- REGION_NAME : the region where you want to deploy your Dataflow job, for example, us-central1.
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-REGION_NAME/
- INPUT_TABLE_SPEC : your BigQuery table name.
- CONNECTION_URL : your Elasticsearch URL.
- APIKEY : your Base64-encoded API key for authentication.
- INDEX : your Elasticsearch index.
API
To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
         "inputTableSpec": "INPUT_TABLE_SPEC",
         "connectionUrl": "CONNECTION_URL",
         "apiKey": "APIKEY",
         "index": "INDEX"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch"
   }
}
Replace the following:
- PROJECT_ID : the Google Cloud project ID where you want to run the Dataflow job.
- JOB_NAME : a unique job name of your choice.
- LOCATION : the region where you want to deploy your Dataflow job, for example, us-central1.
- VERSION : the version of the template that you want to use. You can use the following values:
  - latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket: gs://dataflow-templates-LOCATION/latest/
  - the version name, like 2023-09-12-00_RC00, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket: gs://dataflow-templates-LOCATION/
- INPUT_TABLE_SPEC : your BigQuery table name.
- CONNECTION_URL : your Elasticsearch URL.
- APIKEY : your Base64-encoded API key for authentication.
- INDEX : your Elasticsearch index.
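The request above can also be assembled programmatically. The following is a minimal sketch that only builds the URL and JSON body from the placeholders. It does not send the request, and it leaves out authentication. The function name buildLaunchRequest and the shape of its opts argument are hypothetical.

```javascript
/**
 * Hypothetical helper: builds the URL and JSON body for the launch request.
 * It does not send anything; the caller supplies an HTTP client and credentials.
 */
function buildLaunchRequest(opts) {
  var url = 'https://dataflow.googleapis.com/v1b3/projects/' +
      encodeURIComponent(opts.projectId) +
      '/locations/' + encodeURIComponent(opts.location) +
      '/flexTemplates:launch';
  var body = {
    launch_parameter: {
      jobName: opts.jobName,
      parameters: {
        inputTableSpec: opts.inputTableSpec,
        connectionUrl: opts.connectionUrl,
        apiKey: opts.apiKey,
        index: opts.index
      },
      // The template path mirrors the one in the request shown above.
      containerSpecGcsPath: 'gs://dataflow-templates-' + opts.location +
          '/' + opts.version + '/flex/BigQuery_to_Elasticsearch'
    }
  };
  return { url: url, body: JSON.stringify(body, null, 2) };
}
```

Serializing the body with JSON.stringify avoids hand-written JSON mistakes such as trailing commas, which strict JSON parsers do not accept.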
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.