Vorlage "Pub/Sub für Datadog"

Die Vorlage "Pub/Sub für Datadog" ist eine Streamingpipeline, die Nachrichten aus einem Pub/Sub-Abo liest und die Nutzlast der Nachricht mithilfe von Datadog-Endpunkten in Datadog schreibt. Der häufigste Anwendungsfall für diese Vorlage ist das Exportieren von Logdateien in Datadog.

Vor dem Schreiben in Datadog können Sie eine benutzerdefinierte JavaScript-Funktion auf die Nachrichtennutzlast anwenden. Alle Nachrichten, bei denen Verarbeitungsfehler auftreten, werden zur weiteren Fehlerbehebung und erneuten Verarbeitung an ein unverarbeitetes Thema in Pub/Sub weitergeleitet.

Als zusätzlichen Schutz für Ihre API-Schlüssel und -Secrets können Sie auch einen Cloud KMS-Schlüssel zusammen mit dem base64-codierten API-Schlüsselparameter übergeben, der mit dem Cloud KMS-Schlüssel verschlüsselt ist. Weitere Informationen zum Verschlüsseln des API-Schlüsselparameters finden Sie unter Cloud KMS API-Verschlüsselungsendpunkt.

Pipelineanforderungen

  • Das als Quelle dienende Pub/Sub-Abo muss vorhanden sein, bevor Sie die Pipeline ausführen.
  • Das Pub/Sub-Thema für nicht verarbeitete Datensätze muss vorhanden sein, bevor die Pipeline ausgeführt wird.
  • Auf die Datadog-URL muss über das Netzwerk der Dataflow-Worker zugegriffen werden können.
  • Der Datadog API-Schlüssel muss generiert und verfügbar sein.

Vorlagenparameter

