Modèle Pub/Sub vers Splunk

Le modèle Pub/Sub vers Splunk est un pipeline de streaming qui lit les messages d'un abonnement Pub/Sub et écrit leur charge utile dans Splunk à l'aide de la solution HEC (HTTP Event Collector) de Splunk. Le cas d'utilisation le plus courant de ce modèle est l'exportation de journaux vers Splunk. Pour découvrir un exemple du workflow sous-jacent, consultez la section Déployer des exportations de journaux prêtes pour la production vers Splunk à l'aide de Dataflow.

Avant d'écrire vers Splunk, vous pouvez également appliquer une fonction JavaScript définie par l'utilisateur vers la charge utile du message. Tous les messages dont le traitement échoue sont transférés vers un sujet Pub/Sub non traité en vue d'opérations de dépannage supplémentaires et d'un nouveau traitement.

Pour ajouter une couche de protection à votre jeton HEC, vous pouvez également transmettre une clé Cloud KMS ainsi que le paramètre de jeton HEC encodé en base64 chiffré avec cette clé. Pour en savoir plus sur le chiffrement des paramètres du jeton HEC, consultez la page sur le point de terminaison du chiffrement de l'API Cloud KMS.

Conditions requises pour ce pipeline

  • L'abonnement Pub/Sub source doit exister avant l'exécution du pipeline.
  • Le sujet Pub/Sub non traité doit exister avant l'exécution du pipeline.
  • Le point de terminaison Splunk HEC doit être accessible à partir du réseau de nœuds de calcul Dataflow.
  • Le jeton de la solution HEC de Splunk doit être généré et disponible.

Paramètres de modèle

Paramètres obligatoires

  • inputSubscription: abonnement Pub/Sub à partir duquel lire l'entrée. Exemple :projects/your-project-id/subscriptions/your-subscription-name
  • url: URL HEC Splunk. L'URL doit être routable depuis le VPC dans lequel le pipeline est exécuté Exemple :https://splunk-hec-host:8088
  • outputDeadletterTopic: sujet Pub/Sub auquel transférer les messages non distribuables. Exemple :projects/<PROJECT_ID>/topics/<TOPIC_NAME>

