使用 Dataflow 将支持生产环境的日志导出部署到 Splunk

在本教程中,您将使用 Cloud LoggingPub/SubDataflow,创建可扩容、可容错的日志输出机制。

本教程适用于希望将 Google Cloud 资源中的日志和事件流式传输到适用于 IT 运营或安全使用场景的 Splunk EnterpriseSplunk Cloud。本教程使用 Google 提供的 Splunk Dataflow 模板,安全可靠第将日志流式传输到 Splunk HTTP Event Collector (HEC)。此外,本教程还讨论了 Dataflow 流水线容量规划,以及如何应对遇到瞬时服务器或网络问题时可能发生的交付失败。

本教程假定组织资源层次结构与下图类似,显示了组织层聚合接收器将日志导出到 Splunk。您可以在名为 Splunk Export Project 的示例项目中创建日志导出流水线,在其中安全收集、处理和传递组织节点下所有 Google Cloud 项目的日志。

导出到 Splunk 的日志的组织聚合接收器。

架构

以下架构图显示了您在本教程中构建的日志导出过程:

日志导出到 Splunk。

  • 该过程开始时,组织级层的日志接收器会将日志路由到单个 Pub/Sub 主题和订阅。
  • 在流程的中心,主 Dataflow 流水线是一个 Pub/Sub-to-Splunk 流式流水线,该流水线从 Pub/Sub 订阅中提取日志并将其传送至 Splunk。
  • 与 Dataflow 主流水线并行的第二个 Dataflow 流水线是一种 Pub/Sub-to-Pub/Sub 流式传输流水线,用于在传送失败时重放消息。
  • 该流程结束时,日志目标位置是 Splunk Enterprise 或 Splunk Cloud 的 HEC 端点。

目标

  • 在专用项目中创建聚合日志接收器。
  • 规划 Splunk Dataflow 流水线容量以匹配组织的日志速率。
  • 部署 Splunk Dataflow 流水线以将日志导出到 Splunk。
  • 在 Splunk Dataflow 流水线中使用用户定义的函数 (UDF) 来转换运行的日志或事件。
  • 处理传送失败以避免可能出现的配置错误或暂时性的网络问题。

费用

本教程使用 Google Cloud 的以下收费组件:

您可使用价格计算器根据您的预计使用量来估算费用。 Google Cloud 新用户可能有资格申请免费试用

完成本教程后,您可以删除所创建的资源以避免继续计费。如需了解详情,请参阅清理

准备工作

  1. 在 Google Cloud Console 的项目选择器页面上,选择或创建一个 Google Cloud 项目。

    转到“项目选择器”

  2. 确保您的 Cloud 项目已启用结算功能。 了解如何确认您的项目是否已启用结算功能

  3. 启用 Cloud Monitoring API, Cloud Key Management Service, Compute Engine, Pub/Sub, and Dataflow API。

    启用 API

获取 IAM 权限

  1. 在 Cloud Console 中,检查您是否对组织和项目资源具有以下身份和访问权限管理 (IAM) 权限。如需了解详情,请参阅授予、更改和撤消对资源的访问权限
    权限 预定义角色 资源
    • logging.sinks.create
    • logging.sinks.get
    • logging.sinks.update
    • Logs Configuration Writer (roles/logging.configWriter)
    组织
    • cloudkms.keyRings.create
    • cloudkms.cryptoKeys.*
    • Cloud KMS Admin (roles/cloudkms.admin)
    项目
    • compute.networks.*
    • compute.routers.*
    • compute.firewalls.*
    • networkservices.*
    • Compute Network Admin (roles/compute.networkAdmin)
    • Compute Security Admin (roles/compute.securityAdmin)
    项目
  2. 如果您没有正确的 IAM 权限,请创建自定义角色。自定义角色可为您提供所需的访问权限,还有助于您符合最小权限原则。

设置您的环境

  1. 在 Cloud Console 中,激活 Cloud Shell。

    激活 Cloud Shell

  2. 在 Cloud Shell 中,为您的项目和组织 ID 创建变量。以备在本教程中使用。

    export PROJECT_ID=project-id
    export ORG_ID=organization-id
    
    • project-id:您的项目 ID
    • organization-id:您的组织 ID。
  3. 在本教程中,您将在 us-central1地区创建资源。

    export REGION_ID=us-central1
    
  4. 为您的活跃 Cloud Shell 会话设置项目:

    gcloud config set project $PROJECT_ID
    

