Template Teks Cloud Storage ke BigQuery (Streaming)

Pipeline Teks Cloud Storage ke BigQuery adalah pipeline streaming yang melakukan streaming file teks yang disimpan di Cloud Storage, mengubahnya menggunakan fungsi yang ditentukan pengguna (UDF) JavaScript yang Anda berikan, dan menambahkan hasilnya ke BigQuery.

Pipeline berjalan tanpa batas waktu dan perlu dihentikan secara manual melalui cancel, bukan drain, karena penggunaan transformasi Watch, yang merupakan DoFn yang dapat dipisah dan tidak mendukung pemisahan.

Persyaratan pipeline

  • Buat file JSON yang mendeskripsikan skema tabel output Anda di BigQuery.

    Pastikan terdapat array JSON level teratas yang berjudul fields dan isinya mengikuti pola {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Contoh:

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Buat file JavaScript (.js) dengan fungsi UDF yang menyediakan logika untuk mengubah baris teks. Fungsi Anda harus menampilkan string JSON.

    Contoh berikut memisahkan setiap baris file CSV, membuat objek JSON dengan nilai, dan menampilkan string JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

Parameter template

Parameter yang diperlukan

  • inputFilePattern: Jalur gs:// ke teks di Cloud Storage yang ingin Anda proses. Contoh, gs://your-bucket/your-file.txt.
  • JSONPath: Jalur gs:// ke file JSON yang menentukan skema BigQuery Anda, yang disimpan di Cloud Storage. Contoh, gs://your-bucket/your-schema.json.
  • outputTable: Lokasi tabel BigQuery yang akan digunakan untuk menyimpan data yang diproses. Jika Anda menggunakan kembali tabel yang sudah ada, tabel tersebut akan ditimpa. Contoh, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • javascriptTextTransformGcsPath: URI Cloud Storage file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, gs://your-bucket/your-transforms/*.js.
  • javascriptTextTransformFunctionName: Nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan. Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). Misalnya, transform_udf1.
  • bigQueryLoadingTemporaryDirectory: Direktori sementara untuk proses pemuatan BigQuery. Contoh, gs://your-bucket/your-files/temp-dir.

