Cloud Storage to Elasticsearch 模板是一种批处理流水线,可从存储在 Cloud Storage 存储桶中的 CSV 文件读取数据,并将数据作为 JSON 文档注入到 Elasticsearch 中。
流水线要求
- Cloud Storage 存储桶必须存在。
- Google Cloud Platform 实例或 Elasticsearch Cloud 上必须存在可通过 Dataflow 访问的 Elasticsearch 主机。
- 错误输出的 BigQuery 表必须存在。
CSV 架构
如果 CSV 文件包含标题行,请将 containsHeaders 模板参数设置为 true。
否则,请创建一个 JSON 架构文件来描述数据。在 jsonSchemaPath 模板参数中指定架构文件的 Cloud Storage URI。以下示例展示了 JSON 架构:
[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]
或者,您可以提供一个用户定义的函数 (UDF),用于解析 CSV 文本并输出 Elasticsearch 文档。
模板参数
必需参数
- deadletterTable:将失败的插入发送到的 BigQuery 死信表。例如
your-project:your-dataset.your-table-name。 - inputFileSpec:用于搜索 CSV 文件的 Cloud Storage 文件格式。例如
gs://mybucket/test-*.csv。 - connectionUrl:Elasticsearch 网址,格式为
https://hostname:[port]。如果使用 Elastic Cloud,请指定 CloudID。例如https://elasticsearch-host:9200。 - apiKey:用于身份验证的 Base64 编码 API 密钥。
- index:发出请求的 Elasticsearch 索引。例如
my-index。
可选参数
- inputFormat:输入文件格式。默认值为
CSV。 - containsHeaders:输入 CSV 文件包含标题记录 (true/false)。仅在读取 CSV 文件时才需要。默认值为:false。
- delimiter:输入文本文件的列分隔符。默认值:
,。例如,,。 - csvFormat:用于解析记录的 CSV 格式规范。默认值为:
Default。如需了解详情,请参阅 https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.html。必须与以下网址中找到的格式名称完全一致:https://commons.apache.org/proper/commons-csv/apidocs/org/apache/commons/csv/CSVFormat.Predefined.html。 - jsonSchemaPath:JSON 架构的路径。默认值为
null。例如gs://path/to/schema。 - largeNumFiles:如果文件数达到数万个,则设置为 true。默认值为
false。 - csvFileEncoding:CSV 文件的字符编码格式。允许的值包括
US-ASCII、ISO-8859-1、UTF-8和UTF-16。默认为 UTF-8。 - logDetailedCsvConversionErrors:设置为
true可在 CSV 解析失败时启用详细错误日志记录。请注意,这可能会导致日志中包含敏感数据(例如,如果 CSV 文件包含密码)。默认值:false。 - elasticsearchUsername:用于进行身份验证的 Elasticsearch 用户名。如果指定,则系统会忽略
apiKey的值。 - elasticsearchPassword:用于进行身份验证的 Elasticsearch 密码。如果指定,则系统会忽略
apiKey的值。 - batchSize:按文档数量的批次大小。默认值为
1000。 - batchSizeBytes:批次大小(以字节数为单位)。默认值为
5242880(5mb)。 - maxRetryAttempts:重试次数上限。必须大于零。默认值为
no retries。 - maxRetryDuration:重试时长上限(以毫秒为单位)。必须大于零。默认值为
no retries。 - propertyAsIndex:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的
_index元数据。优先于_indexUDF。默认值为none。 - javaScriptIndexFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的
_index元数据。默认值为none。 - javaScriptIndexFnName:UDF JavaScript 函数的名称,该函数指定要将
_index元数据包含在批量请求的文档中。默认值为none。 - propertyAsId:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的
_id元数据。优先于_idUDF。默认值为none。 - javaScriptIdFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的
_id元数据。默认值为none。 - javaScriptIdFnName:UDF JavaScript 函数的名称,该函数指定批量请求要包含在文档中的
_id元数据。默认值为none。 - javaScriptTypeFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定要将
_type元数据包含在批量请求的文档中。默认值为none。 - javaScriptTypeFnName:UDF JavaScript 函数的名称,该函数指定批量请求要包含在文档中的
_type元数据。默认值为none。 - javaScriptIsDeleteFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数确定是否应删除文档,而不是插入或更新文档。该函数会返回字符串值
true或false。默认值为none。 - javaScriptIsDeleteFnName:UDF JavaScript 函数的名称,该函数确定是否应删除文档,而不是插入或更新文档。该函数会返回字符串值
true或false。默认值为none。 - usePartialUpdate:是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或编入索引,允许部分文档)。默认值为
false。 - bulkInsertMethod:在 Elasticsearch 批量请求中使用
INDEX(编入索引,允许 upsert)还是CREATE(创建,会对重复 _id 报错)。默认值为CREATE。 - trustSelfSignedCerts:是否信任自签名证书。已安装的 Elasticsearch 实例可能具有自签名证书,将此参数设置为 true 可绕过对 SSL 证书的验证。(默认值为:
false。) - disableCertificateValidation:如果为
true,则信任自签名 SSL 证书。Elasticsearch 实例可能具有自签名证书。如需绕过证书的验证,请将此参数设置为true。默认值为false。 - apiKeyKMSEncryptionKey:用于解密 API 密钥的 Cloud KMS 密钥。如果
apiKeySource设置为KMS,则此参数是必需的。如果提供了此参数,请传入加密的apiKey字符串。使用 KMS API 加密端点对参数进行加密。对于密钥,请使用projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>格式。请参阅:https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt。例如,projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name。 - apiKeySecretId:apiKey 的 Secret Manager Secret ID。如果
apiKeySource设置为SECRET_MANAGER,请提供此参数。请使用格式projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example,projects/your-project-id/secrets/your-secret/versions/your-secret-version`。 - apiKeySource:API 密钥的来源。允许的值包括
PLAINTEXT、KMS和SECRET_MANAGER。如果您使用 Secret Manager 或 KMS,则此参数是必需的。如果apiKeySource设置为KMS、apiKeyKMSEncryptionKey和已加密,则必须提供 apiKey。如果apiKeySource设置为SECRET_MANAGER,则必须提供apiKeySecretId。如果apiKeySource设置为PLAINTEXT,则必须提供apiKey。默认值为 PLAINTEXT。 - socketTimeout:如果设置,则会覆盖 Elastic RestClient 中的默认重试超时上限和默认套接字超时(30,000 毫秒)。
- javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如
gs://my-bucket/my-udfs/my_file.js。 - javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为
myTransform(inJson) { /*...do stuff...*/ },则函数名称为myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
用户定义的函数
此模板支持流水线中多个位置的用户定义的函数 (UDF),如下所述。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数。
文本转换函数
将 CSV 数据转换为 Elasticsearch 文档。
模板参数:
javascriptTextTransformGcsPath:JavaScript 文件的 Cloud Storage URI。javascriptTextTransformFunctionName:JavaScript 函数的名称。
函数规范:
- 输入:来自输入 CSV 文件的单行内容。
- 输出:要插入到 Elasticsearch 中的字符串化 JSON 文档。
索引函数
返回文档所属的索引。
模板参数:
javaScriptIndexFnGcsPath:JavaScript 文件的 Cloud Storage URI。javaScriptIndexFnName:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_index元数据字段的值。
文档 ID 函数
返回文档 ID。
模板参数:
javaScriptIdFnGcsPath:JavaScript 文件的 Cloud Storage URI。javaScriptIdFnName:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_id元数据字段的值。
文档删除函数
指定是否删除文档。如需使用此函数,请将批量插入模式设置为 INDEX 并提供文档 ID 函数。
模板参数:
javaScriptIsDeleteFnGcsPath:JavaScript 文件的 Cloud Storage URI。javaScriptIsDeleteFnName:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:返回字符串
"true"可删除文档,返回"false"可更新/插入文档。
映射类型函数
返回文档的映射类型。
模板参数:
javaScriptTypeFnGcsPath:JavaScript 文件的 Cloud Storage URI。javaScriptTypeFnName:JavaScript 函数的名称。
函数规范:
- 输入:Elasticsearch 文档,序列化为 JSON 字符串。
- 输出:文档的
_type元数据字段的值。
运行模板
控制台
- 转到 Dataflow 基于模板创建作业页面。 转到“基于模板创建作业”
- 在作业名称字段中,输入唯一的作业名称。
- 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为
us-central1。如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置。
- 从 Dataflow 模板下拉菜单中,选择 the Cloud Storage to Elasticsearch template。
- 在提供的参数字段中,输入您的参数值。
- 点击运行作业。
gcloud
在 shell 或终端中,运行模板:
gcloud dataflow flex-template run JOB_NAME \ --project=PROJECT_ID\ --region=REGION_NAME \ --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \ --parameters \ inputFileSpec=INPUT_FILE_SPEC,\ connectionUrl=CONNECTION_URL,\ apiKey=APIKEY,\ index=INDEX,\ deadletterTable=DEADLETTER_TABLE,\
替换以下内容:
PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME:您选择的唯一性作业名称VERSION:您要使用的模板的版本您可使用以下值:
latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
REGION_NAME:要在其中部署 Dataflow 作业的区域,例如us-central1INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。CONNECTION_URL:您的 Elasticsearch 网址。APIKEY:用于身份验证的 base64 编码 API 密钥。INDEX:您的 Elasticsearch 索引。DEADLETTER_TABLE:您的 BigQuery 表。
API
如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch。
POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch { "launch_parameter": { "jobName": "JOB_NAME", "parameters": { "inputFileSpec": "INPUT_FILE_SPEC", "connectionUrl": "CONNECTION_URL", "apiKey": "APIKEY", "index": "INDEX", "deadletterTable": "DEADLETTER_TABLE" }, "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch", } }
替换以下内容:
PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目 IDJOB_NAME:您选择的唯一性作业名称VERSION:您要使用的模板的版本您可使用以下值:
latest,以使用模板的最新版本,该模板在存储桶的未标示日期的父文件夹 (gs://dataflow-templates-REGION_NAME/latest/) 中可用- 版本名称(如
2023-09-12-00_RC00),以使用模板的特定版本,该版本嵌套在存储桶的相应日期父文件夹 (gs://dataflow-templates-REGION_NAME/) 中
LOCATION:要在其中部署 Dataflow 作业的区域,例如us-central1INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。CONNECTION_URL:您的 Elasticsearch 网址。APIKEY:用于身份验证的 base64 编码 API 密钥。INDEX:您的 Elasticsearch 索引。DEADLETTER_TABLE:您的 BigQuery 表。
后续步骤
- 了解 Dataflow 模板。
- 参阅 Google 提供的模板列表。