设置安全网络

在此步骤中,在处理日志并将其导出到 Splunk Enterprise 之前,请先设置安全网络。

  1. 创建 VPC 网络和子网:

    gcloud compute networks create export-network --subnet-mode=custom
    gcloud compute networks subnets create export-network-us-central \
         --network=export-network \
         --region=$REGION_ID \
         --range=192.168.1.0/24
    
  2. 创建 Cloud NAT 网关:

    gcloud compute routers create nat-router \
           --network=export-network \
           --region=$REGION_ID
    
    gcloud compute routers nats create nat-config \
       --router=nat-router \
       --nat-custom-subnet-ip-ranges=export-network-us-central \
       --auto-allocate-nat-external-ips \
       --region=$REGION_ID
    

    为了安全起见,您需要部署没有公共 IP 地址的 Dataflow 流水线工作器虚拟机。如需允许 Dataflow 工作器虚拟机访问外部 Splunk HEC 服务,并使用后续命令,您可配置映射到 Dataflow 虚拟机子网的 Cloud NAT,或在这种情况下,使用 export-network-us-central。此配置允许 Dataflow 工作器虚拟机访问互联网并向 Splunk 发出 HTTPS 请求,而无需每个 Dataflow 工作器虚拟机上的外部 IP 地址。

    Cloud NAT 网关会根据使用的 Dataflow 虚拟机数量自动分配 IP 地址。

    如果您想将进入 Splunk HEC 的流量限制到已知 IP 地址的子集,则可以预留静态 IP 地址并将其手动分配给 Cloud NAT 网关。但是,本教程不涵盖相关内容。

    如需了解详情,请参阅 Cloud NAT IP 地址Cloud NAT 端口预留文档。

  3. 启用专用 Google 访问通道:

     gcloud compute networks subnets update export-network-us-central \
         --enable-private-ip-google-access \
         --region=$REGION_ID
    

    当您创建 Cloud NAT 网关时,系统会自动启用专用 Google 访问通道。但是,如需允许具有专用 IP 地址的 Dataflow 工作器访问 Google Cloud API 和服务使用的外部 IP 地址,您还必须为子网手动启用专用 Google 访问通道。

创建日志接收器

在本部分中,您将创建组织级日志接收器及其 Pub/Sub 目标位置以及必要的权限。

  1. 在 Cloud Shell 中,创建一个 Pub/Sub 主题和关联的订阅作为新的日志接收器目的地:

    gcloud pubsub topics create org-logs-all
    gcloud pubsub subscriptions create \
        --topic org-logs-all org-logs-all-sub
    
  2. 创建组织日志接收器:

    gcloud logging sinks create org-logs-all-sink \
      pubsub.googleapis.com/projects/$PROJECT_ID/topics/org-logs-all \
      --organization=$ORG_ID \
      --include-children \
      --log-filter='NOT logName:projects/$PROJECT_ID/logs/dataflow.googleapis.com'
    

    该命令包含以下选项:

    • --organization 选项指定这是组织级的日志接收器。
    • 必须使用 --include-children 选项,以确保组织级的日志接收器包含所有子文件夹和项目中的所有日志。
    • --log-filter 选项指定要路由的日志。在此示例中,您将拥有专门用于项目 $PROJECT_ID 的 Dataflow 操作日志,因为日志导出 Dataflow 流水线时处理日志会生成更多日志。此过滤条件可阻止流水线导出自己的日志,从而避免潜在的指数循环。

      输出包括 o#####-####@gcp-sa-logging.iam.gserviceaccount.com 形式的服务帐号。

  3. $LOG_SINK_SA 中的服务帐号保存为以下变量:

     export LOG_SINK_SA=[MY_SA]@gcp-sa-logging.iam.gserviceaccount.com
    
  4. 向日志接收器服务帐号授予权限:

    gcloud pubsub topics add-iam-policy-binding org-logs-all \
        --member=serviceAccount:$LOG_SINK_SA \
        --role=roles/pubsub.publisher
    

    该命令会向 Pub/Sub 主题 org-logs-all 中的日志接收器服务帐号授予 Pub/Sub Publisher IAM 角色,使日志接收器服务帐号可以发布有关该主题的消息。