Paramètres facultatifs

  • token: jeton d'authentification HEC Splunk. Doit être spécifié si le paramètre tokenSource est défini sur PLAINTEXT ou KMS.
  • batchCount: taille de lot pour l'envoi de plusieurs événements vers Splunk. La valeur par défaut est 1 (pas de traitement par lots).
  • disableCertificateValidation: désactive la validation du certificat SSL. false par défaut (validation activée). Si la valeur est true, les certificats ne sont pas validés (tous les certificats sont approuvés) et le paramètre rootCaCertificatePath est ignoré.
  • parallelism: nombre maximal de requêtes en parallèle. La valeur par défaut est 1 (aucun parallélisme).
  • includePubsubMessage: inclut le message Pub/Sub complet dans la charge utile. Valeur par défaut : false (seul l'élément de données est inclus dans la charge utile).
  • tokenKMSEncryptionKey: clé Cloud KMS à utiliser pour déchiffrer la chaîne du jeton HEC. Ce paramètre doit être fourni lorsque tokenSource est défini sur KMS. Si la clé Cloud KMS est fournie, la chaîne du jeton HEC doit être transmise sous forme chiffrée. Exemple :projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
  • tokenSecretId: ID du secret Secret Manager pour le jeton. Ce paramètre doit être fourni lorsque tokenSource est défini sur SECRET_MANAGER. Par exemple, projects/your-project-id/secrets/your-secret/versions/your-secret-version.
  • tokenSource: source du jeton. Les valeurs suivantes sont autorisées : PLAINTEXT, KMS et SECRET_MANAGER. Vous devez fournir ce paramètre lorsque Secret Manager est utilisé. Si tokenSource est défini sur KMS, tokenKMSEncryptionKey et un token chiffré doivent être spécifiés. Si tokenSource est défini sur SECRET_MANAGER, tokenSecretId doit être fourni. Si tokenSource est défini sur PLAINTEXT, token doit être fourni.
  • rootCaCertificatePath: URL complète du certificat CA racine dans Cloud Storage. Le certificat fourni dans Cloud Storage doit être encodé au format DER et peut être fourni en encodage binaire ou imprimable (base64). Si le certificat est fourni avec un encodage en base64, il doit être délimité par "------BEGIN CERTIFICATE-----" au début et par "-----END CERTIFICATE-----" à la fin. Si ce paramètre est fourni, ce fichier de certificat CA privé est extrait et ajouté au trust store du nœud de calcul Dataflow pour vérifier le certificat SSL du point de terminaison HEC de Splunk. Si ce paramètre n'est pas fourni, le trust store par défaut est utilisé. Exemple :gs://mybucket/mycerts/privateCA.crt
  • enableBatchLogs: spécifie si les journaux doivent être activés pour les lots écrits dans Splunk. Valeur par défaut : true.
  • enableGzipHttpCompression: indique si les requêtes HTTP envoyées à la solution HEC de Splunk doivent être compressées (codage de contenu gzip). Valeur par défaut : true.
  • javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js qui définit la fonction JavaScript définie par l'utilisateur (UDF) à utiliser. Exemple :gs://my-bucket/my-udfs/my_file.js
  • javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur à utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples.
  • javascriptTextTransformReloadIntervalMinutes: définissez l'intervalle que les nœuds de calcul peuvent vérifier pour les modifications des UDF JavaScript pour actualiser les fichiers. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Vous pouvez éventuellement étendre ce modèle en écrivant une fonction définie par l'utilisateur (UDF). Le modèle appelle l'UDF pour chaque élément d'entrée. Les charges utiles des éléments sont sérialisées sous forme de chaînes JSON. Pour en savoir plus, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : champ de données du message Pub/Sub, sérialisé en tant que chaîne JSON.
  • Sortie : données d'événement à envoyer au point de terminaison des événements HEC Splunk. Le résultat doit être une chaîne ou un objet JSON concaténé.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Pub/Sub to Splunk template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Facultatif : Pour passer du traitement de type "exactement une fois" au mode de traitement en flux continu de type "au moins une fois", sélectionnez Au moins une fois.
  8. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Splunk \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
token=TOKEN,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM,\
disableCertificateValidation=DISABLE_VALIDATION,\
rootCaCertificatePath=ROOT_CA_CERTIFICATE_PATH

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
  • ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Splunk
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "token": "TOKEN",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM",
       "disableCertificateValidation": "DISABLE_VALIDATION",
       "rootCaCertificatePath": "ROOT_CA_CERTIFICATE_PATH"
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME : nom de l'abonnement Pub/Sub
  • TOKEN : jeton HTTP Event Collector de Splunk
  • URL : chemin d'URL du jeton HTTP Event Collector de Splunk (par exemple, https://splunk-hec-host:8088)
  • DEADLETTER_TOPIC_NAME : nom du sujet Pub/Sub
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • BATCH_COUNT : taille de lot à utiliser pour envoyer plusieurs événements vers Splunk
  • PARALLELISM : nombre de requêtes parallèles à utiliser pour envoyer des événements vers Splunk
  • DISABLE_VALIDATION : true si vous souhaitez désactiver la validation du certificat SSL
  • ROOT_CA_CERTIFICATE_PATH : chemin d'accès au certificat racine de l'autorité de certification dans Cloud Storage (par exemple, gs://your-bucket/privateCA.crt)
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.splunk.SplunkEvent;
import com.google.cloud.teleport.splunk.SplunkEventCoder;
import com.google.cloud.teleport.splunk.SplunkIO;
import com.google.cloud.teleport.splunk.SplunkWriteError;
import com.google.cloud.teleport.templates.PubSubToSplunk.PubSubToSplunkOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.templates.common.SplunkConverters;
import com.google.cloud.teleport.templates.common.SplunkConverters.SplunkOptions;
import com.google.cloud.teleport.util.TokenNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToSplunk} pipeline is a streaming pipeline which ingests data from Cloud
 * Pub/Sub, executes a UDF, converts the output to {@link SplunkEvent}s and writes those records
 * into Splunk's HEC endpoint. Any errors which occur in the execution of the UDF, conversion to
 * {@link SplunkEvent} or writing to HEC will be streamed into a Pub/Sub topic.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The source Pub/Sub subscription exists.
 *   <li>HEC end-point is routable from the VPC where the Dataflow job executes.
 *   <li>Deadletter topic exists.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Splunk.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_Splunk",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Splunk",
    description = {
      "The Pub/Sub to Splunk template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Splunk via Splunk's HTTP Event Collector (HEC). The most common use case of this template is to export logs to Splunk. "
          + "To see an example of the underlying workflow, see <a href=\"https://cloud.google.com/architecture/deploying-production-ready-log-exports-to-splunk-using-dataflow\">Deploying production-ready log exports to Splunk using Dataflow</a>.\n",
      "Before writing to Splunk, you can also apply a JavaScript user-defined function to the message payload. "
          + "Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.\n",
      "As an extra layer of protection for your HEC token, you can also pass in a Cloud KMS key along with the base64-encoded HEC token parameter encrypted with the Cloud KMS key. "
          + "See the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a> for additional details on encrypting your HEC token parameter."
    },
    optionsClass = PubSubToSplunkOptions.class,
    optionsOrder = {
      PubsubReadSubscriptionOptions.class,
      SplunkOptions.class,
      JavascriptTextTransformerOptions.class,
      PubsubWriteDeadletterTopicOptions.class
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-splunk",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The source Pub/Sub subscription must exist prior to running the pipeline.",
      "The Pub/Sub unprocessed topic must exist prior to running the pipeline.",
      "The Splunk HEC endpoint must be accessible from the Dataflow workers' network.",
      "The Splunk HEC token must be generated and available."
    },
    streaming = true,
    supportsAtLeastOnce = true,
    supportsExactlyOnce = true)
