Pub/Sub to Splunk 模板

Pub/Sub to Splunk 模板是一种流处理流水线,可从 Pub/Sub 订阅中读取消息,并使用 Splunk 的 HTTP Event Collector (HEC) 将消息载荷写入 Splunk。此模板的最常见使用场景是将日志导出到 Splunk。如需查看底层工作流的示例,请参阅使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在写入 Splunk 之前,您还可以将 JavaScript 用户定义函数应用于消息载荷。任何未能成功处理的消息都会被转发到 Pub/Sub 未处理主题,以便进一步进行问题排查并重新处理。

要为 HEC 令牌提供额外保护,您还可以传入 Cloud KMS 密钥和使用 Cloud KMS 密钥加密的 base64 编码 HEC 令牌参数。如需详细了解如何对 HEC 令牌参数进行加密,请参阅 Cloud KMS API 加密端点

流水线要求

  • 源 Pub/Sub 订阅必须已存在才能运行此流水线。
  • 在运行此流水线之前,Pub/Sub 未处理的主题必须已存在。
  • Splunk HEC 端点必须可从 Dataflow 工作器的网络访问。
  • Splunk HEC 令牌必须已生成并且可用。

模板参数

必需参数

  • inputSubscription:要从中读取输入的 Pub/Sub 订阅。(示例:projects/your-project-id/subscriptions/your-subscription-name)。
  • url:Splunk HEC 网址。此网址必须可从运行流水线的 VPC 路由。(示例:https://splunk-hec-host:8088)。
  • outputDeadletterTopic:用于转发无法递送的消息的 Pub/Sub 主题。例如,projects/<PROJECT_ID>/topics/<TOPIC_NAME>。

可选参数

  • token:Splunk HEC 身份验证令牌。如果 tokenSource 参数设置为 PLAINTEXTKMS,则必须提供。
  • batchCount:向 Splunk 发送多个事件的批次大小。默认值为 1(无批处理)。
  • disableCertificateValidation:停用 SSL 证书验证。默认为 false(已启用验证)。 如果为 true,则不验证证书(所有证书均受信任),并且忽略 rootCaCertificatePath 参数。
  • parallelism:最大并行请求数。默认值为 1(无并行)。
  • includePubsubMessage:在载荷中包含完整的 Pub/Sub 消息。默认值为 false(只有数据元素包含在载荷中)。
  • tokenKMSEncryptionKey:用于解密 HEC 令牌字符串的 Cloud KMS 密钥。如果 tokenSource 设置为 KMS,则必须提供此参数。如果提供了 Cloud KMS 密钥,则必须以加密方式传递 HEC 令牌字符串 must。(示例:projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name)。
  • tokenSecretId:令牌的 Secret Manager Secret ID。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供此参数。(示例:projects/your-project-id/secrets/your-secret/versions/your-secret-version)。
  • tokenSource:令牌的来源。允许使用以下值:PLAINTEXTKMSSECRET_MANAGER。如果使用了 Secret Manager,则必须提供此参数。如果 tokenSource 设置为 KMStokenKMSEncryptionKey 和已加密,则必须提供 token。如果 tokenSource 设置为 SECRET_MANAGER,则必须提供 tokenSecretId。如果 tokenSource 设置为 PLAINTEXT,则必须提供 token
  • rootCaCertificatePath:Cloud Storage 中根 CA 证书的完整网址。Cloud Storage 中提供的证书必须采用 DER 编码,并且可能以二进制或可打印 (Base64) 编码提供。如果证书是使用 Base64 编码提供的,则它必须以 -----BEGIN CERTIFICATE----- 开头为界,并且必须以 -----END CERTIFICATE----- 结尾为界。如果提供此参数,系统会提取此私有 CA 证书文件并将其添加到 Dataflow 工作器的信任库,以便验证 Splunk HEC 端点的 SSL 证书。如果未提供此参数,则使用默认信任库。 (示例:gs://mybucket/mycerts/privateCA.crt)。
  • enableBatchLogs:指定是否应为写入 Splunk 的批次启用日志。默认值:true
  • enableGzipHttpCompression:指定是否应压缩发送到 Splunk HEC 的 HTTP 请求(gzip 内容编码)。默认值:true
  • javascriptTextTransformGcsPath:.js 文件的 Cloud Storage URI,用于定义要使用的 JavaScript 用户定义的函数 (UDF)。例如 gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName:要使用的 JavaScript 用户定义的函数 (UDF) 的名称。例如,如果 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例 (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples)。
  • javascriptTextTransformReloadIntervalMinutes:定义工作器检查 JavaScript UDF 更改以重新加载文件的时间间隔。默认值为 0。

用户定义的函数

(可选)您可以通过编写用户定义的函数 (UDF) 来扩展此模板。该模板会为每个输入元素调用 UDF。元素载荷会序列化为 JSON 字符串。如需了解详情,请参阅为 Dataflow 模板创建用户定义的函数

函数规范

UDF 具有以下规范:

  • 输入:Pub/Sub 消息数据字段,序列化为 JSON 字符串。
  • 输出:要发送到 Splunk HEC 事件端点的事件数据。输出必须是字符串或字符串化 JSON 对象。

运行模板

控制台

  1. 转到 Dataflow 基于模板创建作业页面。
  2. 转到“基于模板创建作业”
  3. 作业名称字段中,输入唯一的作业名称。
  4. 可选:对于区域性端点,从下拉菜单中选择一个值。默认区域为 us-central1

    如需查看可以在其中运行 Dataflow 作业的区域列表,请参阅 Dataflow 位置

  5. Dataflow 模板下拉菜单中,选择 the Pub/Sub to Splunk template。
  6. 在提供的参数字段中,输入您的参数值。
  7. 可选:如需从“正好一次”处理切换到“至少一次”流处理模式,请选择至少一次
  8. 点击运行作业

gcloud

在 shell 或终端中,运行模板:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

替换以下内容:

  • JOB_NAME:您选择的唯一性作业名称
  • REGION_NAME:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

API

如需使用 REST API 来运行模板,请发送 HTTP POST 请求。如需详细了解 API 及其授权范围,请参阅 projects.templates.launch

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

替换以下内容:

  • PROJECT_ID:您要在其中运行 Dataflow 作业的 Google Cloud 项目的 ID
  • JOB_NAME:您选择的唯一性作业名称
  • LOCATION:要在其中部署 Dataflow 作业的区域,例如 us-central1
  • VERSION:您要使用的模板的版本

    您可使用以下值:

  • STAGING_LOCATION:暂存本地文件的位置(例如 gs://your-bucket/staging
  • INPUT_SUBSCRIPTION_NAME:Pub/Sub 订阅名称
  • TOKEN:Splunk 的 HTTP Event Collector 令牌
  • URL:Splunk 的 HTTP Event Collector 的网址路径(例如 https://splunk-hec-host:8088
  • DEADLETTER_TOPIC_NAME:Pub/Sub 主题名称
  • JAVASCRIPT_FUNCTION: 您要使用的 JavaScript 用户定义的函数 (UDF) 的名称

    例如,如果您的 JavaScript 函数代码为 myTransform(inJson) { /*...do stuff...*/ },则函数名称为 myTransform。如需查看 JavaScript UDF 示例,请参阅 UDF 示例

  • PATH_TO_JAVASCRIPT_UDF_FILE.js 文件的 Cloud Storage URI,用于定义您要使用的 JavaScript 用户定义的函数 (UDF),例如 gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT:用于向 Splunk 发送多个事件的批次大小
  • PARALLELISM:用于向 Splunk 发送事件的并行请求数
  • DISABLE_VALIDATION:如果要停用 SSL 证书验证则为 true
  • ROOT_CA_CERTIFICATE_PATH:Cloud Storage 中根 CA 证书的路径(例如 gs://your-bucket/privateCA.crt

后续步骤