Parameter opsional

  • outputDeadletterTable: Tabel untuk pesan yang gagal mencapai tabel output. Jika tidak ada, tabel akan dibuat selama eksekusi pipeline. Jika tidak ditentukan, <outputTableSpec>_error_records akan digunakan. Misalnya, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: Parameter ini hanya berlaku jika Use BigQuery Storage Write API diaktifkan. Jika diaktifkan, semantik minimal satu kali akan digunakan untuk Storage Write API, jika tidak, semantik tepat satu kali akan digunakan. Defaultnya adalah: false.
  • useStorageWriteApi: Jika benar, pipeline akan menggunakan BigQuery Storage Write API (https://cloud.google.com/bigquery/docs/write-api). Nilai defaultnya adalah false. Untuk informasi selengkapnya, lihat Menggunakan Storage Write API (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: Saat menggunakan Storage Write API, menentukan jumlah aliran tulis. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini. Setelan defaultnya adalah: 0.
  • storageWriteApiTriggeringFrequencySec: Saat menggunakan Storage Write API, menentukan frekuensi pemicuan, dalam detik. Jika useStorageWriteApi adalah true dan useStorageWriteApiAtLeastOnce adalah false, Anda harus menetapkan parameter ini.
  • pythonExternalTextTransformGcsPath: Pola jalur Cloud Storage untuk kode Python yang berisi fungsi yang ditentukan pengguna. Contoh, gs://your-bucket/your-function.py.
  • javascriptTextTransformReloadIntervalMinutes: Menentukan seberapa sering UDF dimuat ulang, dalam hitungan menit. Jika nilainya lebih besar dari 0, Dataflow akan memeriksa file UDF di Cloud Storage secara berkala, dan memuat ulang UDF jika file diubah. Parameter ini memungkinkan Anda mengupdate UDF saat pipeline berjalan, tanpa perlu memulai ulang tugas. Jika nilainya adalah 0, pemuatan ulang UDF akan dinonaktifkan. Nilai defaultnya adalah 0.

Fungsi yang ditentukan pengguna (UDF)

Template ini memerlukan UDF yang mengurai file input, seperti yang dijelaskan dalam Persyaratan pipeline. Template memanggil UDF untuk setiap baris teks dalam setiap file input. Untuk mengetahui informasi selengkapnya tentang cara membuat UDF, lihat Membuat fungsi yang ditentukan pengguna untuk template Dataflow.

Spesifikasi fungsi

UDF memiliki spesifikasi berikut:

  • Input: satu baris teks dari file input.
  • Output: string JSON yang cocok dengan skema tabel tujuan BigQuery.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Storage Text to BigQuery (Stream) template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • STAGING_LOCATION: lokasi untuk melakukan staging file lokal (misalnya, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: jalur Cloud Storage ke file JSON yang berisi definisi skema
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: jalur Cloud Storage ke set data teks Anda
  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • BIGQUERY_UNPROCESSED_TABLE: nama tabel BigQuery Anda untuk pesan yang belum diproses
  • PATH_TO_TEMP_DIR_ON_GCS: jalur Cloud Storage ke direktori sementara

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • STAGING_LOCATION: lokasi untuk melakukan staging file lokal (misalnya, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION: nama fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan

    Misalnya, jika kode fungsi JavaScript Anda adalah myTransform(inJson) { /*...do stuff...*/ }, nama fungsinya adalah myTransform. Untuk contoh UDF JavaScript, lihat Contoh UDF.

  • PATH_TO_BIGQUERY_SCHEMA_JSON: jalur Cloud Storage ke file JSON yang berisi definisi skema
  • PATH_TO_JAVASCRIPT_UDF_FILE: URI Cloud Storage dari file .js yang menentukan fungsi yang ditentukan pengguna (UDF) JavaScript yang ingin Anda gunakan—misalnya, gs://my-bucket/my-udfs/my_file.js
  • PATH_TO_TEXT_DATA: jalur Cloud Storage ke set data teks Anda
  • BIGQUERY_TABLE: nama tabel BigQuery Anda
  • BIGQUERY_UNPROCESSED_TABLE: nama tabel BigQuery Anda untuk pesan yang belum diproses
  • PATH_TO_TEMP_DIR_ON_GCS: jalur Cloud Storage ke direktori sementara
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.TextToBigQueryStreamingOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link TextToBigQueryStreaming} is a streaming version of {@link TextIOToBigQuery} pipeline
 * that reads text files, applies a JavaScript UDF and writes the output to BigQuery. The pipeline
 * continuously polls for new files, reads them row-by-row and processes each record into BigQuery.
 * The polling interval is set at 10 seconds.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Stream_GCS_Text_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream)",
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + "    <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + "    draining."
      },
      skipOptions = {
        "pythonExternalTextTransfromGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + "    <p>\n"
            + "      Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + "      For example:\n"
            + "    </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Note that your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream) with Python UDF",
      type = Template.TemplateType.XLANG,
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + "    <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + "    draining."
      },
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + "    <p>\n"
            + "      Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + "      For example:\n"
            + "    </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a Python (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Note that your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "import json\n"
            + "def transform(line): \n"
            + "  values = line.split(',')\n"
            + "\n"
            + "  obj = {\n"
            + "     'location' : values[0],\n"
            + "     'name' : values[1],\n"
            + "     'age' : values[2],\n"
            + "     'color' : values[3],\n"
            + "     'coffee' : values[4]\n"
            + "  }\n"
            + "  jsonString = JSON.dumps(obj);\n"
            + "\n"
            + "  return jsonString;\n"
            + "\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class TextToBigQueryStreaming {

  private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);

  /** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output of the json transformation. */
  private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The default suffix for error tables if dead letter table is not specified. */
  private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";

  /** Default interval for polling files in GCS. */
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link
   * TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    TextToBigQueryStreamingOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(TextToBigQueryStreamingOptions.class);
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(TextToBigQueryStreamingOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<String, String> coder =
        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    // Determine if we are using Python UDFs or JS UDFs based on the provided options.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps:
     *  1) Read from the text source continuously.
     *  2) Convert to FailsafeElement.
     *  3) Apply Javascript udf transformation.
     *    - Tag records that were successfully transformed and those
     *      that failed transformation.
     *  4) Convert records to TableRow.
     *    - Tag records that were successfully converted and those
     *      that failed conversion.
     *  5) Insert successfully converted records into BigQuery.
     *    - Errors encountered while streaming will be sent to deadletter table.
     *  6) Insert records that failed into deadletter table.
     */

    PCollection<String> sourceRead =
        pipeline.apply(
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));
    PCollectionTuple transformedOutput;
    if (usePythonUdf) {
      transformedOutput =
          sourceRead
              .apply(
                  "MapToRecord",
                  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                      .stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                      .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));

    } else {
      transformedOutput =
          pipeline

              // 1) Read from the text source continuously.
              .apply(
                  "ReadFromSource",
                  TextIO.read()
                      .from(options.getInputFilePattern())
                      .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))

              // 2) Convert to FailsafeElement.
              .apply(
                  "ConvertToFailsafeElement",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(input -> FailsafeElement.of(input, input)))

              // 3) Apply Javascript udf transformation.
              .apply(
                  "ApplyUDFTransformation",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_OUT)
                      .setFailureTag(UDF_DEADLETTER_OUT)
                      .build());
    }

    PCollectionTuple convertedTableRows =
        transformedOutput

            // 4) Convert records to TableRow.
            .get(UDF_OUT)
            .apply(
                "ConvertJSONToTableRow",
                FailsafeJsonToTableRow.<String>newBuilder()
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());

    WriteResult writeResult =
        convertedTableRows

            // 5) Insert successfully converted records into BigQuery.
            .get(TRANSFORM_OUT)
            .apply(
                "InsertIntoBigQuery",
                BigQueryIO.writeTableRows()
                    .withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))
                    .to(options.getOutputTable())
                    .withExtendedErrorInfo()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .withCustomGcsTempLocation(
                        StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));

    // Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(TextToBigQueryStreaming::wrapBigQueryInsertError));

    // 6) Insert records that failed transformation or conversion into deadletter table
    PCollectionList.of(
            ImmutableList.of(
                transformedOutput.get(UDF_DEADLETTER_OUT),
                convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
                failedInserts))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    StringUtils.isNotEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   * @throws IOException
   */
  static FailsafeElement<String, String> wrapBigQueryInsertError(BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      String rowPayload = JSON_FACTORY.toString(insertError.getRow());
      String errorMessage = JSON_FACTORY.toString(insertError.getError());

      failsafeElement = FailsafeElement.of(rowPayload, rowPayload);
      failsafeElement.setErrorMessage(errorMessage);

    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return failsafeElement;
  }

  /**
   * The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
   * by the executor at the command-line.
   */
  public interface TextToBigQueryStreamingOptions
      extends TextIOToBigQuery.Options, BigQueryStorageApiStreamingOptions {
    @TemplateParameter.BigQueryTable(
        order = 1,
        optional = true,
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Table for messages that failed to reach the output table. If a table doesn't exist, it is created during "
                + "pipeline execution. If not specified, `<outputTableSpec>_error_records` is used.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String value);

    // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
    // on when pipeline is running on ALO mode and using the Storage Write API
    @TemplateParameter.Boolean(
        order = 2,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at at-least-once semantics in BigQuery Storage Write API",
        helpText =
            "This parameter takes effect only if `Use BigQuery Storage Write API` is enabled. If"
                + " enabled the at-least-once semantics will be used for Storage Write API, otherwise"
                + " exactly-once semantics will be used.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }
}

Langkah berikutnya