设置 Splunk HEC 端点

在此步骤中,您将设置 Splunk HEC 端点并加密新创建的 HEC 令牌。

配置 Splunk HEC

  1. 如果您还没有 Splunk HEC 端点,请参阅 Splunk 文档,了解如何配置 Splunk HEC。Splunk HEC 可以在 Splunk Cloud 服务上运行,也可以在您自己的 Splunk Enterprise 实例上运行。
  2. 在 Cloud Shell 会话中,创建 Splunk HEC 令牌后,复制令牌值。
  3. 将令牌值保存在名为 splunk-hec-token-plaintext 的文件中。

创建进行加密的 Cloud KMS 密钥

Splunk HEC 网址和令牌是您部署的 Splunk Dataflow 流水线的必需参数。为了提高安全性,请使用 Cloud KMS 密钥加密令牌,并且仅在创建 Dataflow 作业时传递加密令牌。这可以防止来自 Dataflow 控制台或作业详情的 Splunk HEC 令牌泄露。

  1. 在 Cloud Shell 中,创建一个名为 的 Cloud KMS 密钥环:

    # Create a key ring in same location
    gcloud kms keyrings create export-keys \
      --location=$REGION_ID
    
  2. 在新密钥环上创建 Cloud KMS 密钥:

    # Create a key on the new key ring
    gcloud kms keys create hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --purpose="encryption"
    

添加加密和解密 Splunk HEC 令牌的权限

在加密 Splunk HEC 令牌之前,您需要具备 Encrypter 和 Decrypter IAM 角色才能使用该密钥。您还需要 Dataflow 控制器服务帐号的 Encrypter 和 Decrypter IAM 角色,因为 Dataflow 流水线工作器需要在本地解密 Splunk 令牌参数。

Dataflow 流水线工作器是 Compute Engine 实例,默认情况下使用项目的 Compute Engine 服务帐号:project-number-compute@developer.gserviceaccount.com

当您为项目启用 Compute Engine API 时,系统会自动创建 Compute Engine 服务帐号。Compute Engine 服务帐号充当 Dataflow 用于访问 Dataflow 的资源以及执行操作的 Dataflow 控制器服务帐号。

  1. 在 Cloud Console 中,转到 安全性页面。

    转到“安全性”页面

  2. 选择加密密钥标签页。

  3. 选中您创建的密钥环旁边的复选框。

  4. 如果修改权限的面板尚未打开,请点击显示信息面板

  5. 在信息面板的权限标签页下,点击添加成员

  6. 将您的项目帐号和 project-number-compute@developer.gserviceaccount.com 添加为成员。

  7. 选择 Cloud KMS CryptoKey Encrypter/Decrypter 角色。

  8. 点击保存

添加了加密者和解密者角色。

加密 Splunk HEC 令牌

在 Cloud Shell 中,使用您在设置 Splunk HEC 端点时创建的 Cloud KMS 密钥 hec-token-key 来加密 Splunk HEC 令牌:

    gcloud kms encrypt \
        --key=hec-token-key \
        --keyring=export-keys \
        --location=$REGION_ID \
        --plaintext-file=./splunk-hec-token-plaintext \
        --ciphertext-file=./splunk-hec-token-encrypted

此命令使用名为 splunk-hec-token-encrypted 的加密 Splunk HEC 令牌创建新文件。您现在可以删除临时文件 splunk-hec-token-plaintext

规划 Dataflow 流水线容量

在部署 Dataflow 流水线之前,您需要确定其大小上限和吞吐量。确定这些值可确保流水线可以处理来自上游 Pub/Sub 订阅的最高每日日志量(GB/日)和日志消息率(每秒事件数,或 EPS),而不会发生以下任何一种情况:

  • 由于消息积压或邮件节流造成的延迟。
  • 超额预配流水线需额外付费(如需了解详情,请参阅本部分末尾的备注)。

本教程中的示例值基于具有以下特征的组织:

  • 每天生成 1 TB 的日志。
  • 平均邮件大小为 1 KB。
  • 持续峰值消息率为平均速率的两倍。

