The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. The template can either read the entire table or read specific records using a supplied query.
Pipeline requirements
- The source BigQuery table must exist.
- A Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or later. Must be accessible from the Dataflow worker machines.
Template parameters
Required parameters
- connectionUrl: The Elasticsearch URL in the format
https://hostname:[port]
. If using Elastic Cloud, specify the CloudID. For example,https://elasticsearch-host:9200
. - apiKey: The Base64-encoded API key to use for authentication.
- index: The Elasticsearch index that the requests are issued to. For example,
my-index
.
Optional parameters
- inputTableSpec: The BigQuery table to read from. If you specify
inputTableSpec
, the template reads the data directly from BigQuery storage by using the BigQuery Storage Read API (https://cloud.google.com/bigquery/docs/reference/storage). For information about limitations in the Storage Read API, see https://cloud.google.com/bigquery/docs/reference/storage#limitations. You must specify eitherinputTableSpec
orquery
. If you set both parameters, the template uses thequery
parameter. For example,<BIGQUERY_PROJECT>:<DATASET_NAME>.<INPUT_TABLE>
. - outputDeadletterTable: The BigQuery table for messages that failed to reach the output table. If a table doesn't exist, it is created during pipeline execution. If not specified,
<outputTableSpec>_error_records
is used. For example,<PROJECT_ID>:<DATASET_NAME>.<DEADLETTER_TABLE>
. - query: The SQL query to use to read data from BigQuery. If the BigQuery dataset is in a different project than the Dataflow job, specify the full dataset name in the SQL query, for example: <PROJECT_ID>.<DATASET_NAME>.<TABLE_NAME>. By default, the
query
parameter uses GoogleSQL (https://cloud.google.com/bigquery/docs/introduction-sql), unlessuseLegacySql
istrue
. You must specify eitherinputTableSpec
orquery
. If you set both parameters, the template uses thequery
parameter. For example,select * from sampledb.sample_table
. - useLegacySql: Set to
true
to use legacy SQL. This parameter only applies when using thequery
parameter. Defaults tofalse
. - queryLocation: Needed when reading from an authorized view without underlying table's permission. For example,
US
. - queryTempDataset: With this option, you can set an existing dataset to create the temporary table to store the results of the query. For example,
temp_dataset
. - KMSEncryptionKey: If reading from BigQuery using query source, use this Cloud KMS key to encrypt any temporary tables created. For example,
projects/your-project/locations/global/keyRings/your-keyring/cryptoKeys/your-key
. - elasticsearchUsername: The Elasticsearch username to authenticate with. If specified, the value of
apiKey
is ignored. - elasticsearchPassword: The Elasticsearch password to authenticate with. If specified, the value of
apiKey
is ignored. - batchSize: The batch size in number of documents. Defaults to
1000
. - batchSizeBytes: The batch size in number of bytes. Defaults to
5242880
(5mb). - maxRetryAttempts: The maximum number of retry attempts. Must be greater than zero. Defaults to
no retries
. - maxRetryDuration: The maximum retry duration in milliseconds. Must be greater than zero. Defaults to
no retries
. - propertyAsIndex: The property in the document being indexed whose value specifies
_index
metadata to include with the document in bulk requests. Takes precedence over an_index
UDF. Defaults tonone
. - javaScriptIndexFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies
_index
metadata to include with the document in bulk requests. Defaults tonone
. - javaScriptIndexFnName: The name of the UDF JavaScript function that specifies
_index
metadata to include with the document in bulk requests. Defaults tonone
. - propertyAsId: A property in the document being indexed whose value specifies
_id
metadata to include with the document in bulk requests. Takes precedence over an_id
UDF. Defaults tonone
. - javaScriptIdFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that specifies
_id
metadata to include with the document in bulk requests. Defaults tonone
. - javaScriptIdFnName: The name of the UDF JavaScript function that specifies the
_id
metadata to include with the document in bulk requests. Defaults tonone
. - javaScriptTypeFnGcsPath: The Cloud Storage path to the JavaScript UDF source for a function that specifies
_type
metadata to include with documents in bulk requests. Defaults tonone
. - javaScriptTypeFnName: The name of the UDF JavaScript function that specifies the
_type
metadata to include with the document in bulk requests. Defaults tonone
. - javaScriptIsDeleteFnGcsPath: The Cloud Storage path to the JavaScript UDF source for the function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of
true
orfalse
. Defaults tonone
. - javaScriptIsDeleteFnName: The name of the UDF JavaScript function that determines whether to delete the document instead of inserting or updating it. The function returns a string value of
true
orfalse
. Defaults tonone
. - usePartialUpdate: Whether to use partial updates (update rather than create or index, allowing partial documents) with Elasticsearch requests. Defaults to
false
. - bulkInsertMethod: Whether to use
INDEX
(index, allows upserts) orCREATE
(create, errors on duplicate _id) with Elasticsearch bulk requests. Defaults toCREATE
. - trustSelfSignedCerts: Whether to trust self-signed certificate or not. An Elasticsearch instance installed might have a self-signed certificate, Enable this to true to by-pass the validation on SSL certificate. (Defaults to:
false
). - disableCertificateValidation: If
true
, trust the self-signed SSL certificate. An Elasticsearch instance might have a self-signed certificate. To bypass validation for the certificate, set this parameter totrue
. Defaults tofalse
. - apiKeyKMSEncryptionKey: The Cloud KMS key to decrypt the API key. This parameter is required if the
apiKeySource
is set toKMS
. If this parameter is provided, pass in an encryptedapiKey
string. Encrypt parameters using the KMS API encrypt endpoint. For the key, use the formatprojects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>
. See: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt For example,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
. - apiKeySecretId: The Secret Manager secret ID for the apiKey. If the
apiKeySource
is set toSECRET_MANAGER
, provide this parameter. Use the formatprojects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,
projects/your-project-id/secrets/your-secret/versions/your-secret-version`. - apiKeySource: The source of the API key. Allowed values are
PLAINTEXT
,KMS
orandSECRET_MANAGER
. This parameter is required when you use Secret Manager or KMS. IfapiKeySource
is set toKMS
,apiKeyKMSEncryptionKey
and encrypted apiKey must be provided. IfapiKeySource
is set toSECRET_MANAGER
,apiKeySecretId
must be provided. IfapiKeySource
is set toPLAINTEXT
,apiKey
must be provided. Defaults to: PLAINTEXT. - socketTimeout: If set, overwrites the default max retry timeout and default socket timeout (30000ms) in the Elastic RestClient.
- javascriptTextTransformGcsPath: The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) to use. For example,
gs://my-bucket/my-udfs/my_file.js
. - javascriptTextTransformFunctionName: The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is
myTransform(inJson) { /*...do stuff...*/ }
, then the function name ismyTransform
. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
User-defined functions
This template supports user-defined functions (UDFs) at several points in the pipeline, described below. For more information, see Create user-defined functions for Dataflow templates.
Index function
Returns the index to which the document belongs.
Template parameters:
javaScriptIndexFnGcsPath
: the Cloud Storage URI of the JavaScript file.javaScriptIndexFnName
: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_index
metadata field.
Document ID function
Returns the document ID.
Template parameters:
javaScriptIdFnGcsPath
: the Cloud Storage URI of the JavaScript file.javaScriptIdFnName
: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_id
metadata field.
Document deletion function
Specifies whether to delete a document. To use this function, set the bulk
insert mode to INDEX
and provide a
document ID function.
Template parameters:
javaScriptIsDeleteFnGcsPath
: the Cloud Storage URI of the JavaScript file.javaScriptIsDeleteFnName
: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: return the string
"true"
to delete the document, or"false"
to upsert the document.
Mapping type function
Returns the document's mapping type.
Template parameters:
javaScriptTypeFnGcsPath
: the Cloud Storage URI of the JavaScript file.javaScriptTypeFnName
: the name of the JavaScript function.
Function specification:
- Input: the Elasticsearch document, serialized as a JSON string.
- Output: the value of the document's
_type
metadata field.
Run the template
Console
- Go to the Dataflow Create job from template page. Go to Create job from template
- In the Job name field, enter a unique job name.
- Optional: For Regional endpoint, select a value from the drop-down menu. The default
region is
us-central1
.For a list of regions where you can run a Dataflow job, see Dataflow locations.
- From the Dataflow template drop-down menu, select the BigQuery to Elasticsearch template.
- In the provided parameter fields, enter your parameter values.
- Click Run job.
gcloud
In your shell or terminal, run the template:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceREGION_NAME
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: your BigQuery table name.CONNECTION_URL
: your Elasticsearch URL.APIKEY
: your base64 encoded API key for authentication.INDEX
: your Elasticsearch index.
API
To run the template using the REST API, send an HTTP POST request. For more information on the
API and its authorization scopes, see
projects.templates.launch
.
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
Replace the following:
PROJECT_ID
: the Google Cloud project ID where you want to run the Dataflow jobJOB_NAME
: a unique job name of your choiceLOCATION
: the region where you want to deploy your Dataflow job—for example,us-central1
VERSION
: the version of the template that you want to useYou can use the following values:
latest
to use the latest version of the template, which is available in the non-dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/latest/- the version name, like
2023-09-12-00_RC00
, to use a specific version of the template, which can be found nested in the respective dated parent folder in the bucket— gs://dataflow-templates-REGION_NAME/
INPUT_TABLE_SPEC
: your BigQuery table name.CONNECTION_URL
: your Elasticsearch URL.APIKEY
: your base64 encoded API key for authentication.INDEX
: your Elasticsearch index.
What's next
- Learn about Dataflow templates.
- See the list of Google-provided templates.