BigQuery to Elasticsearch 模板是一种批处理流水线,用于将 BigQuery 表中的数据作为文档注入到 Elasticsearch 中。该模板可以读取整个表,也可以使用提供的查询读取特定记录。
流水线要求
- 源 BigQuery 表必须存在。
- 必须具有在 Google Cloud 实例上或 Elastic Cloud 上运行的使用 Elasticsearch 7.0 版或更高版本的 Elasticsearch 主机。必须可从 Dataflow 工作器机器访问。
模板参数
参数 | 说明 |
---|---|
connectionUrl |
Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。 |
apiKey |
用于身份验证的 Base64 编码 API 密钥。 |
index |
将向其发出请求的 Elasticsearch 索引,例如 my-index 。 |
inputTableSpec |
(可选)要读取并插入到 Elasticsearch 中的 BigQuery 表。必须提供表或查询。例如 projectId:datasetId.tablename 。 |
query |
(可选)用于从 BigQuery 拉取数据的 SQL 查询。必须提供表或查询。 |
useLegacySql |
(可选)设置为 true 即可使用旧版 SQL(仅在提供查询时适用)。默认值:false 。 |
batchSize |
(可选)文档数量中的批次大小。默认值:1000 。 |
batchSizeBytes |
(可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。 |
maxRetryAttempts |
(可选)尝试次数上限,必须大于 0。默认值:no retries 。 |
maxRetryDuration |
(可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:no retries 。 |
propertyAsIndex |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。 |
propertyAsId |
(可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。 |
javaScriptIndexFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIndexFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。 |
javaScriptIdFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptIdFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。 |
javaScriptTypeFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptTypeFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。 |
javaScriptIsDeleteFnGcsPath |
(可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
javaScriptIsDeleteFnName |
(可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true" 或 "false" 。默认值:none。 |
usePartialUpdate |
(可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false 。 |
bulkInsertMethod |
(可选)在 Elasticsearch 批量请求中使用 INDEX (索引,允许执行更新插入操作)还是 CREATE (创建,会对重复 _id 报错)。默认值:CREATE 。 |
用户定义的函数
此模板支持流水线中多个位置的用户定义的函数 (UDF),如下所述。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。
索引函数
返回文档所属的索引。
模板参数:
javaScriptIndexFnGcsPath
:JavaScript 文件的 Cloud Storage URI。javaScriptIndexFnName
:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_index
元数据字段的值。
文档 ID 函数
返回文档 ID。
模板参数:
javaScriptIdFnGcsPath
:JavaScript 文件的 Cloud Storage URI。javaScriptIdFnName
:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_id
元数据字段的值。
文档删除函数
指定是否删除文档。如需使用此函数,请将批量插入模式设置为 INDEX
并提供文档 ID 函数。
模板参数:
javaScriptIsDeleteFnGcsPath
:JavaScript 文件的 Cloud Storage URI。javaScriptIsDeleteFnName
:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:返回字符串
"true"
可删除文档,返回"false"
可更新/插入文档。
映射类型函数
返回文档的映射类型。
模板参数:
javaScriptTypeFnGcsPath
:JavaScript 文件的 Cloud Storage URI。javaScriptTypeFnName
:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_type
元数据字段的值。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1
。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the BigQuery to Elasticsearch template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID \ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/BigQuery_to_Elasticsearch \ --parameters \ inputTableSpec=INPUT_TABLE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称REGION_NAME
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 表名称。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch
。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputTableSpec": "INPUT_TABLE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/BigQuery_to_Elasticsearch", } }
请替换以下内容:
PROJECT_ID
:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 IDJOB_NAME
:您选择的唯一性作业名称LOCATION
:要在其中部署 Dataflow 作业的区域,例如us-central1
VERSION
:您要使用的模板的版本您可使用以下值:
latest
,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00
),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
INPUT_TABLE_SPEC
:您的 BigQuery 表名称。CONNECTION_URL
:您的 Elasticsearch 网址。APIKEY
:用于身份验证的 base64 编码 API 密钥。INDEX
:您的 Elasticsearch 索引。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。