您可以按照设置流水线大小上限设置速率控制参数中的步骤将示例值替换为来自您的组织的值。

设置流水线大小上限

  1. 使用以下公式确定平均 EPS:

    \( {AverageEventsPerSecond}\simeq\frac{TotalDailyLogsInTB}{AverageMessageSizeInKB}\times\frac{10^9}{24\times3600} \)

    在本示例中,生成日志的平均速率为 11.5k EPS。

  2. 使用以下公式确定持续峰值 EPS,其中乘数 N 表示日志记录的突发性质。在本示例中,N=2,因此生成日志的峰值速率为 23k EPS。

    \( {PeakEventsPerSecond = N \times\ AverageEventsPerSecond} \)

  3. 计算最大 EPS 后,您可以使用以下规模调整指南来确定所需的 vCPU 数量上限。您还可以使用此值计算 Dataflow 工作器的最大数量,或 maxNumWorkers,前提是设定 n1-standard-4 机器类型。

    \( {maxCPUs = ⌈PeakEventsPerSecond / 3k ⌉\\ maxNumWorkers = ⌈maxCPUs / 4 ⌉} \)

    在此示例中,您需要最多 $23 / 3⌉ = 8 个 vCPU 核心,默认机器类型 n1-standard-4 最多两个虚拟机工作器。

  4. 在 Cloud Shell 中,使用以下环境变量设置流水线大小:

    export DATAFLOW_MACHINE_TYPE="n1-standard-4"
    export DATAFLOW_MACHINE_COUNT=2
    

设置速率控制参数

Splunk Dataflow 流水线具有速率控制参数。这些参数调整其输出 EPS 速率,并防止下游 Splunk HEC 端点过载。

  1. 使用以下指南来确定所有虚拟机工作器与 Splunk HEC 的并行连接总数,从而最大限度地提高 EPS 率:

    \( {parallelism = maxCPUs * 2} \)

    替换 parallelism 设置,以将每个 vCPU 的 2 个并行连接考虑在内,并部署尽可能多的工作器数量。默认的 parallelism 值为 1 会停用同时载入,从而人为地限制输出速率。

    在此示例中,并行连接数将按 2 x 8 = 16 计算。

  2. 如需增加 EPS 并减少 Splunk HEC 上的负载,请使用事件批处理:

    \( {batchCount >= 10} \)

    平均日志消息大约为 1 KB,我们建议您为每个请求至少批处理 10 个事件。设置此最小数量事件有助于避免 Splunk HEC 上过多的负载,同时仍可提高有效 EPS 速率。

  3. 在 Cloud Shell 中,使用 parallelismbatchCount 的计算值设置以下速率控制环境变量:

    export DATAFLOW_PARALLELISM=16
    export DATAFLOW_BATCH_COUNT=10
    

流水线容量参数摘要

下表展示了本教程后续步骤中使用的流水线容量值,以及为配置这些作业参数推荐的最佳做法。

参数 教程值 一般最佳做法
DATAFLOW_MACHINE_TYPE n1-standard-4 设为基准机器类型大小 n1-standard-4,以获得最佳性能费用比
DATAFLOW_MACHINE_COUNT

2

按上文计算得出的,设置为处理预期峰值 EPS 所需的工作器数量 maxNumWorkers
DATAFLOW_PARALLELISM

16

设置为 2 x vCPU/工作器 x maxNumWorkers 以最大限度地提高并行 HEC 连接数
DATAFLOW_BATCH_COUNT

10

设置为面向日志的 10-50 个事件/请求,前提是最大缓冲延迟时间(两秒)

假设工作器数量上限,或 maxNumWorkers,自动扩缩流水线会为每个潜在流处理工作器部署一个数据永久性磁盘(默认情况下为 400 GB)。这些磁盘在任何点(包括启动)随时都会装载到正在运行的工作器中。

由于每个工作器实例仅限于 15 个永久性磁盘,因此启动工作器的最小数量为 ⌈maxNumWorkers/15⌉。因此,如果默认值为 maxNumWorkers=20,则流水线使用情况(和费用)如下:

  • 存储:静态存储 20 个永久性磁盘。
  • 计算:至少具有两个工作器实例 (⌈20/15⌉ = 2),且最多为 20 个。

