Cloud Storage to Elasticsearch 模板

Cloud Storage to Elasticsearch 模板是一种批处理流水线,可从存储在 Cloud Storage 存储桶中的 CSV 文件读取数据,并将数据作为 JSON 文档注入到 Elasticsearch 中。

流水线要求

  • Cloud Storage 存储桶必须存在。
  • Google Cloud 实例或 Elasticsearch Cloud 上必须存在可通过 Dataflow 访问的 Elasticsearch 主机。
  • 错误输出的 BigQuery 表必须存在。

CSV 架构

如果 CSV 文件包含标题,请将 containsHeaders 模板参数设置为 true

否则,请创建一个描述数据的 JSON 架构文件。在 jsonSchemaPath 模板参数中指定架构文件的 Cloud Storage URI。以下示例展示了一个 JSON 架构:

[{"name":"id", "type":"text"}, {"name":"age", "type":"integer"}]

或者,您可以提供用户定义的函数 (UDF),用于解析 CSV 文本并输出 Elasticsearch 文档。

模板参数

参数 说明
inputFileSpec 用于搜索 CSV 文件的 Cloud Storage 文件格式。示例:gs://mybucket/test-*.csv
connectionUrl Elasticsearch 网址,格式为 https://hostname:[port] 或指定 CloudID(如果使用 Elastic Cloud)。
apiKey 用于身份验证的 Base64 编码 API 密钥。
index 将对其发出请求的 Elasticsearch 索引,例如 my-index
deadletterTable 将失败的插入发送到的 BigQuery Deadletter 表。示例:<your-project>:<your-dataset>.<your-table-name>
containsHeaders (可选)用于指明 CSV 中是否包含标题的布尔值。默认值:false
delimiter (可选)CSV 使用的分隔符。示例:,
csvFormat (可选)基于 Apache Commons CSV 格式的 CSV 格式。默认值:Default
jsonSchemaPath (可选)JSON 架构的路径。默认值:null
largeNumFiles (可选)如果文件数达到数万个,则设置为 true。默认值:false
javascriptTextTransformGcsPath (可选).js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
javascriptTextTransformFunctionName (可选) 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称。 例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例
batchSize (可选)文档数量中的批次大小。默认值:1000
batchSizeBytes (可选)批次大小(以字节为单位)。默认值:5242880 (5mb)。
maxRetryAttempts (可选)尝试次数上限,必须大于 0。默认值:不重试。
maxRetryDuration (可选)重试时长上限(以毫秒为单位),必须大于 0。默认值:不重试。
csvFileEncoding (可选)CSV 文件编码。
propertyAsIndex (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _index 元数据(优先于 _index UDF)。默认值:none。
propertyAsId (可选)要编入索引的文档中的一个属性,其值将指定批量请求要包含在文档中的 _id 元数据(优先于 _id UDF)。默认值:none。
javaScriptIndexFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIndexFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _index 元数据。默认值:none。
javaScriptIdFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptIdFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _id 元数据。默认值:none。
javaScriptTypeFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptTypeFnName (可选)函数的 UDF JavaScript 函数名称,该函数将指定批量请求要包含在文档中的 _type 元数据。默认值:none。
javaScriptIsDeleteFnGcsPath (可选)函数的 JavaScript UDF 源的 Cloud Storage 路径,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
javaScriptIsDeleteFnName (可选)函数的 UDF JavaScript 函数名称,该函数将确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值 "true""false"。默认值:none。
usePartialUpdate (可选)是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或索引,允许部分文档)。默认值:false
bulkInsertMethod (可选)在 Elasticsearch 批量请求中使用 INDEX(索引,允许执行更新插入操作)还是 CREATE(创建,会对重复 _id 报错)。默认值:CREATE

用户定义的函数

此模板支持流水线中多个位置的用户定义的函数 (UDF),如下所述。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

文本转换函数

将 CSV 数据转换为 Elasticsearch 文档。

模板参数:

  • javascriptTextTransformGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javascriptTextTransformFunctionName:JavaScript 函数的名称。

函数规范:

  • 输入:输入 CSV 文件中的一行。
  • 输出:要插入到 Elasticsearch 中的字符串化 JSON 文档。

索引函数

返回文档所属的索引。

模板参数:

  • javaScriptIndexFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIndexFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _index 元数据字段的值。

文档 ID 函数

返回文档 ID。

模板参数:

  • javaScriptIdFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIdFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _id 元数据字段的值。

文档删除函数

指定是否删除文档。如需使用此函数,请将批量插入模式设置为 INDEX 并提供文档 ID 函数

模板参数:

  • javaScriptIsDeleteFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIsDeleteFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:返回字符串 "true" 可删除文档,返回 "false" 可更新/插入文档。

映射类型函数

返回文档的映射类型。

模板参数:

  • javaScriptTypeFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptTypeFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _type 元数据字段的值。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Cloud Storage to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID\
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_to_Elasticsearch \
    --parameters \
inputFileSpec=INPUT_FILE_SPEC,\
connectionUrl=CONNECTION_URL,\
apiKey=APIKEY,\
index=INDEX,\
deadletterTable=DEADLETTER_TABLE,\

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 表。

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileSpec": "INPUT_FILE_SPEC",
          "connectionUrl": "CONNECTION_URL",
          "apiKey": "APIKEY",
          "index": "INDEX",
          "deadletterTable": "DEADLETTER_TABLE"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_to_Elasticsearch",
   }
}

请替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • INPUT_FILE_SPEC:您的 Cloud Storage 文件格式。
  • CONNECTION_URL:您的 Elasticsearch 网址。
  • APIKEY:用于身份验证的 base64 编码 API 密钥。
  • INDEX:您的 Elasticsearch 索引。
  • DEADLETTER_TABLE:您的 BigQuery 表。

后续步骤