Template Pub/Sub ke Elasticsearch

Template Pub/Sub ke Elasticsearch adalah pipeline streaming yang membaca pesan dari langganan Pub/Sub, mengeksekusi fungsi yang ditentukan pengguna (UDF), dan menulisnya ke Elasticsearch sebagai dokumen. Template Dataflow menggunakan fitur aliran data Elasticsearch untuk menyimpan data deret waktu di beberapa indeks sekaligus memberi Anda satu resource bernama untuk permintaan. Aliran data sangat cocok untuk log, metrik, rekaman aktivitas, dan data lain yang terus dihasilkan dan disimpan di Pub/Sub.

Template membuat aliran data bernama logs-gcp.DATASET-NAMESPACE, dengan:

  • DATASET adalah nilai parameter template dataset, atau pubsub jika tidak ditentukan.
  • NAMESPACE adalah nilai parameter template namespace, atau default jika tidak ditentukan.

Persyaratan pipeline

  • Langganan Pub/Sub sumber harus ada dan pesan harus dienkode dalam format JSON yang valid.
  • Host Elasticsearch yang dapat dijangkau secara publik di instance Google Cloud atau di Elastic Cloud dengan Elasticsearch versi 7.0 atau yang lebih baru. Lihat Integrasi Google Cloud untuk Elastic untuk mengetahui detail selengkapnya.
  • Topik Pub/Sub untuk output error.

Parameter template

Parameter yang diperlukan

  • inputSubscription: Langganan Pub/Sub untuk menggunakan input. Contoh, projects/<PROJECT_ID>/subscriptions/<SUBSCRIPTION_NAME>.
  • errorOutputTopic: Topik output Pub/Sub untuk memublikasikan data yang gagal, dalam format projects/<PROJECT_ID>/topics/<TOPIC_NAME>.
  • connectionUrl: URL Elasticsearch dalam format https://hostname:[port]. Jika menggunakan Elastic Cloud, tentukan CloudID. Misalnya, https://elasticsearch-host:9200.
  • apiKey: Kunci API yang dienkode Base64 untuk digunakan dalam autentikasi.