此值相当于 8 TB 永久性磁盘,如果磁盘未被用尽,则可能会产生不必要的费用,尤其是当只有一个或两个工作器在大多数时间运行时。

使用 Dataflow 流水线导出日志

在本部分中,您将部署一个将 Google Cloud 日志消息发送到 Splunk HEC 的 Dataflow 流水线。此外,您还可以部署依赖资源,如未处理主题(也称为死信主题)和订阅来保留任何无法传送的消息。

部署 Dataflow 流水线

  1. 在 Cloud Shell 中,创建一个 Pub/Sub 主题和订阅用作未处理订阅:

     gcloud pubsub topics create org-logs-all-dl
     gcloud pubsub subscriptions create --topic org-logs-all-dl org-logs-all-dl-sub
    
  2. 设置配置剩余的流水线参数所需的环境变量,将 YOUR_SPLUNK_HEC_URL 替换为您的 SPLUNK HEC 端点,例如:https://splunk-hec-host:8088

     # Splunk HEC endpoint values
     export SPLUNK_HEC_URL="YOUR_SPLUNK_HEC_URL"
     export SPLUNK_HEC_TOKEN=`cat ./splunk-hec-token-encrypted | base64`
     # Dataflow pipeline input subscription and dead-letter topic
     export DATAFLOW_INPUT_SUB="org-logs-all-sub"
     export DATAFLOW_DEADLETTER_TOPIC="org-logs-all-dl"
    
  3. 部署 Dataflow 流水线:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://splk-public/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    复制输出中返回的新作业 ID。

    默认情况下,Splunk Dataflow 流水线会验证 Splunk HEC 端点的 SSL 证书。如果要使用自签名证书进行开发和测试,则必须停用 SSL 验证。如需了解详情,请参阅 Pub/Sub to Splunk Dataflow 模板参数 (disableCertificateValidation)

  4. DATAFLOW_JOB_ID 环境变量中保存新的作业 ID。您可以在后续步骤中使用此变量。

    export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"
    

查看 Splunk 中的日志

Dataflow 流水线工作器需要几分钟时间完成预配并准备好将日志传送到 Splunk HEC。您可以确认是否已在 Splunk Enterprise 或 Splunk Cloud 搜索界面中正确接收日志并将其编入索引。

Dataflow 流水线工作器仅需几分钟即可预配并准备将日志传送给 Splunk HEC。您可以确认是否已在 Splunk Enterprise 或 Splunk Cloud Search 界面中正确接收日志并将其编入索引。如需查看每种受监控资源的日志数,请执行以下操作:

  1. 在 Splunk 中,打开 Splunk 搜索和报告
  2. 运行搜索 index=[MY_INDEX] | stats count by resource.type,其中您的 Splunk HEC 令牌配置了 MY_INDEX 索引。

    查看 Splunk 中的日志。

  3. 如果您没有看到任何事件,请参阅处理递送失败

使用 UDF 转换运行中的事件

Splunk Dataflow 模板支持用于自定义事件转换的 UDF。您部署的流水线使用由可选参数 javascriptTextTransformGcsPathjavascriptTextTransformFunctionName 指定的示例 UDF。示例 UDF 包括用于事件扩充的代码示例,包括在事件上添加新字段或设置 Splunk HEC 元数据。示例 UDF 还包括解码逻辑来重放失败的递送,您可以在修改示例 UDF 中了解如何操作。

在本部分中,您将修改示例 UDF 函数以添加新的事件字段。这个新字段将原创 Pub/Sub 订阅的值指定为额外的上下文信息。

修改示例 UDF

  1. 在 Cloud Shell 中,下载包含示例 UDF 函数的 JavaScript 文件:

    wget https://storage.googleapis.com/splk-public/js/dataflow_udf_messages_replay.js
    
  2. 使用您所选的编辑器打开 JavaScript 文件。对向事件载荷添加新字段 inputSubscription 的行取消备注:

    // event.inputSubscription = "splunk-dataflow-pipeline";
    
  3. 将新事件字段 inputSubscription 设置为 "org-logs-all-sub" 以跟踪事件来源的 Pub/Sub 订阅:

    event.inputSubscription = "org-logs-all-sub";
    
  4. 保存文件。

  5. 在 Cloud Shell 中,创建一个新的 Cloud Storage 存储分区:

    # Create a new Cloud Storage bucket
    gsutil mb -b on gs://${PROJECT_ID}-dataflow/
    
  6. 将文件上传到 Cloud Storage 存储分区:

    # Upload JavaScript file
    gsutil cp ./dataflow_udf_messages_replay.js gs://${PROJECT_ID}-dataflow/js/
    

