Deploying production-ready log exports to Splunk using Dataflow

Last reviewed 2022-09-27 UTC

In this tutorial, you create a scalable, fault-tolerant log export mechanism using Cloud Logging, Pub/Sub, and Dataflow.

This tutorial is intended for administrators who want to stream their logs and events from resources in Google Cloud into either Splunk Enterprise or Splunk Cloud Platform for IT operations or security use cases. This tutorial uses the Google-provided Splunk Dataflow template to stream logs to Splunk HTTP Event Collector (HEC) reliably and at scale. The tutorial also discusses Dataflow pipeline capacity planning and how to handle potential delivery failures when there are transient server or network issues.

To automate deployment steps in this tutorial using infrastructure as code (IaC), see the terraform-splunk-log-export GitHub repository.

The tutorial assumes an organization resource hierarchy similar to the following diagram, which shows an organization-level aggregated sink to export logs to Splunk. You create the log export pipeline in an example project named Splunk Export Project, where logs from all the Google Cloud projects under the organization node are securely collected, processed, and delivered.

Organization aggregated sink for logs export to Splunk.

Architecture

The following architectural diagram shows the logs export process that you build in this tutorial:

Log export to Splunk.

  • At the start of the process, an organization-level log sink routes logs to a single Pub/Sub topic and subscription.
  • At the center of the process, the main Dataflow pipeline is a Pub/Sub-to-Splunk streaming pipeline which pulls logs from the Pub/Sub subscription and delivers them to Splunk.
  • Parallel to the main Dataflow pipeline, the second Dataflow pipeline is a Pub/Sub-to-Pub/Sub streaming pipeline to replay messages if a delivery fails.
  • At the end of the process, the log destination is the HEC endpoint of Splunk Enterprise or Splunk Cloud Platform.

Objectives

  • Create an aggregated log sink in a dedicated project.
  • Plan Splunk Dataflow pipeline capacity to match your organization's log rate.
  • Deploy the Splunk Dataflow pipeline to export logs to Splunk.
  • Transform logs or events in-flight using user-defined functions (UDF) within the Splunk Dataflow pipeline.
  • Handle delivery failures to avoid data loss from potential misconfiguration or transient network issues.

Costs

This tutorial uses billable components of Google Cloud, including Cloud Logging, Pub/Sub, Dataflow, Compute Engine, Secret Manager, and Cloud Storage.

To generate a cost estimate based on your projected usage, use the pricing calculator. New Google Cloud users might be eligible for a free trial.

When you finish this tutorial, you can avoid continued billing by deleting the resources you created. For more information, see Clean up.

Before you begin

  1. In the Google Cloud console, on the project selector page, select or create a Google Cloud project.

    Go to project selector

  2. Make sure that billing is enabled for your Cloud project. Learn how to check if billing is enabled on a project.

  3. Enable the Cloud Monitoring, Secret Manager, Compute Engine, Pub/Sub, and Dataflow APIs.

    Enable the APIs

Get IAM permissions

  1. In the Google Cloud console, check that you have the following Identity and Access Management (IAM) permissions on the organization and project resources. For more information, see Granting, changing, and revoking access to resources.

    | Permissions | Predefined roles | Resource |
    |---|---|---|
    | logging.sinks.create, logging.sinks.get, logging.sinks.update | Logs Configuration Writer (roles/logging.configWriter) | organization |
    | compute.networks.*, compute.routers.*, compute.firewalls.*, networkservices.* | Compute Network Admin (roles/compute.networkAdmin), Compute Security Admin (roles/compute.securityAdmin) | project |
    | secretmanager.* | Secret Manager Admin (roles/secretmanager.admin) | project |

  2. If you don't have the correct IAM permissions, create a custom role. A custom role will give you the access that you need, while also helping you to follow the principle of least privilege.

Setting up your environment

  1. In the Google Cloud console, activate Cloud Shell.

    Activate Cloud Shell

  2. In Cloud Shell, create variables for your project and organization IDs. You use these variables throughout the tutorial.

    export PROJECT_ID=project-id
    export ORG_ID=organization-id
    
    • project-id: your project ID
    • organization-id: your organization ID
  3. For this tutorial, you create resources in the us-central1 region:

    export REGION_ID=us-central1
    
  4. Set the project for your active Cloud Shell session:

    gcloud config set project $PROJECT_ID
    

Setting up secure networking

In this step, you set up secure networking before processing and exporting logs to Splunk Enterprise.

  1. Create a VPC network and subnet:

    gcloud compute networks create export-network --subnet-mode=custom
    gcloud compute networks subnets create export-network-us-central \
         --network=export-network \
         --region=$REGION_ID \
         --range=192.168.1.0/24
    
  2. Create a firewall rule for Dataflow worker virtual machines (VMs) to communicate with one another:

    gcloud compute firewall-rules create allow-internal-dataflow \
         --network=export-network \
         --action=allow \
         --direction=ingress \
         --target-tags=dataflow \
         --source-tags=dataflow \
         --priority=0 \
         --rules=tcp:12345-12346
    

    This rule allows internal traffic between Dataflow VMs which use TCP ports 12345-12346 and have the tag dataflow set by the Dataflow service.

  3. Create a Cloud NAT gateway:

    gcloud compute routers create nat-router \
           --network=export-network \
           --region=$REGION_ID
    
    gcloud compute routers nats create nat-config \
       --router=nat-router \
       --nat-custom-subnet-ip-ranges=export-network-us-central \
       --auto-allocate-nat-external-ips \
       --region=$REGION_ID
    

     For security purposes, you deploy Dataflow pipeline worker VMs without public IP addresses. To allow the Dataflow worker VMs to reach the external Splunk HEC service, the preceding command configures a Cloud NAT gateway mapped to the subnet for the Dataflow VMs, in this case export-network-us-central. This configuration lets the Dataflow worker VMs access the internet and make HTTPS requests to Splunk without requiring an external IP address on each worker VM.

    The Cloud NAT gateway automatically allocates IP addresses depending on the number of Dataflow VMs in use.

     If you want to restrict traffic into Splunk HEC to a subset of known IP addresses, you can reserve static IP addresses and manually assign them to the Cloud NAT gateway. However, doing so is outside the scope of this tutorial.

    For more information, see Cloud NAT IP addresses and Cloud NAT port reservation documentation.

  4. Enable Private Google Access:

     gcloud compute networks subnets update export-network-us-central \
         --enable-private-ip-google-access \
         --region=$REGION_ID
    

    Private Google Access is automatically enabled when you create a Cloud NAT gateway. However, to allow Dataflow workers with private IP addresses to access the external IP addresses that Google Cloud APIs and services use, you must also manually enable Private Google Access for the subnet.
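
     To confirm the setting, you can optionally describe the subnet and check that privateIpGoogleAccess is True. This is a quick verification sketch, not part of the required steps:

     gcloud compute networks subnets describe export-network-us-central \
         --region=$REGION_ID \
         --format="value(privateIpGoogleAccess)"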

Creating a log sink

In this section, you create the organization-wide log sink and its Pub/Sub destination, along with the necessary permissions.

  1. In Cloud Shell, create a Pub/Sub topic and associated subscription as your new log sink destination:

    gcloud pubsub topics create org-logs-all
    gcloud pubsub subscriptions create \
        --topic org-logs-all org-logs-all-sub
    
  2. Create the organization log sink:

    gcloud logging sinks create org-logs-all-sink \
      pubsub.googleapis.com/projects/$PROJECT_ID/topics/org-logs-all \
      --organization=$ORG_ID \
      --include-children \
      --log-filter="NOT logName:projects/${PROJECT_ID}/logs/dataflow.googleapis.com"
    

     The command uses the following options:

    • The --organization option specifies that this is an organization-level log sink.
     • The --include-children option is required so that the organization-level log sink includes all logs across all subfolders and projects.
     • The --log-filter option specifies the logs to be routed. In this example, you exclude Dataflow operations logs for the project $PROJECT_ID, because the log export Dataflow pipeline itself generates logs as it processes logs. The filter prevents the pipeline from exporting its own logs, which would otherwise create a potentially exponential cycle.

      The output includes a service account in the form of o#####-####@gcp-sa-logging.iam.gserviceaccount.com.

  3. Save the log sink service account in the LOG_SINK_SA variable, replacing [MY_SA] with the service account name from the output:

     export LOG_SINK_SA=[MY_SA]@gcp-sa-logging.iam.gserviceaccount.com
    
  4. Give permissions to the log sink service account:

    gcloud pubsub topics add-iam-policy-binding org-logs-all \
        --member=serviceAccount:$LOG_SINK_SA \
        --role=roles/pubsub.publisher
    

    The command grants the Pub/Sub Publisher IAM role to the log sink service account on the Pub/Sub topic org-logs-all, enabling the log sink service account to publish messages on the topic.
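
     Optionally, you can verify the new binding by inspecting the topic's IAM policy. This is a verification sketch; the output should list roles/pubsub.publisher with the log sink service account as a member:

     gcloud pubsub topics get-iam-policy org-logs-all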

Setting up a Splunk HEC endpoint

In this step, you set up a Splunk HEC endpoint and store the newly created HEC token as a secret in Secret Manager. When you deploy the Splunk Dataflow pipeline, you need to supply both the endpoint URL and the token.

Configure Splunk HEC

  1. If you don't already have a Splunk HEC endpoint, see the Splunk documentation to learn how to configure Splunk HEC. Splunk HEC can be running on the Splunk Cloud Platform service or on your own Splunk Enterprise instance.
  2. In your Cloud Shell session, after a Splunk HEC token is created, copy the token value.
  3. Save the token value in a temporary file named splunk-hec-token-plaintext.txt.
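
     For example, assuming you have the token value copied, you can write the file directly from Cloud Shell. This is a sketch; the -n flag avoids storing a trailing newline with the secret, and YOUR_SPLUNK_HEC_TOKEN is a placeholder for your actual token value:

     echo -n "YOUR_SPLUNK_HEC_TOKEN" > splunk-hec-token-plaintext.txt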

Store Splunk HEC token in Secret Manager

When you deploy the Splunk Dataflow pipeline, you can pass the token value as plaintext, as ciphertext encrypted with a Cloud KMS key, or as a secret version encrypted and managed by Secret Manager. In this tutorial, you use the Secret Manager option because it offers the least complex and most efficient way to protect your Splunk HEC token. This option also prevents the Splunk HEC token from being leaked through the Dataflow console or the job details.

A secret in Secret Manager contains a collection of secret versions which themselves store the actual secret data, such as the Splunk HEC token. In this section, you create a secret and a single underlying secret version in which to store the Splunk HEC token value. If you later choose to rotate your Splunk HEC token, you can add the new token as a new secret version to this secret. For general information on the rotation of secrets, see About rotation schedules.

  1. In Cloud Shell, create a secret to contain your Splunk HEC token:

    gcloud secrets create hec-token \
      --replication-policy="automatic"
    

    For more information on the replication policies for secrets, see Choose a replication policy.

  2. Add the token as a secret version using the contents of the file splunk-hec-token-plaintext.txt:

    gcloud secrets versions add hec-token \
      --data-file="./splunk-hec-token-plaintext.txt"
    
  3. Delete the splunk-hec-token-plaintext.txt file as it is no longer needed.
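
     A simple way to remove the temporary file from Cloud Shell:

     rm ./splunk-hec-token-plaintext.txt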

Grant access to Splunk HEC token's secret

Dataflow pipeline workers use the Dataflow worker service account to access resources and execute operations. To allow the Dataflow pipeline workers to access the secret that contains the Splunk HEC token, you need to grant the Dataflow worker service account the Secret Manager Secret Accessor role (roles/secretmanager.secretAccessor) on the secret.

The Dataflow pipeline workers are Compute Engine instances, and by default, use your project's Compute Engine default service account as the worker service account: <project-number>-compute@developer.gserviceaccount.com. The Compute Engine default service account is created automatically when you enable the Compute Engine API for your project.
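
If you don't have your project number on hand, one way to look it up is to describe the project from Cloud Shell. This is an optional helper; the PROJECT_NUMBER variable is introduced here only for convenience and isn't used elsewhere in the tutorial:

    export PROJECT_NUMBER=$(gcloud projects describe $PROJECT_ID \
        --format="value(projectNumber)")
    echo $PROJECT_NUMBER

You can then substitute this value for PROJECT-NUMBER in the following command.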

  • In Cloud Shell, add the following IAM policy binding to grant access to the Compute Engine default service account:

    gcloud secrets add-iam-policy-binding hec-token \
      --member="serviceAccount:PROJECT-NUMBER-compute@developer.gserviceaccount.com" \
      --role="roles/secretmanager.secretAccessor"
    

    Replace PROJECT-NUMBER with your project number.

Planning Dataflow pipeline capacity

Before you deploy the Dataflow pipeline, you need to determine its maximum size and throughput. Determining these values ensures that the pipeline can handle peak daily log volume (GB/day) and log message rate (events per second, or EPS) from the upstream Pub/Sub subscription without incurring either of the following:

  • Delays due to either message backlog or message throttling.
  • Extra costs from overprovisioning a pipeline (for more details, see the note at the end of this section).

The example values in this tutorial are based on an organization with the following characteristics:

  • Generates 1 TB of logs daily.
  • Has an average message size of 1 KB.
  • Has a sustained peak message rate that is two times the average rate.

You can substitute the example values with values from your organization as you work through the steps in Set maximum pipeline size and Set rate-controlling parameters.

Set maximum pipeline size

  1. Determine the average EPS using the following formula:

    \( {AverageEventsPerSecond}\simeq\frac{TotalDailyLogsInTB}{AverageMessageSizeInKB}\times\frac{10^9}{24\times3600} \)

    In this example, the average rate of generated logs is 11.5k EPS.

  2. Determine sustained peak EPS using the following formula, where the multiplier N represents the bursty nature of logging. In this example, N=2, so the peak rate of generated logs is 23k EPS.

    \( {PeakEventsPerSecond = N \times\ AverageEventsPerSecond} \)

  3. After you calculate the maximum EPS, you can use the following sizing guidelines to determine the maximum required number of vCPUs. You can also use this number to calculate the maximum number of Dataflow workers, or maxNumWorkers, assuming n1-standard-4 machine type.

     \( maxCPUs = \lceil PeakEventsPerSecond / 3k \rceil \\ maxNumWorkers = \lceil maxCPUs / 4 \rceil \)

    In this example, you need a maximum of ⌈23 / 3⌉ = 8 vCPU cores, which is a maximum of 2 VM workers of default machine type n1-standard-4.

  4. In Cloud Shell, set the pipeline size using the following environment variables:

    export DATAFLOW_MACHINE_TYPE="n1-standard-4"
    export DATAFLOW_MACHINE_COUNT=2
    
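
If you want to recompute these values for your own log volume, the arithmetic above is straightforward to script. The following is a minimal sketch using the example inputs from this tutorial; all variable names are illustrative, and the constants (3k EPS per vCPU, 4 vCPUs per n1-standard-4 worker) come from the sizing guidelines above:

    TOTAL_DAILY_LOGS_TB=1     # daily log volume in TB
    AVG_MESSAGE_SIZE_KB=1     # average message size in KB
    PEAK_MULTIPLIER=2         # N: ratio of peak rate to average rate

    AVG_EPS=$(( TOTAL_DAILY_LOGS_TB * 1000000000 / AVG_MESSAGE_SIZE_KB / 86400 ))
    PEAK_EPS=$(( AVG_EPS * PEAK_MULTIPLIER ))
    MAX_CPUS=$(( (PEAK_EPS + 2999) / 3000 ))   # ceil(peak EPS / 3k EPS per vCPU)
    MAX_WORKERS=$(( (MAX_CPUS + 3) / 4 ))      # ceil(vCPUs / 4 vCPUs per worker)
    echo "avg=${AVG_EPS} EPS, peak=${PEAK_EPS} EPS, vCPUs=${MAX_CPUS}, workers=${MAX_WORKERS}"

With the example values, this prints roughly 11.5k average EPS, 23k peak EPS, 8 vCPUs, and 2 workers, matching the numbers used above.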

Set rate-controlling parameters

The Splunk Dataflow pipeline has rate-controlling parameters that tune its output EPS rate and prevent the downstream Splunk HEC endpoint from being overloaded.

  1. Maximize the EPS rate by determining the total number of parallel connections to Splunk HEC across all VM workers using the following guideline:

    \( {parallelism = maxCPUs * 2} \)

    Override the parallelism setting to account for 2-4 parallel connections per vCPU, with the maximum number of workers deployed. The default parallelism value of 1 disables parallelism, artificially limiting the output rate.

    In this example, the number of parallel connections is calculated to be 2 x 8 = 16.

  2. To increase EPS and reduce load on Splunk HEC, use event batching:

     \( batchCount \geq 10 \)

    With an average log message around 1 KB, we recommend that you batch at least 10 events per request. Setting this minimum number of events helps avoid excessive load on Splunk HEC, while still increasing the effective EPS rate.

  3. In Cloud Shell, set the following environment variables for rate controls using the calculated values for parallelism and batchCount:

    export DATAFLOW_PARALLELISM=16
    export DATAFLOW_BATCH_COUNT=10
    

Summary of pipeline capacity parameters

The following table summarizes the pipeline capacity values used for the next steps of this tutorial along with recommended general best practices for configuring these job parameters.

| Parameter | Tutorial value | General best practice |
|---|---|---|
| DATAFLOW_MACHINE_TYPE | n1-standard-4 | Set to the baseline machine size n1-standard-4 for the best performance-to-cost ratio |
| DATAFLOW_MACHINE_COUNT | 2 | Set to the number of workers maxNumWorkers needed to handle the expected peak EPS, as calculated above |
| DATAFLOW_PARALLELISM | 16 | Set to 2 x vCPUs/worker x maxNumWorkers to maximize the number of parallel HEC connections |
| DATAFLOW_BATCH_COUNT | 10 | Set to 10-50 events/request for logs, provided the maximum buffering delay (two seconds) is acceptable |

An autoscaling pipeline deploys one data persistent disk (by default 400 GB) for each potential streaming worker, assuming the maximum number of workers, or maxNumWorkers. These disks are mounted among the running workers at any point in time, including startup.

Because each worker instance is limited to 15 persistent disks, the minimum number of starting workers is ⌈maxNumWorkers/15⌉. So, if the default value is maxNumWorkers=20, the pipeline usage (and cost) is as follows:

  • Storage: static with 20 persistent disks.
  • Compute: dynamic with minimum of 2 worker instances (⌈20/15⌉ = 2), and a maximum of 20.

This is the equivalent of 8 TB of persistent disk, which could incur unnecessary cost if the disks are not fully used, especially if only one or two workers are running the majority of the time.

Exporting logs using Dataflow pipeline

In this section, you deploy the Dataflow pipeline that delivers Google Cloud log messages to Splunk HEC. You also deploy dependent resources such as unprocessed topics (also known as dead-letter topics) and subscriptions to hold any undeliverable messages.

Deploy the Dataflow pipeline

  1. In Cloud Shell, create a Pub/Sub topic and subscription to be used as an unprocessed subscription:

     gcloud pubsub topics create org-logs-all-dl
     gcloud pubsub subscriptions create --topic org-logs-all-dl org-logs-all-dl-sub
    
  2. In Cloud Shell, set the following environment variables to configure the template parameters:

    # Splunk HEC endpoint values
    export SPLUNK_HEC_URL=YOUR_SPLUNK_HEC_URL
    # Dataflow pipeline input subscription and dead-letter topic
    export DATAFLOW_INPUT_SUB="org-logs-all-sub"
    export DATAFLOW_DEADLETTER_TOPIC="org-logs-all-dl"
    

    Replace YOUR_SPLUNK_HEC_URL with your Splunk HEC URL using the form protocol://host[:port], where:

    • protocol is either http or https.
    • host is the fully qualified domain name (FQDN) or IP address of either your Splunk HEC instance, or, if you have multiple HEC instances, the associated HTTP(S) (or DNS-based) load balancer.
    • port is the HEC port number. It is optional, and depends on your Splunk HEC endpoint configuration.

     YOUR_SPLUNK_HEC_URL must not include the HEC endpoint path, for example, /services/collector. The Splunk Dataflow template currently supports only the /services/collector endpoint for JSON-formatted events, and it automatically appends that path to your Splunk HEC URL input. To learn more about that HEC endpoint, see the Splunk documentation for the services/collector endpoint.

     An example of a valid Splunk HEC URL input is https://splunk-hec.example.com:8088. If you are sending data to HEC on Splunk Cloud Platform, see Send data to HEC on Splunk Cloud to determine the host and port portions of your specific Splunk HEC URL.

  3. Deploy the Dataflow pipeline:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    url=${SPLUNK_HEC_URL},\
    tokenSource=SECRET_MANAGER,\
    tokenSecretId=projects/${PROJECT_ID}/secrets/hec-token/versions/1,\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://splk-public/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    Copy the new job ID returned in the output.

     By default, the Splunk Dataflow pipeline validates the SSL certificate for your Splunk HEC endpoint. If you want to use self-signed certificates for development and testing, you must disable SSL validation. For more information, see the Pub/Sub to Splunk Dataflow template parameters (disableCertificateValidation).

  4. Save the new job ID in the DATAFLOW_JOB_ID environment variable. You use this variable in a later step.

    export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"
    
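
     If you didn't copy the job ID from the command output, you can usually retrieve it by listing Dataflow jobs and filtering on the job name. This is a sketch; the filter and format field names are assumptions about the gcloud output schema:

     gcloud dataflow jobs list \
         --region=$REGION_ID \
         --filter="name=${JOB_NAME}" \
         --format="value(id)"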

View logs in Splunk

It should take no more than a few minutes for the Dataflow pipeline workers to be provisioned and ready to deliver logs to Splunk HEC. You can confirm that logs are properly received and indexed in the Splunk Enterprise or Splunk Cloud Platform search interface. To see the number of logs per type of monitored resource:

  1. In Splunk, open Splunk Search & Reporting.
  2. Run the search index=[MY_INDEX] | stats count by resource.type, where MY_INDEX is the index configured for your Splunk HEC token.

    View logs in Splunk.

  3. If you don't see any events, see Handling delivery failures.

Transforming events in-flight with UDF

The Splunk Dataflow template supports a user-defined function (UDF) for custom event transformation. The pipeline you deployed uses a sample UDF, specified by the optional parameters javascriptTextTransformGcsPath and javascriptTextTransformFunctionName. The sample UDF includes code examples for event enrichment, such as adding new fields or setting Splunk HEC metadata on a per-event basis. The sample UDF also includes decoding logic to replay failed deliveries, which you use later in Handling delivery failures.

In this section, you edit the sample UDF function to add a new event field. This new field specifies the value of the originating Pub/Sub subscription as additional contextual information.

Modify the sample UDF

  1. In Cloud Shell, download the JavaScript file that contains the sample UDF function:

    wget https://storage.googleapis.com/splk-public/js/dataflow_udf_messages_replay.js
    
  2. Open the JavaScript file in an editor of your choice. Uncomment the line that adds a new field inputSubscription to the event payload:

    // event.inputSubscription = "splunk-dataflow-pipeline";
    
  3. Set the new event field inputSubscription to "org-logs-all-sub" to track the input Pub/Sub subscription where the event came from:

    event.inputSubscription = "org-logs-all-sub";
    
  4. Save the file.

  5. In Cloud Shell, create a new Cloud Storage bucket:

    # Create a new Cloud Storage bucket
    gsutil mb -b on gs://${PROJECT_ID}-dataflow/
    
  6. Upload the file to the Cloud Storage bucket:

    # Upload JavaScript file
    gsutil cp ./dataflow_udf_messages_replay.js gs://${PROJECT_ID}-dataflow/js/
    
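
If you prefer to make the UDF edit from steps 2 through 4 non-interactively, a sed one-liner like the following could work before you upload the file. This is only a sketch; it assumes the commented line appears in the downloaded file exactly as shown above:

    sed -i 's|// event.inputSubscription = "splunk-dataflow-pipeline";|event.inputSubscription = "org-logs-all-sub";|' \
        ./dataflow_udf_messages_replay.js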

Update the Dataflow Pipeline with the new UDF

  1. In Cloud Shell, stop the pipeline by using the Drain option to ensure that the logs which were already pulled from Pub/Sub are not lost:

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    
  2. Deploy a new pipeline with the updated UDF:

    # Set Dataflow pipeline job name
    JOB_NAME=pubsub-to-splunk-`date +"%Y%m%d-%H%M%S"`
    # Run Dataflow pipeline job
    gcloud beta dataflow jobs run ${JOB_NAME} \
       --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Splunk \
       --worker-machine-type=$DATAFLOW_MACHINE_TYPE \
       --max-workers=$DATAFLOW_MACHINE_COUNT \
       --region=$REGION_ID \
       --network=export-network \
       --subnetwork=regions/$REGION_ID/subnetworks/export-network-us-central \
       --disable-public-ips \
       --parameters \
    inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_INPUT_SUB},\
    outputDeadletterTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_DEADLETTER_TOPIC},\
    url=${SPLUNK_HEC_URL},\
    tokenSource=SECRET_MANAGER,\
    tokenSecretId=projects/${PROJECT_ID}/secrets/hec-token/versions/1,\
    batchCount=${DATAFLOW_BATCH_COUNT},\
    parallelism=${DATAFLOW_PARALLELISM},\
    javascriptTextTransformGcsPath=gs://${PROJECT_ID}-dataflow/js/dataflow_udf_messages_replay.js,\
    javascriptTextTransformFunctionName=process
    

    Copy the new job ID returned in the output.

  3. Save the new job ID in the DATAFLOW_JOB_ID environment variable:

     export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"

Handling delivery failures

Delivery failures can happen due to errors in processing events or connecting to Splunk HEC. In this section, you introduce a delivery failure to demonstrate the error handling workflow. You also learn how to view and trigger the re-delivery of the failed messages to Splunk.

Error handling overview

The following diagram shows the error handling workflow in the Splunk Dataflow pipeline:

Log export to Splunk.

  1. The Pub/Sub to Splunk Dataflow pipeline (the main pipeline) automatically forwards undeliverable messages to the unprocessed topic for user investigation.
  2. The operator investigates the failed messages in the unprocessed subscription, troubleshoots, and fixes the root cause of the delivery failure, for example, fixing HEC token misconfiguration.
  3. The operator triggers a Pub/Sub to Pub/Sub Dataflow pipeline (the secondary pipeline). This pipeline (highlighted in the dotted section of the preceding diagram) is a temporary pipeline that moves the failed messages from the unprocessed subscription back to the original log sink topic.
  4. The main pipeline re-processes the previously failed messages. This step requires the pipeline to use the sample UDF for correct detection and decoding of failed message payloads. The following part of the function implements this conditional decoding logic, including a tally of delivery attempts for tracking purposes:

     // If the message has already been converted to a Splunk HEC object with a
     // stringified obj.event JSON payload, then it's a replay of a previously
     // failed delivery: unnest and parse obj.event, and drop previously injected
     // obj.attributes such as errorMessage and timestamp.
     if (obj.event) {
       try {
         event = JSON.parse(obj.event);
         redelivery = true;
       } catch(e) {
         event = obj;
       }
     } else {
       event = obj;
     }

     // Keep a tally of delivery attempts.
     event.delivery_attempt = event.delivery_attempt || 1;
     if (redelivery) {
       event.delivery_attempt += 1;
     }
    

Trigger delivery failures

In this section, you trigger delivery failures. You can manually introduce a delivery failure with either of the following methods:

  • Stopping the Splunk server (if it is a single instance) to cause connection errors.
  • Disabling the relevant HEC token from your Splunk input configuration.

Troubleshoot failed messages

To investigate a failed message, you can use the Google Cloud console:

  1. In the Google Cloud console, open the Pub/Sub Subscriptions page.

    Go to Pub/Sub Subscriptions

  2. Click the unprocessed subscription that you created. If you used the previous example, the subscription name is: projects/${PROJECT_ID}/subscriptions/org-logs-all-dl-sub.

  3. To open the messages viewer, click View Messages.

  4. To view messages, click Pull, making sure to leave Enable ack messages cleared.

  5. You can now inspect the failed messages, in particular:

    • The Splunk event payload under the Message body column.
    • The error message under the attribute.errorMessage column.
    • The error timestamp under the attribute.timestamp column.

The following screenshot is an example of a failed message that you encounter if the Splunk HEC endpoint is temporarily down or unreachable. Notice the errorMessage attribute: The target server failed to respond.

Failed messages attributes.

Delivery error types

The following table lists some possible Splunk delivery errors, along with the errorMessage attribute that the pipeline records with each message before forwarding these messages to the unprocessed topic:

| Delivery error type | Automatically retried by pipeline? | Example errorMessage attribute |
|---|---|---|
| Transient network error | Yes | Read timed out or Connection reset |
| Splunk server 5xx error | Yes | Splunk write status code: 503 |
| Splunk server 4xx error | No | Splunk write status code: 403 |
| Splunk server down | No | The target server failed to respond |
| Splunk SSL certificate invalid | No | Host name X does not match the certificate |
| UDF JavaScript syntax error | No | ReferenceError: foo is not defined |

In some cases, the pipeline automatically attempts retries with exponential backoff. Examples include Splunk server 5xx errors, which occur if the Splunk HEC endpoint is overloaded. Alternatively, there could be a persistent issue that prevents a message from being submitted to HEC. In this case, the pipeline does not attempt a retry. The following are examples of persistent issues:

  • A syntax error in the UDF function.
  • An invalid HEC token causing a Splunk server 4xx 'Forbidden' server response.

Replay failed messages

In this section, you replay the unprocessed messages, on the assumption that the root cause of the delivery failure has since been fixed. If you disabled the Splunk HEC endpoint in the Trigger delivery failures section, check that the Splunk HEC endpoint is now operating.

  1. In Cloud Shell, before re-processing the messages from the unprocessed subscription, we recommend that you take a snapshot of the unprocessed subscription. This prevents the loss of messages if there's an unexpected configuration error.

     gcloud pubsub snapshots create dlt-snapshot-`date +"%Y%m%d-%H%M%S"` \
         --subscription=org-logs-all-dl-sub
    
  2. Use the Pub/Sub to Pub/Sub Dataflow template to transfer the messages from the unprocessed subscription back to the input topic with another Dataflow job:

      DATAFLOW_INPUT_TOPIC="org-logs-all"
      DATAFLOW_DEADLETTER_SUB="org-logs-all-dl-sub"
    
      JOB_NAME=splunk-dataflow-replay-`date +"%Y%m%d-%H%M%S"`
      gcloud dataflow jobs run $JOB_NAME \
           --gcs-location=gs://dataflow-templates/latest/Cloud_PubSub_to_Cloud_PubSub \
           --worker-machine-type=n1-standard-2 \
           --max-workers=1 \
           --region=$REGION_ID \
           --parameters \
      inputSubscription=projects/${PROJECT_ID}/subscriptions/${DATAFLOW_DEADLETTER_SUB},\
      outputTopic=projects/${PROJECT_ID}/topics/${DATAFLOW_INPUT_TOPIC}
    

    Copy the Dataflow job ID that this command returns.

  3. Save the Dataflow job ID to the DATAFLOW_JOB_ID environment variable:

     export DATAFLOW_JOB_ID="YOUR_DATAFLOW_JOB_ID"

  4. In the Google Cloud console, go to the Pub/Sub Subscriptions page.

    Go to the Pub/Sub Subscriptions page

  5. Select the unprocessed subscription. Confirm that the Unacked message count is down to 0.

    Failed messages.

  6. In Cloud Shell, drain the Dataflow job that you created:

    gcloud dataflow jobs drain $DATAFLOW_JOB_ID --region=$REGION_ID
    

    When messages are transferred back to the original input topic, the main Dataflow pipeline automatically picks up the failed messages and re-delivers them to Splunk.
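
     If the replay doesn't go as expected, you can recover by seeking the unprocessed subscription back to the snapshot that you created in step 1, which restores its acknowledgment state. This is a sketch; replace SNAPSHOT_NAME with the snapshot name returned by the earlier command:

     gcloud pubsub subscriptions seek org-logs-all-dl-sub \
         --snapshot=SNAPSHOT_NAME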

Confirm messages in Splunk

  1. To confirm that the messages have been re-delivered, in Splunk, open Splunk Search & Reporting.

  2. Run a search for delivery_attempt > 1. This is a special field that the sample UDF adds to each event to track the number of delivery attempts. Make sure to expand the search time range to include events that might have occurred in the past, because the event timestamp is the original time of creation, not the time of indexing.

In the following example image, the two messages that originally failed are now successfully delivered and indexed in Splunk with the correct timestamp from a few days ago. Notice that the insertId field value is the same as the value found when inspecting the failed messages by manually pulling from the unprocessed subscription. insertId is a unique identifier for the original log entry that Cloud Logging assigns.

Failed messages in Splunk.

Clean up

To avoid incurring charges to your Google Cloud account for the resources used in this tutorial, either delete the project that contains the resources, or keep the project and delete the individual resources.

Delete the organization-level sink

gcloud logging sinks delete org-logs-all-sink --organization=$ORG_ID

Delete the project

With the log sink deleted, you can proceed with deleting resources created to receive and export logs. The easiest way is to delete the project you created for the tutorial.

  1. In the Google Cloud console, go to the Manage resources page.

    Go to Manage resources

  2. In the project list, select the project that you want to delete, and then click Delete.
  3. In the dialog, type the project ID, and then click Shut down to delete the project.
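
Alternatively, you can delete the project from Cloud Shell. This permanently deletes the project and every resource in it, so double-check the project ID before you run the command:

    gcloud projects delete $PROJECT_ID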

What's next