Erforderliche Parameter

  • inputSubscription: Das Pub/Sub-Abo, aus dem die Eingabe gelesen werden soll. Beispiel: projects/your-project-id/subscriptions/your-subscription-name.
  • url: Die URL der Datadog Logs API. Diese URL muss von der VPC, in der die Pipeline ausgeführt wird, weitergeleitet werden können. Weitere Informationen finden Sie in der Datadog-Dokumentation unter „Send logs“ (https://docs.datadoghq.com/api/latest/logs/#send-logs). Beispiel: https://http-intake.logs.datadoghq.com
  • outputDeadletterTopic: Das Pub/Sub-Thema als Weiterleitungsziel für nicht zustellbare Nachrichten. Beispiel: projects/<PROJECT_ID>/topics/<TOPIC_NAME>.

Optionale Parameter

  • apiKey: Der Datadog API-Schlüssel. Sie müssen diesen Wert angeben, wenn apiKeySource auf PLAINTEXT oder KMS festgelegt ist. Weitere Informationen finden Sie in der Datadog-Dokumentation unter „API- und Anwendungsschlüssel“ (https://docs.datadoghq.com/account_management/api-app-keys/).
  • batchCount: Die Batchgröße zum Senden mehrerer Ereignisse an Datadog. Die Standardeinstellung ist 1 (keine Batchverarbeitung).
  • parallelism: Die maximale Anzahl an parallelen Anfragen. Die Standardeinstellung ist 1 (keine Parallelität).
  • includePubsubMessage: Gibt an, ob die vollständige Pub/Sub-Nachricht in die Nutzlast aufgenommen werden soll. Der Standardwert ist true (alle Elemente, einschließlich des Datenelements, sind in der Nutzlast enthalten).
  • apiKeyKMSEncryptionKey: Der Cloud KMS-Schlüssel, der zum Entschlüsseln des API-Schlüssels verwendet werden soll. Sie müssen diesen Parameter angeben, wenn apiKeySource auf KMS festgelegt ist. Wenn der Cloud KMS-Schlüssel bereitgestellt wird, müssen Sie einen verschlüsselten API-Schlüssel übergeben. Beispiel: projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name
  • apiKeySecretId: Die geheime ID des Secret Manager für den API-Schlüssel. Sie müssen diesen Parameter angeben, wenn apiKeySource auf SECRET_MANAGER festgelegt ist. Beispiel: projects/your-project-id/secrets/your-secret/versions/your-secret-version
  • apiKeySource: Die Quelle des API-Schlüssels. Die folgenden Werte werden unterstützt: PLAINTEXT, KMS und SECRET_MANAGER. Sie müssen diesen Parameter angeben, wenn Sie Secret Manager verwenden. Wenn apiKeySource auf KMS gesetzt ist, müssen Sie auch apiKeyKMSEncryptionKey und verschlüsseltes API Key angeben. Wenn apiKeySource auf SECRET_MANAGER gesetzt ist, müssen Sie auch apiKeySecretId angeben. Wenn apiKeySource auf PLAINTEXT gesetzt ist, müssen Sie auch apiKey angeben.
  • javascriptTextTransformGcsPath: Der Cloud Storage-URI der .js-Datei, in der die zu verwendende benutzerdefinierte JavaScript-Funktion (UDF) definiert wird. Beispiel: gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Der Name der benutzerdefinierten JavaScript-Funktion (UDF), die verwendet werden soll. Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter „UDF-Beispiele“ (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Definieren Sie das Intervall, in dem die Worker möglicherweise nach JavaScript-UDF-Änderungen suchen, um die Dateien neu zu laden. Die Standardeinstellung ist 0.

Benutzerdefinierte Funktion

Optional können Sie diese Vorlage erweitern, indem Sie eine benutzerdefinierte Funktion (UDF) schreiben. Die Vorlage ruft die UDF für jedes Eingabeelement auf. Nutzlasten von Elementen werden als JSON-Strings serialisiert. Weitere Informationen finden Sie unter Benutzerdefinierte Funktionen für Dataflow-Vorlagen erstellen.

Funktionsspezifikation

UDFs haben die folgende Spezifikation:

  • Eingabe: Das Feld der Pub/Sub-Nachrichtendaten, das als JSON-String serialisiert ist.
  • Ausgabe: Die Ereignisdaten, die an den Datadog Log-Endpunkt gesendet werden sollen. Die Ausgabe muss ein String oder ein String-JSON-Objekt sein.

Führen Sie die Vorlage aus.

  1. Rufen Sie die Dataflow-Seite Job aus Vorlage erstellen auf.
  2. Zur Seite "Job aus Vorlage erstellen“
  3. Geben Sie im Feld Jobname einen eindeutigen Jobnamen ein.
  4. Optional: Wählen Sie für Regionaler Endpunkt einen Wert aus dem Drop-down-Menü aus. Die Standardregion ist us-central1.

    Eine Liste der Regionen, in denen Sie einen Dataflow-Job ausführen können, finden Sie unter Dataflow-Standorte.

  5. Wählen Sie im Drop-down-Menü Dataflow-Vorlage die Option the Pub/Sub to Datadog templateaus.
  6. Geben Sie Ihre Parameterwerte in die Parameterfelder ein.
  7. Klicken Sie auf Job ausführen.

Führen Sie die Vorlage in der Shell oder im Terminal aus:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_PubSub_to_Datadog \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
inputSubscription=projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME,\
apiKey=API_KEY,\
url=URL,\
outputDeadletterTopic=projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME,\
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
batchCount=BATCH_COUNT,\
parallelism=PARALLELISM

Ersetzen Sie dabei Folgendes:

  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • REGION_NAME: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • API_KEY: Datadog API-Schlüssel
  • URL: die URL für den Datadog-Endpunkt (z. B. https://http-intake.logs.datadoghq.com)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Datadog
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen

Senden Sie eine HTTP-POST-Anfrage, um die Vorlage mithilfe der REST API auszuführen. Weitere Informationen zur API und ihren Autorisierungsbereichen finden Sie unter projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_PubSub_to_Datadog
{
   "jobName": "JOB_NAME",
   "environment": {
       "ipConfiguration": "WORKER_IP_UNSPECIFIED",
       "additionalExperiments": []
   },
   "parameters": {
       "inputSubscription": "projects/PROJECT_ID/subscriptions/INPUT_SUBSCRIPTION_NAME",
       "apiKey": "API_KEY",
       "url": "URL",
       "outputDeadletterTopic": "projects/PROJECT_ID/topics/DEADLETTER_TOPIC_NAME",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "batchCount": "BATCH_COUNT",
       "parallelism": "PARALLELISM"
   }
}

Ersetzen Sie dabei Folgendes:

  • PROJECT_ID: die ID des Google Cloud-Projekts, in dem Sie den Dataflow-Job ausführen möchten
  • JOB_NAME: ein eindeutiger Jobname Ihrer Wahl
  • LOCATION: die Region, in der Sie Ihren Dataflow-Job bereitstellen möchten, z. B. us-central1
  • VERSION: Die Version der Vorlage, die Sie verwenden möchten

    Sie können die folgenden Werte verwenden:

    • latest zur Verwendung der neuesten Version der Vorlage, die im nicht datierten übergeordneten Ordner im Bucket verfügbar ist: gs://dataflow-templates-REGION_NAME/latest/
    • Den Versionsnamen wie 2023-09-12-00_RC00, um eine bestimmte Version der Vorlage zu verwenden. Diese ist verschachtelt im jeweiligen datierten übergeordneten Ordner im Bucket enthalten: gs://dataflow-templates-REGION_NAME/.
  • STAGING_LOCATION: der Speicherort für das Staging lokaler Dateien (z. B. gs://your-bucket/staging)
  • INPUT_SUBSCRIPTION_NAME: der Name des Pub/Sub-Abos
  • API_KEY: Datadog API-Schlüssel
  • URL: die URL für den Datadog-Endpunkt (z. B. https://http-intake.logs.datadoghq.com)
  • DEADLETTER_TOPIC_NAME: der Name des Pub/Sub-Themas
  • JAVASCRIPT_FUNCTION: ist der Name der benutzerdefinierten JavaScript-Funktion (UDF), die Sie verwenden möchten.

    Wenn Ihre JavaScript-Funktion beispielsweise myTransform(inJson) { /*...do stuff...*/ } ist, lautet der Funktionsname myTransform. Beispiele für JavaScript-UDFs finden Sie unter UDF-Beispiele.

  • PATH_TO_JAVASCRIPT_UDF_FILE Der Cloud Storage-URI der Datei .js, in der die benutzerdefinierte JavaScript-Funktion (UDF) definiert wird, die Sie verwenden möchten. Beispiel: gs://my-bucket/my-udfs/my_file.js
  • BATCH_COUNT: die Batchgröße zum Senden mehrerer Ereignisse an Datadog
  • PARALLELISM: die Anzahl der parallelen Anfragen, die zum Senden von Ereignissen an Splunk verwendet werden sollen
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.coders.FailsafeElementCoder;
import com.google.cloud.teleport.datadog.DatadogEvent;
import com.google.cloud.teleport.datadog.DatadogEventCoder;
import com.google.cloud.teleport.datadog.DatadogIO;
import com.google.cloud.teleport.datadog.DatadogWriteError;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.PubSubToDatadog.PubSubToDatadogOptions;
import com.google.cloud.teleport.templates.common.DatadogConverters;
import com.google.cloud.teleport.templates.common.DatadogConverters.DatadogOptions;
import com.google.cloud.teleport.templates.common.ErrorConverters;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubReadSubscriptionOptions;
import com.google.cloud.teleport.templates.common.PubsubConverters.PubsubWriteDeadletterTopicOptions;
import com.google.cloud.teleport.util.DatadogApiKeyNestedValueProvider;
import com.google.cloud.teleport.values.FailsafeElement;
import com.google.common.annotations.VisibleForTesting;
import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.JsonSyntaxException;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToDatadog} pipeline is a streaming pipeline which ingests data from Cloud
 * Pub/Sub, executes a UDF, converts the output to {@link DatadogEvent}s and writes those records
 * into Datadog's Logs API. Any errors which occur in the execution of the UDF, conversion to {@link
 * DatadogEvent} or writing to Logs API will be streamed into a Pub/Sub topic.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The source Pub/Sub subscription exists.
 *   <li>Logs API is routable from the VPC where the Dataflow job executes.
 *   <li>Deadletter topic exists.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_PubSub_to_Datadog.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_PubSub_to_Datadog",
    category = TemplateCategory.STREAMING,
    displayName = "Pub/Sub to Datadog",
    description = {
      "The Pub/Sub to Datadog template is a streaming pipeline that reads messages from a Pub/Sub subscription and writes the message payload to Datadog by using a Datadog endpoint. The most common use case for this template is to export log files to Datadog. "
          + "For more information check out <a href=\"https://docs.datadoghq.com/integrations/google_cloud_platform/?tab=project#log-collection\">Datadog's log collection process</a>.\n",
      "Before writing to Datadog, you can apply a JavaScript user-defined function to the message payload. "
          + "Any messages that experience processing failures are forwarded to a Pub/Sub unprocessed topic for further troubleshooting and reprocessing.\n",
      "As an extra layer of protection for your API keys and secrets, you can also pass in a Cloud KMS key along with the base64-encoded API key parameter encrypted with the Cloud KMS key. For additional details about encrypting your API key parameter, see the <a href=\"https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt\">Cloud KMS API encryption endpoint</a>."
    },
    optionsClass = PubSubToDatadogOptions.class,
    optionsOrder = {
      PubsubReadSubscriptionOptions.class,
      DatadogOptions.class,
      JavascriptTextTransformerOptions.class,
      PubsubWriteDeadletterTopicOptions.class
    },
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-datadog",
    contactInformation = "https://cloud.google.com/support",
    preview = true,
    requirements = {
      "The source Pub/Sub subscription must exist prior to running the pipeline.",
      "The Pub/Sub unprocessed topic must exist prior to running the pipeline.",
      "The Datadog URL must be accessible from the network of the Dataflow workers.",
      "The Datadog API key must be generated and available."
    },
    streaming = true,
    supportsAtLeastOnce = true)
public class PubSubToDatadog {

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** Counter to track inbound messages from source. */
  private static final Counter INPUT_MESSAGES_COUNTER =
      Metrics.counter(PubSubToDatadog.class, "inbound-pubsub-messages");

  /** The tag for successful {@link DatadogEvent} conversion. */
  private static final TupleTag<DatadogEvent> DATADOG_EVENT_OUT = new TupleTag<DatadogEvent>() {};

  /** The tag for failed {@link DatadogEvent} conversion. */
  private static final TupleTag<FailsafeElement<String, String>> DATADOG_EVENT_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** GSON to process a {@link PubsubMessage}. */
  private static final Gson GSON = new Gson();

  /** Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToDatadog.class);

  private static final Boolean DEFAULT_INCLUDE_PUBSUB_MESSAGE = true;

  @VisibleForTesting protected static final String PUBSUB_MESSAGE_ATTRIBUTE_FIELD = "attributes";
  @VisibleForTesting protected static final String PUBSUB_MESSAGE_DATA_FIELD = "data";
  private static final String PUBSUB_MESSAGE_ID_FIELD = "messageId";

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for it's execution to finish. If blocking execution is required, use the {@link
   * PubSubToDatadog#run(PubSubToDatadogOptions)} method to start the pipeline and invoke {@code
   * result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) {

    PubSubToDatadogOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(PubSubToDatadogOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(PubSubToDatadogOptions options) {

    Pipeline pipeline = Pipeline.create(options);

    // Register coders.
    CoderRegistry registry = pipeline.getCoderRegistry();
    registry.registerCoderForClass(DatadogEvent.class, DatadogEventCoder.of());
    registry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    /*
     * Steps:
     *  1) Read messages in from Pub/Sub
     *  2) Convert message to FailsafeElement for processing.
     *  3) Apply user provided UDF (if any) on the input strings.
     *  4) Convert successfully transformed messages into DatadogEvent objects
     *  5) Write DatadogEvents to Datadog's Logs API.
     *  5a) Wrap write failures into a FailsafeElement.
     *  6) Collect errors from UDF transform (#3), DatadogEvent transform (#4)
     *     and writing to Datadog Logs API (#5) and stream into a Pub/Sub deadletter topic.
     */

    // 1) Read messages in from Pub/Sub
    PCollection<String> stringMessages =
        pipeline.apply(
            "ReadMessages",
            new ReadMessages(options.getInputSubscription(), options.getIncludePubsubMessage()));

    // 2) Convert message to FailsafeElement for processing.
    PCollectionTuple transformedOutput =
        stringMessages
            .apply(
                "ConvertToFailsafeElement",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(input -> FailsafeElement.of(input, input)))

            // 3) Apply user provided UDF (if any) on the input strings.
            .apply(
                "ApplyUDFTransformation",
                FailsafeJavascriptUdf.<String>newBuilder()
                    .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                    .setFunctionName(options.getJavascriptTextTransformFunctionName())
                    .setReloadIntervalMinutes(
                        options.getJavascriptTextTransformReloadIntervalMinutes())
                    .setLoggingEnabled(ValueProvider.StaticValueProvider.of(true))
                    .setSuccessTag(UDF_OUT)
                    .setFailureTag(UDF_DEADLETTER_OUT)
                    .build());

    // 4) Convert successfully transformed messages into DatadogEvent objects
    PCollectionTuple convertToEventTuple =
        transformedOutput
            .get(UDF_OUT)
            .apply(
                "ConvertToDatadogEvent",
                DatadogConverters.failsafeStringToDatadogEvent(
                    DATADOG_EVENT_OUT, DATADOG_EVENT_DEADLETTER_OUT));

    // 5) Write DatadogEvents to Datadog's Logs API.
    PCollection<DatadogWriteError> writeErrors =
        convertToEventTuple
            .get(DATADOG_EVENT_OUT)
            .apply(
                "WriteToDatadog",
                DatadogIO.writeBuilder()
                    .withApiKey(
                        new DatadogApiKeyNestedValueProvider(
                            options.getApiKeySecretId(),
                            options.getApiKeyKMSEncryptionKey(),
                            options.getApiKey(),
                            options.getApiKeySource()))
                    .withUrl(options.getUrl())
                    .withBatchCount(options.getBatchCount())
                    .withParallelism(options.getParallelism())
                    .build());

    // 5a) Wrap write failures into a FailsafeElement.
    PCollection<FailsafeElement<String, String>> wrappedDatadogWriteErrors =
        writeErrors.apply(
            "WrapDatadogWriteErrors",
            ParDo.of(
                new DoFn<DatadogWriteError, FailsafeElement<String, String>>() {

                  @ProcessElement
                  public void processElement(ProcessContext context) {
                    DatadogWriteError error = context.element();
                    FailsafeElement<String, String> failsafeElement =
                        FailsafeElement.of(error.payload(), error.payload());

                    if (error.statusMessage() != null) {
                      failsafeElement.setErrorMessage(error.statusMessage());
                    }

                    if (error.statusCode() != null) {
                      failsafeElement.setErrorMessage(
                          String.format("Datadog write status code: %d", error.statusCode()));
                    }
                    context.output(failsafeElement);
                  }
                }));

    // 6) Collect errors from UDF transform (#4), DatadogEvent transform (#5)
    //     and writing to Datadog Logs API (#6) and stream into a Pub/Sub deadletter topic.
    PCollectionList.of(
            ImmutableList.of(
                convertToEventTuple.get(DATADOG_EVENT_DEADLETTER_OUT),
                wrappedDatadogWriteErrors,
                transformedOutput.get(UDF_DEADLETTER_OUT)))
        .apply("FlattenErrors", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            ErrorConverters.WriteStringMessageErrorsToPubSub.newBuilder()
                .setErrorRecordsTopic(options.getOutputDeadletterTopic())
                .build());

    return pipeline.run();
  }

  /**
   * The {@link PubSubToDatadogOptions} class provides the custom options passed by the executor at
   * the command line.
   */
  public interface PubSubToDatadogOptions
      extends DatadogOptions,
          PubsubReadSubscriptionOptions,
          PubsubWriteDeadletterTopicOptions,
          JavascriptTextTransformerOptions {}

  /**
   * A {@link PTransform} that reads messages from a Pub/Sub subscription, increments a counter and
   * returns a {@link PCollection} of {@link String} messages.
   */
  private static class ReadMessages extends PTransform<PBegin, PCollection<String>> {
    private final ValueProvider<String> subscriptionName;
    private final ValueProvider<Boolean> inputIncludePubsubMessageFlag;
    private Boolean includePubsubMessage;

    ReadMessages(
        ValueProvider<String> subscriptionName,
        ValueProvider<Boolean> inputIncludePubsubMessageFlag) {
      this.subscriptionName = subscriptionName;
      this.inputIncludePubsubMessageFlag = inputIncludePubsubMessageFlag;
    }

    @Override
    public PCollection<String> expand(PBegin input) {
      return input
          .apply(
              "ReadPubsubMessage",
              PubsubIO.readMessagesWithAttributes().fromSubscription(subscriptionName))
          .apply(
              "ExtractMessageIfRequired",
              ParDo.of(
                  new DoFn<PubsubMessage, String>() {

                    @Setup
                    public void setup() {
                      if (inputIncludePubsubMessageFlag != null) {
                        includePubsubMessage = inputIncludePubsubMessageFlag.get();
                      }
                      includePubsubMessage =
                          MoreObjects.firstNonNull(
                              includePubsubMessage, DEFAULT_INCLUDE_PUBSUB_MESSAGE);
                      LOG.info("includePubsubMessage set to: {}", includePubsubMessage);
                    }

                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      if (includePubsubMessage) {
                        context.output(formatPubsubMessage(context.element()));
                      } else {
                        context.output(
                            new String(context.element().getPayload(), StandardCharsets.UTF_8));
                      }
                    }
                  }))
          .apply(
              "CountMessages",
              ParDo.of(
                  new DoFn<String, String>() {
                    @ProcessElement
                    public void processElement(ProcessContext context) {
                      INPUT_MESSAGES_COUNTER.inc();
                      context.output(context.element());
                    }
                  }));
    }
  }

  /**
   * Utility method that formats {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage} according
   * to the model defined in {@link com.google.pubsub.v1.PubsubMessage}.
   *
   * @param pubsubMessage {@link org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage}
   * @return JSON String that adheres to the model defined in {@link
   *     com.google.pubsub.v1.PubsubMessage}
   */
  @VisibleForTesting
  protected static String formatPubsubMessage(PubsubMessage pubsubMessage) {
    JsonObject messageJson = new JsonObject();

    String payload = new String(pubsubMessage.getPayload(), StandardCharsets.UTF_8);
    try {
      JsonObject data = GSON.fromJson(payload, JsonObject.class);
      messageJson.add(PUBSUB_MESSAGE_DATA_FIELD, data);
    } catch (JsonSyntaxException e) {
      messageJson.addProperty(PUBSUB_MESSAGE_DATA_FIELD, payload);
    }

    JsonObject attributes = getAttributesJson(pubsubMessage.getAttributeMap());
    messageJson.add(PUBSUB_MESSAGE_ATTRIBUTE_FIELD, attributes);

    if (pubsubMessage.getMessageId() != null) {
      messageJson.addProperty(PUBSUB_MESSAGE_ID_FIELD, pubsubMessage.getMessageId());
    }

    return messageJson.toString();
  }

  /**
   * Constructs a {@link JsonObject} from a {@link Map} of Pub/Sub attributes.
   *
   * @param attributesMap {@link Map} of Pub/Sub attributes
   * @return {@link JsonObject} of Pub/Sub attributes
   */
  private static JsonObject getAttributesJson(Map<String, String> attributesMap) {
    JsonObject attributesJson = new JsonObject();
    for (String key : attributesMap.keySet()) {
      attributesJson.addProperty(key, attributesMap.get(key));
    }

    return attributesJson;
  }
}

Nächste Schritte