public class PubSubToSplunk {

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** Counter to track inbound messages from source. */
  private static final Counter INPUT_MESSAGES_COUNTER =
      Metrics.counter(PubSubToSplunk.class, "inbound-pubsub-messages");

  /** The tag for successful {@link SplunkEvent} conversion. */
  private static final TupleTag<SplunkEvent> SPLUNK_EVENT_OUT = new TupleTag<SplunkEvent>() {};

  /** The tag for failed {@link SplunkEvent} conversion. */
  private static final TupleTag<FailsafeElement<String, String>> SPLUNK_EVENT_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** GSON to process a {@link PubsubMessage}. */
  private static final Gson GSON = new Gson();

  /** Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToSplunk.class);

  private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = false;

  @VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
  @VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
  private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for it's execution to finish. If blocking execution is required, use the {@link
   * PubSubToSplunk#run(PubSubToSplunkOptions)} method to start the pipeline and invoke {@code
   * result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) {

    PubSubToSplunkOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToSplunkOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubSubToSplunkOptions options) {

    Pipeline pipeline = Pipeline.create(options);

    // Register coders.
    CoderRegistry registry = pipeline.getCoderRegistry();
    registry.registerCoderForClass(SplunkEvent.class, SplunkEventCoder.of());
    registry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    /*
     * Steps:
     *  1) Read messages in from Pub/Sub
     *  2) Convert message to FailsafeElement for processing.
     *  3) Apply user provided UDF (if any) on the input strings.
     *  4) Convert successfully transformed messages into SplunkEvent objects
     *  5) Write SplunkEvents to Splunk's HEC end point.
     *  5a) Wrap write failures into a FailsafeElement.
     *  6) Collect errors from UDF transform (#3), SplunkEvent transform (#4)
     *     and writing to Splunk HEC (#5) and stream into a Pub/Sub deadletter topic.
     */