Parameter opsional

  • set data: Jenis log yang dikirim menggunakan Pub/Sub, yang memiliki dasbor siap pakai. Nilai jenis log yang diketahui adalah audit, vpcflow, dan firewall. Default-nya adalah: pubsub.
  • namespace: Pengelompokan arbitrer, seperti lingkungan (dev, prod, atau qa), tim, atau unit bisnis strategis. Default-nya adalah: default.
  • elasticsearchTemplateVersion: ID Versi Template Dataflow, biasanya ditentukan oleh Google Cloud. Default-nya adalah: 1.0.0.
  • javascriptTextTransformGcsPath: URI Cloud Storage file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Contoh, gs://my-bucket/my-udfs/my_file.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang akan digunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples).
  • javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.
  • elasticsearchUsername: Nama pengguna Elasticsearch yang akan digunakan untuk mengautentikasi. Jika ditentukan, nilai apiKey akan diabaikan.
  • elasticsearchPassword: Sandi Elasticsearch yang akan digunakan untuk melakukan autentikasi. Jika ditentukan, nilai apiKey akan diabaikan.
  • batchSize: Ukuran batch dalam jumlah dokumen. Default-nya adalah 1000.
  • batchSizeBytes: Ukuran batch dalam jumlah byte. Default-nya adalah 5242880 (5 mb).
  • maxRetryAttempts: Jumlah maksimum percobaan ulang. Harus lebih besar dari nol. Default-nya adalah no retries.
  • maxRetryDuration: Durasi percobaan ulang maksimum dalam milidetik. Harus lebih besar dari nol. Default-nya adalah no retries.
  • propertyAsIndex: Properti dalam dokumen yang diindeks yang nilainya menentukan metadata _index yang akan disertakan dengan dokumen dalam permintaan massal. Lebih diutamakan daripada UDF _index. Setelan defaultnya adalah none.
  • javaScriptIndexFnGcsPath: Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang menentukan metadata _index yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • javaScriptIndexFnName: Nama fungsi JavaScript UDF yang menentukan metadata _index yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • propertyAsId: Properti dalam dokumen yang diindeks yang nilainya menentukan metadata _id yang akan disertakan dengan dokumen dalam permintaan massal. Lebih diutamakan daripada UDF _id. Setelan defaultnya adalah none.
  • javaScriptIdFnGcsPath: Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang menentukan metadata _id yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • javaScriptIdFnName: Nama fungsi JavaScript UDF yang menentukan metadata _id yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • javaScriptTypeFnGcsPath: Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang menentukan metadata _type yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • javaScriptTypeFnName: Nama fungsi JavaScript UDF yang menentukan metadata _type yang akan disertakan dengan dokumen dalam permintaan massal. Setelan defaultnya adalah none.
  • javaScriptIsDeleteFnGcsPath: Jalur Cloud Storage ke sumber UDF JavaScript untuk fungsi yang menentukan apakah akan menghapus dokumen, bukan menyisipkan atau memperbaruinya. Fungsi ini menampilkan nilai string true atau false. Setelan defaultnya adalah none.
  • javaScriptIsDeleteFnName: Nama fungsi JavaScript UDF yang menentukan apakah akan menghapus dokumen, bukan menyisipkan atau memperbaruinya. Fungsi ini menampilkan nilai string true atau false. Setelan defaultnya adalah none.
  • usePartialUpdate: Apakah akan menggunakan pembaruan sebagian (memperbarui, bukan membuat atau mengindeks, yang memungkinkan dokumen sebagian) dengan permintaan Elasticsearch. Default-nya adalah false.
  • bulkInsertMethod: Apakah akan menggunakan INDEX (indeks, memungkinkan upsert) atau CREATE (buat, error pada _id duplikat) dengan permintaan massal Elasticsearch. Setelan defaultnya adalah CREATE.
  • trustSelfSignedCerts: Apakah akan memercayai sertifikat yang ditandatangani sendiri atau tidak. Instance Elasticsearch yang diinstal mungkin memiliki sertifikat yang ditandatangani sendiri. Aktifkan ini ke benar untuk mengabaikan validasi pada sertifikat SSL. (Default: false).
  • disableCertificateValidation: Jika true, percayai sertifikat SSL yang ditandatangani sendiri. Instance Elasticsearch mungkin memiliki sertifikat yang ditandatangani sendiri. Untuk mengabaikan validasi sertifikat, tetapkan parameter ini ke true. Setelan defaultnya adalah false.
  • apiKeyKMSEncryptionKey: Kunci Cloud KMS untuk mendekripsi kunci API. Parameter ini diperlukan jika apiKeySource ditetapkan ke KMS. Jika parameter ini diberikan, teruskan string apiKey terenkripsi. Mengenkripsi parameter menggunakan endpoint enkripsi KMS API. Untuk kunci, gunakan format projects/<PROJECT_ID>/locations/<KEY_REGION>/keyRings/<KEY_RING>/cryptoKeys/<KMS_KEY_NAME>. Lihat: https://cloud.google.com/kms/docs/reference/rest/v1/projects.locations.keyRings.cryptoKeys/encrypt Misalnya, projects/your-project-id/locations/global/keyRings/your-keyring/cryptoKeys/your-key-name.
  • apiKeySecretId: ID secret Secret Manager untuk apiKey. Jika apiKeySource ditetapkan ke SECRET_MANAGER, berikan parameter ini. Gunakan format projects/<PROJECT_ID>/secrets/<SECRET_ID>/versions/<SECRET_VERSION>. For example, projects/your-project-id/secrets/your-secret/versions/your-secret-version`.
  • apiKeySource: Sumber kunci API. Nilai yang diizinkan adalah PLAINTEXT, KMS, atau SECRET_MANAGER. Parameter ini diperlukan saat Anda menggunakan Secret Manager atau KMS. Jika apiKeySource ditetapkan ke KMS, apiKeyKMSEncryptionKey dan apiKey terenkripsi harus diberikan. Jika apiKeySource ditetapkan ke SECRET_MANAGER, apiKeySecretId harus diberikan. Jika apiKeySource ditetapkan ke PLAINTEXT, apiKey harus diberikan. Setelan defaultnya adalah: PLAINTEXT.
  • socketTimeout: Jika ditetapkan, akan menimpa waktu tunggu percobaan ulang maksimum default dan waktu tunggu soket default (30.000 md) di Elastic RestClient.

Fungsi yang ditentukan pengguna (UDF)

Template ini mendukung fungsi yang ditentukan pengguna (UDF) di beberapa titik dalam pipeline, yang dijelaskan di bawah. Untuk informasi selengkapnya, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Fungsi transformasi teks

Mengubah pesan Pub/Sub menjadi dokumen Elasticsearch.

Parameter template:

  • javascriptTextTransformGcsPath: URI Cloud Storage file JavaScript.
  • javascriptTextTransformFunctionName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: kolom data pesan Pub/Sub, yang diserialisasi sebagai string JSON.
  • Output: dokumen JSON yang di-string untuk disisipkan ke dalam Elasticsearch.

Fungsi indeks

Menampilkan indeks tempat dokumen berada.

Parameter template:

  • javaScriptIndexFnGcsPath: URI Cloud Storage dari file JavaScript.
  • javaScriptIndexFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _index dokumen.

Fungsi ID dokumen

Menampilkan ID dokumen.

Parameter template:

  • javaScriptIdFnGcsPath: URI Cloud Storage dari file JavaScript.
  • javaScriptIdFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _id dokumen.

Fungsi penghapusan dokumen

Menentukan apakah akan menghapus dokumen atau tidak. Untuk menggunakan fungsi ini, tetapkan mode penyisipan massal ke INDEX dan berikan fungsi ID dokumen.

Parameter template:

  • javaScriptIsDeleteFnGcsPath: URI Cloud Storage dari file JavaScript.
  • javaScriptIsDeleteFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: menampilkan string "true" untuk menghapus dokumen, atau "false" untuk memperbarui dan menyisipkan dokumen.

Fungsi jenis pemetaan

Menampilkan jenis pemetaan dokumen.

Parameter template:

  • javaScriptTypeFnGcsPath: URI Cloud Storage dari file JavaScript.
  • javaScriptTypeFnName: nama fungsi JavaScript.

Spesifikasi fungsi:

  • Input: dokumen Elasticsearch, yang diserialisasi sebagai string JSON.
  • Output: nilai kolom metadata _type dokumen.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Pub/Sub to Elasticsearch template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/PubSub_to_Elasticsearch_Flex \
    --parameters \
inputSubscription=SUBSCRIPTION_NAME,\
connectionUrl=CONNECTION_URL,\
dataset=DATASET,\
namespace=NAMESPACE,\
apiKey=APIKEY,\
errorOutputTopic=ERROR_OUTPUT_TOPIC
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • ERROR_OUTPUT_TOPIC: topik Pub/Sub Anda untuk output error
  • SUBSCRIPTION_NAME: nama langganan Pub/Sub Anda
  • CONNECTION_URL: URL Elasticsearch Anda
  • DATASET: jenis log Anda
  • NAMESPACE: namespace Anda untuk set data
  • APIKEY: kunci API Anda yang dienkode base64 untuk autentikasi

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputSubscription": "SUBSCRIPTION_NAME",
          "connectionUrl": "CONNECTION_URL",
          "dataset": "DATASET",
          "namespace": "NAMESPACE",
          "apiKey": "APIKEY",
          "errorOutputTopic": "ERROR_OUTPUT_TOPIC"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/PubSub_to_Elasticsearch_Flex",
   }
}
  

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • ERROR_OUTPUT_TOPIC: topik Pub/Sub Anda untuk output error
  • SUBSCRIPTION_NAME: nama langganan Pub/Sub Anda
  • CONNECTION_URL: URL Elasticsearch Anda
  • DATASET: jenis log Anda
  • NAMESPACE: namespace Anda untuk set data
  • APIKEY: kunci API Anda yang dienkode base64 untuk autentikasi
Java
/*
 * Copyright (C) 2021 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.elasticsearch.templates;

import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.PubSubToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.FailedPubsubMessageToPubsubTopicFn;
import com.google.cloud.teleport.v2.elasticsearch.transforms.ProcessEventMetadata;
import com.google.cloud.teleport.v2.elasticsearch.transforms.PubSubMessageToJsonDocument;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.elasticsearch.utils.ElasticsearchIndex;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageWithAttributesCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link PubSubToElasticsearch} pipeline is a streaming pipeline which ingests data in JSON
 * format from PubSub, applies a Javascript UDF if provided and writes the resulting records to
 * Elasticsearch. If the element fails to be processed then it is written to an error output table
 * in BigQuery.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/README_PubSub_to_Elasticsearch.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "PubSub_to_Elasticsearch_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch",
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "pythonExternalTextTransformGcsPath",
        "pythonExternalTextTransformFunctionName",
      }, // Template just ignores what is sent as "index"
      flexContainerName = "pubsub-to-elasticsearch",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "PubSub_to_Elasticsearch_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Pub/Sub to Elasticsearch With Python UDFs",
      type = Template.TemplateType.XLANG,
      description = {
        "The Pub/Sub to Elasticsearch template is a streaming pipeline that reads messages from a Pub/Sub subscription, executes a Python user-defined function (UDF), and writes them to Elasticsearch as documents. "
            + "The Dataflow template uses Elasticsearch's <a href=\"https://www.elastic.co/guide/en/elasticsearch/reference/master/data-streams.html\">data streams</a> feature to store time series data across multiple indices while giving you a single named resource for requests. "
            + "Data streams are well-suited for logs, metrics, traces, and other continuously generated data stored in Pub/Sub.\n",
        "The template creates a datastream named <code>logs-gcp.DATASET-NAMESPACE</code>, where:\n"
            + "- <code>DATASET</code> is the value of the <code>dataset</code> template parameter, or <code>pubsub</code> if not specified.\n"
            + "- <code>NAMESPACE</code> is the value of the <code>namespace</code> template parameter, or <code>default</code> if not specified."
      },
      optionsClass = PubSubToElasticsearchOptions.class,
      skipOptions = {
        "index",
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      }, // Template just ignores what is sent as "index" and javascript udf as this is for python
      // udf only.
      flexContainerName = "pubsub-to-elasticsearch-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/pubsub-to-elasticsearch",
      contactInformation = "https://cloud.google.com/support",
      preview = true,
      requirements = {
        "The source Pub/Sub subscription must exist and the messages must be encoded in a valid JSON format.",
        "A publicly reachable Elasticsearch host on a Google Cloud instance or on Elastic Cloud with Elasticsearch version 7.0 or above. See <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-elasticsearch/docs/PubSubToElasticsearch/README.md#google-cloud-integration-for-elastic\">Google Cloud Integration for Elastic</a> for more details.",
        "A Pub/Sub topic for error output.",
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class PubSubToElasticsearch {

  /** The tag for the main output of the json transformation. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** The tag for the error output table of the json to table row transform. */
  public static final TupleTag<FailsafeElement<PubsubMessage, String>> TRANSFORM_ERROR_OUTPUT_OUT =
      new TupleTag<FailsafeElement<PubsubMessage, String>>() {};

  /** Pubsub message/string coder for pipeline. */
  public static final FailsafeElementCoder<PubsubMessage, String> CODER =
      FailsafeElementCoder.of(PubsubMessageWithAttributesCoder.of(), StringUtf8Coder.of());

  /** String/String Coder for FailsafeElement. */
  public static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  /** The log to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(PubSubToElasticsearch.class);

  /**
   * Main entry point for executing the pipeline.
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line.
    PubSubToElasticsearchOptions pubSubToElasticsearchOptions =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(PubSubToElasticsearchOptions.class);

    pubSubToElasticsearchOptions.setIndex(
        new ElasticsearchIndex(
                pubSubToElasticsearchOptions.getDataset(),
                pubSubToElasticsearchOptions.getNamespace())
            .getIndex());

    validateOptions(pubSubToElasticsearchOptions);
    run(pubSubToElasticsearchOptions);
  }

  public static void validateOptions(PubSubToElasticsearchOptions options) {
    switch (options.getApiKeySource()) {
      case "PLAINTEXT":
        return;
      case "KMS":
        // validate that the encryption key is provided.
        if (StringUtils.isEmpty(options.getApiKeyKMSEncryptionKey())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to KMS, apiKeyKMSEncryptionKey should be provided.");
        }
        return;
      case "SECRET_MANAGER":
        // validate that secretId is provided.
        if (StringUtils.isEmpty(options.getApiKeySecretId())) {
          throw new IllegalArgumentException(
              "If apiKeySource is set to SECRET_MANAGER, apiKeySecretId should be provided.");
        }
    }
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(PubSubToElasticsearchOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coders for pipeline
    CoderRegistry coderRegistry = pipeline.getCoderRegistry();

    coderRegistry.registerCoderForType(
        FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor(), FAILSAFE_ELEMENT_CODER);

    coderRegistry.registerCoderForType(CODER.getEncodedTypeDescriptor(), CODER);

    /*
     * Steps: 1) Read PubSubMessage with attributes from input PubSub subscription.
     *        2) Apply Javascript UDF if provided.
     *        3) Index Json string to output ES index.
     *
     */
    LOG.info("Reading from subscription: " + options.getInputSubscription());

    PCollectionTuple convertedPubsubMessages =
        pipeline
            /*
             * Step #1: Read from a PubSub subscription.
             */
            .apply(
                "ReadPubSubSubscription",
                PubsubIO.readMessagesWithAttributes()
                    .fromSubscription(options.getInputSubscription()))
            /*
             * Step #2: Transform the PubsubMessages into Json documents.
             */
            .apply(
                "ConvertMessageToJsonDocument",
                PubSubMessageToJsonDocument.newBuilder()
                    .setJavascriptTextTransformFunctionName(
                        options.getJavascriptTextTransformFunctionName())
                    .setJavascriptTextTransformGcsPath(options.getJavascriptTextTransformGcsPath())
                    .setPythonExternalTextTransformGcsPath(
                        options.getPythonExternalTextTransformGcsPath())
                    .setPythonExternalTextTransformFunctionName(
                        options.getPythonExternalTextTransformFunctionName())
                    .build());

    /*
     * Step #3a: Write Json documents into Elasticsearch using {@link ElasticsearchTransforms.WriteToElasticsearch}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_OUT)
        .apply(
            "GetJsonDocuments",
            MapElements.into(TypeDescriptors.strings()).via(FailsafeElement::getPayload))
        .apply("Insert metadata", new ProcessEventMetadata())
        .apply(
            "WriteToElasticsearch",
            WriteToElasticsearch.newBuilder()
                .setUserAgent("dataflow-pubsub-to-elasticsearch-template/v2")
                .setOptions(options.as(PubSubToElasticsearchOptions.class))
                .build());

    /*
     * Step 3b: Write elements that failed processing to error output PubSub topic via {@link PubSubIO}.
     */
    convertedPubsubMessages
        .get(TRANSFORM_ERROR_OUTPUT_OUT)
        .apply(ParDo.of(new FailedPubsubMessageToPubsubTopicFn()))
        .apply("writeFailureMessages", PubsubIO.writeMessages().to(options.getErrorOutputTopic()));

    // Execute the pipeline and return the result.
    return pipeline.run();
  }
}

Langkah berikutnya