Pub/Sub to Elasticsearch 模板

Pub/Sub to Elasticsearch 模板是一种流处理流水线,可从 Pub/Sub 订阅读取消息、执行用户定义的函数 (UDF) 并将其作为文档写入 Elasticsearch。Dataflow 模板使用 Elasticsearch 的数据流功能跨多个索引存储时间序列数据,同时为请求提供单个命名资源。数据流非常适合存储在 Pub/Sub 中的日志、指标、跟踪记录和其他持续生成的数据。

该模板会创建一个名为 logs-gcp.DATASET-NAMESPACE 的数据流,其中:

  • DATASETdataset 模板参数的值;如果未指定,则为 pubsub
  • NAMESPACEnamespace 模板参数的值;如果未指定,则为 default

流水线要求

  • 来源 Pub/Sub 订阅必须存在,并且消息必须采用有效的 JSON 格式进行编码。
  • Google Cloud 实例上或 Elastic Cloud 上使用 Elasticsearch 7.0 版或更高版本的可公开访问的 Elasticsearch 主机。如需了解详情,请参阅适用于 Elastic 的 Google Cloud 集成
  • 用于错误输出的 Pub/Sub 主题。

模板参数

必需参数

可选参数

  • dataset:使用 Pub/Sub 发送的日志类型,我们为其提供了开箱即用的信息中心。已知的日志类型值包括 audit、vpcflow 和 firewall。默认值“pubsub”。
  • namespace:任意分组,例如环境(dev、prod 或 qa)、团队或战略性业务部门。默认值:“default”。
  • elasticsearchTemplateVersion:Dataflow 模板版本标识符,通常由 Google Cloud 定义。默认值为 1.0.0。
  • javascriptTextTransformGcsPath:包含用户定义的函数的 JavaScript 代码的 Cloud Storage 路径模式。(示例:gs://your-bucket/your-function.js)。
  • javascriptTextTransformFunctionName:要从 JavaScript 文件调用的函数的名称。只能使用字母、数字和下划线。(示例:“transform”或“transform_udf1”)。
  • javascriptTextTransformReloadIntervalMinutes:定义工作器检查 JavaScript UDF 更改以重新加载文件的时间间隔。默认值为 0。
  • elasticsearchUsername:用于进行身份验证的 Elasticsearch 用户名。如果指定了此参数,则系统会忽略“apiKey”的值。
  • elasticsearchPassword:用于进行身份验证的 Elasticsearch 密码。如果指定了此参数,则系统会忽略“apiKey”的值。
  • batchSize:按文档数量的批次大小。默认值:“1000”。
  • batchSizeBytes:用于将消息批量插入 Elasticsearch 的批次大小(以字节为单位)。默认值:“5242880 (5mb)”。
  • maxRetryAttempts:重试次数上限,必须大于 0。默认值:“no retries”。
  • maxRetryDuration:重试时长上限(以毫秒为单位),必须大于 0。默认值:“no retries”。
  • propertyAsIndex:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的“_index”元数据(优先于“_index”UDF)。默认值:none。
  • javaScriptIndexFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的“_index”元数据。默认值:none。
  • javaScriptIndexFnName:函数的 UDF JavaScript 函数名称,该函数指定批量请求要包含在文档中的 _index 元数据。默认值:none。
  • propertyAsId:要编入索引的文档中的一个属性,其值指定批量请求要包含在文档中的“_id”元数据(优先于“_id”UDF)。默认值:none。
  • javaScriptIdFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的“_id”元数据。默认值:none。
  • javaScriptIdFnName:函数的 UDF JavaScript 函数名称,该函数指定批量请求要包含在文档中的 _id 元数据。默认值:none。
  • javaScriptTypeFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数指定批量请求要包含在文档中的“_type”元数据。默认值:none。
  • javaScriptTypeFnName:函数的 UDF JavaScript 函数名称,该函数指定批量请求要包含在文档中的“_type”元数据。默认值:none。
  • javaScriptIsDeleteFnGcsPath:函数的 JavaScript UDF 来源的 Cloud Storage 路径,该函数确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值“true”或“false”。默认值:none。
  • javaScriptIsDeleteFnName:函数的 UDF JavaScript 函数名称,该函数确定是否应删除文档,而不是插入或更新文档。该函数应返回字符串值“true”或“false”。默认值:none。
  • usePartialUpdate:是否在 Elasticsearch 请求中使用部分更新(更新而不是创建或编入索引,允许部分文档)。默认值:“false”。
  • bulkInsertMethod:在 Elasticsearch 批量请求中使用“INDEX”(编入索引,允许 upsert)还是“CREATE”(创建,会对重复 _id 报错)。默认值:“CREATE”。
  • trustSelfSignedCerts:是否信任自签名证书。已安装的 Elasticsearch 实例可能具有自签名证书,将此参数设置为 True 可绕过对 SSL 证书的验证。(默认值为 False)。
  • disableCertificateValidation:如果为“true”,则信任自签名 SSL 证书。Elasticsearch 实例可能具有自签名证书。如需绕过对证书的验证,请将此参数设置为“true”。默认值:false。
  • apiKeyKMSEncryptionKey:用于解密 API 密钥的 Cloud KMS 密钥。如果 apiKeySource 设置为 KMS,则必须提供此参数。如果提供了此参数,则应以加密方式传递 apiKey 字符串。使用 KMS API 加密端点对参数进行加密。密钥应采用 projects/{gcp_project}/locations/{key_region}/keyRings/{key_ring}/cryptoKeys/{kms_key_name} 格式。请参阅:https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt(示例:projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • apiKeySecretId:apiKey 的 Secret Manager Secret ID。如果 apiKeySource 设置为 SECRET_MANAGER,则应提供此参数。应采用 projects/{project}/secrets/{secret}/versions/{secret_version} 格式。(示例:projects/your-project-id/secrets/your-secret/versions/your-secret-version)。
  • apiKeySource:API 密钥的来源。PLAINTEXT、KMS 或 SECRET_MANAGER 之一。如果使用了 Secret Manager 或 KMS,则必须提供此参数。如果 apiKeySource 设置为 KMS,则必须提供 apiKeyKMSEncryptionKey 和加密的 apiKey。如果 apiKeySource 设置为 SECRET_MANAGER,则必须提供 apiKeySecretId。如果 apiKeySource 设置为 PLAINTEXT,则必须提供 apiKey。默认值为 PLAINTEXT。

用户定义的函数

此模板支持流水线中多个位置的用户定义的函数 (UDF),如下所述。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

文本转换函数

将 Pub/Sub 消息转换为 Elasticsearch 文档。

模板参数:

  • javascriptTextTransformGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javascriptTextTransformFunctionName:JavaScript 函数的名称。

函数规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:要插入到 Elasticsearch 中的字符串化 JSON 文档。

索引函数

返回文档所属的索引。

模板参数:

  • javaScriptIndexFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIndexFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _index 元数据字段的值。

文档 ID 函数

返回文档 ID。

模板参数:

  • javaScriptIdFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIdFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _id 元数据字段的值。

文档删除函数

指定是否删除文档。如需使用此函数,请将批量插入模式设置为 INDEX 并提供文档 ID 函数

模板参数:

  • javaScriptIsDeleteFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptIsDeleteFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:返回字符串 "true" 可删除文档,返回 "false" 可更新/插入文档。

映射类型函数

返回文档的映射类型。

模板参数:

  • javaScriptTypeFnGcsPath:JavaScript 文件的 Cloud Storage URI。
  • javaScriptTypeFnName:JavaScript 函数的名称。

函数规范:

  • 输入:Elasticsearch 文档,序列化为 JSON 字符串。
  • 输出:文档的 _type 元数据字段的值。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Elasticsearch template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch",
   }
}
  

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • ERROR_OUTPUT_TOPIC:用于错误输出的 Pub/Sub 主题
  • SUBSCRIPTION_NAME:您的 Pub/Sub 订阅名称
  • CONNECTION_URL:您的 Elasticsearch 网址
  • DATASET:您的日志类型
  • NAMESPACE:数据集的命名空间
  • APIKEY:用于身份验证的 base64 编码 API 密钥

后续步骤