    // 1) Read messages in from Pub/Sub
    PCollection<String> stringMessages =
        pipeline.apply(
            "ReadMessages",
            new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));

    // 2) Convert message to FailsafeElement for processing.
    PCollectionTuple transformedOutput =
        stringMessages
            .apply(
                "ConvertToFailsafeElement",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(input -> FailsafeElement.of(input, input)))

            // 3) Apply user provided UDF (if any) on the input strings.
            .apply(
                "ApplyUDFTransformation",
                FailsafeJavascriptUdf.<String>newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
                    .setSuccessTag(UDF_OUT)
                    .setFailureTag(UDF_DEADLETTER_OUT)
                    .build());

    // 4) Convert successfully transformed messages into SplunkEvent objects
    PCollectionTuple convertToEventTuple =
        transformedOutput
            .get(UDF_OUT)
            .apply(
                "ConvertToSplunkEvent",
                SplunkConverters.failsafeStringToSplunkEvent(
                    SPLUNK_EVENT_OUT, SPLUNK_EVENT_DEADLETTER_OUT));

    // 5) Write SplunkEvents to Splunk's HEC end point.
    PCollection<SplunkWriteError> writeErrors =
        convertToEventTuple
            .get(SPLUNK_EVENT_OUT)
            .apply(
                "WriteToSplunk",
                SplunkIO.writeBuilder()
                    .withToken(
                        new TokenNestedValueProvider(
                            options.getTokenSecretId(),
                            options.getTokenKMSEncryptionKey(),
                            options.getToken(),
                            options.getTokenSource()))
                    .withUrl(options.getUrl())
                    .withBatchCount(options.getBatchCount())
                    .withParallelism(options.getParallelism())
                    .withDisableCertificateValidation(options.getDisableCertificateValidation())
                    .withRootCaCertificatePath(options.getRootCaCertificatePath())
                    .withEnableBatchLogs(options.getEnableBatchLogs())
                    .withEnableGzipHttpCompression(options.getEnableGzipHttpCompression())
                    .build());

    // 5a) Wrap write failures into a FailsafeElement.
    PCollection<FailsafeElement<String, String>> wrappedSplunkWriteErrors =
        writeErrors.apply(
            "WrapSplunkWriteErrors",
            ParDo.of(
                new DoFn<SplunkWriteError, FailsafeElement<String, String>>() {

                  @ProcessElement
                  public void processElement(ProcessContext context) {
                    SplunkWriteError error = context.element();
                    FailsafeElement<String, String> failsafeElement =
                        FailsafeElement.of(error.payload(), error.payload());

                    if (error.statusMessage() != null) {
                      failsafeElement.setErrorMessage(error.statusMessage());
                    }

                    if (error.statusCode() != null) {
                      failsafeElement.setErrorMessage(
                          String.format("Splunk write status code: %d", error.statusCode()));
                    }
                    context.output(failsafeElement);
                  }
                }));

    // 6) Collect errors from UDF transform (#4), SplunkEvent transform (#5)
    //     and writing to Splunk HEC (#6) and stream into a Pub/Sub deadletter topic.
    PCollectionList.of(
            ImmutableList.of(
                convertToEventTuple.get(SPLUNK_EVENT_DEADLETTER_OUT),
                wrappedSplunkWriteErrors,
                transformedOutput.get(UDF_DEADLETTER_OUT)))
        .apply("FlattenErrors", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
                .setErrorRecordsTopic(options.getOutputDeadletterTopic())
                .build());

    return pipeline.run();
  }

  /**
   * The {@link PubSubToSplunkOptions} class provides the custom options passed by the executor at
   * the command line.
   */
  public interface PubSubToSplunkOptions
      extends SplunkOptions,
          PubsubReadSubscriptionOptions,
          PubsubWriteDeadletterTopicOptions,
          JavascriptTextTransformerOptions {}

  /**
   * A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
   * returns a {@link PCollection} of {@link String} messages.
   */
  private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
    private final ValueProvider<String> subscriptionName;
    private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
    private Boolean includePubsubMessage;

    ReadMessages(
        ValueProvider<String> subscriptionName,
        ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
      this.subscriptionName = subscriptionName;
      this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
      return input
          .apply(
              "ReadPubsubMessage",
              PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
          .apply(
              "ExtractMessageIfRequired",
              ParDo.of(
                  new DoFn<PubsubMessage, String>() {

                    @Setup
                    public void setup() {
                      if (inputIncludePubsubMessageFlag != null) {
                        includePubsubMessage = inputIncludePubsubMessageFlag.get();
                      }
                      includePubsubMessage =
                          MoreObjects.firstNonNull(
                              includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
                      LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
                    }

                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      if (includePubsubMessage) {
                        context.output(formatPubsubMessage(context.element()));
                      } else {
                        context.output(
                            new String(context.element().getPayload(), StandardCharsets.UTF_8));
                      }
                    }
                  }))
          .apply(
              "CountMessages",
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      INPUT_MESSAGES_COUNTER.inc();
                      context.output(context.element());
                    }
                  }));
    }
  }

  /**
   * Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
   * to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
   *
   * @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
   * @return JSON String that adheres to the model defined in {@link
   *     com.google.pubsub.v1.PubsubMessage}
   */
  @VisibleForTesting
  protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
    JsonObject messageJson = new JsonObject();

    String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
    try {
      JsonObject data = GSON.fromJson(payload, JsonObject.class);
      messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
    } catch (JsonSyntaxException e) {
      messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
    }

    JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
    messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);

    if (pubsubMessage.getMessageId() != null) {
      messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
    }

    return messageJson.toString();
  }

  /**
   * Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
   *
   * @param attributesMap {@link Map} of Pub/Sub attributes
   * @return {@link JsonObject} of Pub/Sub attributes
   */
  private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
    JsonObject attributesJson = new JsonObject();
    for (String key : attributesMap.keySet()) {
      attributesJson.addProperty(key, attributesMap.get(key));
    }

    return attributesJson;
  }
}

Étape suivante