使用新的 UDF 更新 Dataflow 流水线

  1. 在 Cloud Shell 中,使用排空选项停止流水线,以确保从 Pub/Sub 中拉取的日志不会丢失。

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    
  2. 使用更新后的 UDF 部署新的流水线:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    tokenKMSEncryptionKey=projects/${PROJECT_ID}/locations/${REGION_ID}/keyRings/export-keys/cryptoKeys/hec-token-key,\
    url=${SPLUNK_HEC_URL},\
    token=${SPLUNK_HEC_TOKEN},\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://${PROJECT_ID}-dataflow/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    复制输出中返回的新作业 ID。

  3. DATAFLOW_JOB_ID 环境变量中保存作业 ID。export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"

处理递送失败

处理失败可能是因为处理事件或连接到 Splunk HEC 出错。在本部分中,您将引入一个递送失败,以演示错误处理工作流。此外,您还将了解如何查看和触发失败的消息传递到 Splunk。

错误处理概览

下图显示了 Splunk Dataflow 流水线中的错误处理工作流:

日志导出到 Splunk。

  1. Pub/Sub to Splunk Dataflow 流水线(主流水线)自动将不送达的消息转发到未处理的主题以供用户调查。
  2. 运营商调查未处理订阅中的失败消息,排查问题并修复递送失败的根本原因,例如解决 HEC 令牌配置错误。
  3. 运算符触发 Pub/Sub 到 Pub/Sub Dataflow 流水线(次要流水线)。此流水线(在上图的虚线部分突出显示)是一个临时流水线,用于将失败消息从未处理订阅中的消息移回原始日志接收器主题。
  4. 主流水线重新处理先前失败的消息。此步骤要求流水线使用示例 UDF 来纠正检测和对失败消息载荷进行解码。函数的以下部分会实现此条件解码逻辑,其中包括用于跟踪的递送尝试数:

    // If message has already been converted to Splunk HEC object  with stringified
     // obj.event JSON payload, then it's a replay of a previously failed delivery:
     // Unnest and parse obj.event. Drop previously injected obj.attributes
     // such as errorMessage and timestamp
     if (obj.event) {
       try {
         event = JSON.parse(obj.event);
         redelivery = true;
       } catch(e) {
         event = obj;
       }
     } else {
       event = obj;
     }
    
     // Keep a tally of delivery attempts
     event.delivery_attempt = event.delivery_attempt || 1;
     if (redelivery) {
       event.delivery_attempt += 1;
     }
    

触发递送失败

在本部分,您触发递送失败。您可以通过以下任一方法手动说明递送失败的情况:

  • 停止 Splunk 服务器(若单个实例)导致连接错误。
  • 通过 Splunk 输入配置停用相关的 HEC 令牌。

问题排查失败消息

要调查失败消息,您可以使用 Cloud Console:

  1. 在 Cloud Console 中,打开 Pub/Sub 订阅页面。

    转到“Pub/Sub 订阅”

  2. 点击您创建的未处理的订阅。如果您使用前面的示例,订阅名称为 projects/${PROJECT_ID}/subscriptions/org-logs-all-dl-sub

  3. 要打开消息查看器,请点击 View Messages

  4. 要查看消息,请点击 Pull,确保已清除 启用确认消息

  5. 现在,您可以检查失败的消息,尤其是:

    • Message body 列下的 Splunk 事件载荷。
    • attribute.errorMessage 列下的错误消息。
    • attribute.timestamp 列下的错误时间戳。

以下屏幕截图显示了 Splunk HEC 端点暂时关闭或无法访问的失败消息的示例。请注意 errorMessage 属性:The target server failed to respond

失败消息属性。

下表列出了一些可能的 Splunk 传送错误以及流水线将这些消息转发到未处理主题前记录下的每条消息记录的 errorMessage 特性:

可能的处理或连接错误 Dataflow 模板自动重试? 属性 errorMessage 示例
Splunk Server 5xx 错误 Splunk write status code: 503
Splunk 服务器 4xx 错误 Splunk write status code: 403
Splunk 服务器关闭 The target server failed to respond
Splunk SSL 证书无效 Host name X does not match the certificate...
UDF JavaScript 语法错误 ReferenceError: foo is not defined
暂时性网络连接错误 Read timed out

or

Connection reset

在某些情况下,流水线会自动尝试使用指数退避重试。例如,如果存在 Splunk 服务器 5xx 错误,Splunk HEC 端点过载时会发生这种情况。或者,可能出现持续阻止向 HEC 提交消息的问题。在这种情况下,流水线不会尝试重试。以下问题是 Splunk5xx 错误触发器的示例:

  • UDF 函数中的语法错误。
  • 无效的 HEC 令牌导致 Splunk 服务器 4xx“Forbidden”的服务器响应。

重放失败消息

在本部分中,您将重放未处理的消息,并假设假定递送失败的根本原因已解决。如果您在触发递送失败部分中停用了 Splunk HEC 端点,请检查 Splunk HEC 端点现已运行。

  1. 在 Cloud Shell 中,在重新处理未处理订阅中的消息之前,建议您截取未处理订阅的快照。这可以防止发生意外配置错误的消息丢失。

     gcloud pubsub snapshots create dlt-snapshot-`date +"%Y%m%d-%H%M%S"` \
         --subscription=org-logs-all-dl-sub
    
  2. 使用 Pub/Sub to Pub/Sub Dataflow 模板,将未处理订阅的消息转发到另一个 Dataflow 作业:

      DATAFLOW_INPUT_TOPIC="org-logs-all"
      DATAFLOW_DEADLETTER_SUB="org-logs-all-dl-sub"
    
      JOB_NAME=splunk-dataflow-replay-`date +"%Y%m%d-%H%M%S"`
      gcloud dataflow jobs run $JOB_NAME \
           --gcs-location= gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
           --worker-machine-type=n1-standard-2 \
           --max-workers=1 \
           --region=$REGION_ID \
           --parameters \
      inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_DEADLETTER_SUB},\
      outputTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_INPUT_TOPIC}
    

    复制此命令返回的 Dataflow 作业 ID。

  3. 将 Dataflow 作业 ID 保存到 DATAFLOW_JOB_ID 环境变量中。

  4. 在 Cloud Console 中,转到 Pub/Sub 订阅页面。

    转到 Pub/Sub 订阅页面

  5. 选择未处理的订阅。确认未确认的消息数小于 0。

    失败的邮件。

  6. 在 Cloud Shell 中,排空您创建的 Dataflow 作业:

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    

    当消息迁移回原始输入主题时,主 Dataflow 流水线会自动选取失败的消息并将它们重新递送给 Splunk。

确认 Splunk 中的消息

  1. 要确认消息是否已重新递送,请在 Splunk 中打开 Splunk 搜索和报告

  2. 运行搜索 delivery_attempts > 1。该特殊字段中,示例 UDF 会添加到每个事件中以跟踪传送尝试的次数。请务必扩大搜索时间范围,使其包含可能发生的事件,因为事件时间戳是原始创建时间,而不是编入索引时间。

在以下示例图片中,最初失败的两个消息现在已成功传递至 Splunk,并具有几天前的正确时间戳。请注意,insertId 字段值与通过从未处理的订阅手动拉取失败消息时发现的值相同。insertId 是 Cloud Logging 分配的原始日志条目的唯一标识符。

Splunk 中的失败消息。

清理

为避免因本教程中使用的资源导致您的 Google Cloud 帐号产生费用,请删除包含这些资源的项目,或者保留项目但删除各个资源。

删除组织级层的接收器

gcloud logging sinks delete org-logs-all-sink --organization=$ORG_ID

删除项目

删除日志接收器后,您可以继续删除为了接收和导出日志而创建的资源。最简单的方法是删除为本教程创建的项目。

  1. 在 Cloud Console 中,转到管理资源页面。

    转到“管理资源”

  2. 在项目列表中,选择要删除的项目,然后点击删除
  3. 在对话框中输入项目 ID,然后点击关闭以删除项目。

后续步骤