Modèle Texte Cloud Storage vers BigQuery (flux)

Le pipeline Texte Cloud Storage vers BigQuery est un pipeline de streaming qui diffuse les fichiers texte stockés dans Cloud Storage, les transforme à l'aide d'une fonction JavaScript définie par l'utilisateur (UDF) que vous fournissez et ajoute le résultat à BigQuery.

Le pipeline fonctionne indéfiniment et doit être arrêté manuellement via une annulation et non un drainage, en raison de son utilisation de la transformation Watch qui est une fonction DoFn non compatible avec le drainage.

Conditions requises pour ce pipeline

  • Créez un fichier JSON décrivant le schéma de la table de sortie dans BigQuery.

    Assurez-vous qu'il existe un tableau JSON de niveau supérieur intitulé fields et que son contenu suit le modèle {"name": "COLUMN_NAME", "type": "DATA_TYPE"}. Exemple :

    {
      "fields": [
        {
          "name": "name",
          "type": "STRING"
        },
        {
          "name": "age",
          "type": "INTEGER"
        }
      ]
    }
  • Créez un fichier JavaScript (.js) à l'aide de la fonction définie par l'utilisateur (UDF) qui fournit la logique pour transformer les lignes de texte. Votre fonction doit renvoyer une chaîne JSON.

    L'exemple suivant divise chaque ligne d'un fichier CSV, crée un objet JSON avec les valeurs et renvoie une chaîne JSON:

    function process(inJson) {
      val = inJson.split(",");
    
      const obj = {
        "name": val[0],
        "age": parseInt(val[1])
      };
      return JSON.stringify(obj);
    }

Paramètres de modèle

Paramètres obligatoires

  • inputFilePattern: chemin d'accès gs:// vers le texte que vous souhaitez traiter dans Cloud Storage. Exemple :gs://your-bucket/your-file.txt
  • JSONPath: chemin d'accès gs:// vers le fichier JSON qui définit votre schéma BigQuery, stocké dans Cloud Storage. Exemple :gs://your-bucket/your-schema.json
  • outputTable: emplacement de la table BigQuery à utiliser pour stocker les données traitées. Si vous réutilisez une table existante, elle est écrasée. Exemple :<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>
  • javascriptTextTransformGcsPath: URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, gs://your-bucket/your-transforms/*.js.
  • javascriptTextTransformFunctionName: nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser. Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la section https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples (par exemple, transform_udf1).
  • bigQueryLoadingTemporaryDirectory: répertoire temporaire pour le processus de chargement de BigQuery. Exemple :gs://your-bucket/your-files/temp-dir

Paramètres facultatifs

  • outputDeadletterTable: table des messages qui n'ont pas pu atteindre la table de sortie. En l'absence de table existante, une table va être créée lors de l'exécution du pipeline. Si aucune valeur n'est spécifiée, <outputTableSpec>_error_records est utilisé. Par exemple, <PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>.
  • useStorageWriteApiAtLeastOnce: ce paramètre ne prend effet que si Use BigQuery Storage Write API est activé. Si cette option est activée, la sémantique de type "au moins une fois" est utilisée pour l'API Storage Write. Sinon, la sémantique de type "exactement une fois" est utilisée. La valeur par défaut est "false".
  • useStorageWriteApi: si cette valeur est définie sur "true", le pipeline utilise l'API BigQuery Storage Write (https://cloud.google.com/bigquery/docs/write-api). La valeur par défaut est false. Pour en savoir plus, consultez la page "Utiliser l'API Storage Write" (https://beam.apache.org/documentation/io/built-in/google-bigquery/#storage-write-api).
  • numStorageWriteApiStreams: spécifie le nombre de flux d'écriture, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre. La valeur par défaut est 0.
  • storageWriteApiTriggeringFrequencySec: spécifie la fréquence de déclenchement, en secondes, lorsque vous utilisez l'API Storage Write. Si useStorageWriteApi est défini sur true et useStorageWriteApiAtLeastOnce sur false, vous devez définir ce paramètre.
  • pythonExternalTextTransformGcsPath: modèle de chemin d'accès Cloud Storage pour le code Python contenant vos fonctions définies par l'utilisateur. Exemple :gs://your-bucket/your-function.py
  • javascriptTextTransformReloadIntervalMinutes: spécifie la fréquence d'actualisation de l'UDF, en minutes. Si la valeur est supérieure à 0, Dataflow vérifie régulièrement le fichier UDF dans Cloud Storage et actualise l'UDF si le fichier est modifié. Ce paramètre vous permet de mettre à jour l'UDF pendant l'exécution du pipeline, sans avoir à redémarrer le job. Si la valeur est 0, l'actualisation de l'UDF est désactivée. La valeur par défaut est 0.

Fonction définie par l'utilisateur

Ce modèle nécessite une fonction définie par l'utilisateur (UDF) qui analyse les fichiers d'entrée, comme décrit dans la section Exigences du pipeline. Le modèle appelle l'UDF pour chaque ligne de texte de chaque fichier d'entrée. Pour en savoir plus sur la création d'UDF, consultez la page Créer des fonctions définies par l'utilisateur pour les modèles Dataflow.

Spécification de la fonction

La spécification de l'UDF se présente comme suit :

  • Entrée : une ligne de texte provenant d'un fichier d'entrée.
  • Résultat : chaîne JSON correspondant au schéma de la table de destination BigQuery.

Exécuter le modèle

  1. Accédez à la page Dataflow Créer un job à partir d'un modèle.
  2. Accéder à la page Créer un job à partir d'un modèle
  3. Dans le champ Nom du job, saisissez un nom de job unique.
  4. Facultatif : pour Point de terminaison régional, sélectionnez une valeur dans le menu déroulant. La région par défaut est us-central1.

    Pour obtenir la liste des régions dans lesquelles vous pouvez exécuter un job Dataflow, consultez la page Emplacements Dataflow.

  5. Dans le menu déroulant Modèle Dataflow, sélectionnez the Cloud Storage Text to BigQuery (Stream) template.
  6. Dans les champs fournis, saisissez vos valeurs de paramètres.
  7. Cliquez sur Run Job (Exécuter la tâche).

Dans le shell ou le terminal, exécutez le modèle :

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex \
    --region REGION_NAME \
    --staging-location STAGING_LOCATION \
    --parameters \
javascriptTextTransformGcsPath=PATH_TO_JAVASCRIPT_UDF_FILE,\
javascriptTextTransformFunctionName=JAVASCRIPT_FUNCTION,\
JSONPath=PATH_TO_BIGQUERY_SCHEMA_JSON,\
inputFilePattern=PATH_TO_TEXT_DATA,\
outputTable=BIGQUERY_TABLE,\
outputDeadletterTable=BIGQUERY_UNPROCESSED_TABLE,\
bigQueryLoadingTemporaryDirectory=PATH_TO_TEMP_DIR_ON_GCS

Remplacez les éléments suivants :

  • JOB_NAME : nom de job unique de votre choix
  • REGION_NAME : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire

Pour exécuter le modèle à l'aide de l'API REST, envoyez une requête HTTP POST. Pour en savoir plus sur l'API, ses autorisations et leurs champs d'application, consultez la section projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
       "javascriptTextTransformFunctionName": "JAVASCRIPT_FUNCTION",
       "JSONPath": "PATH_TO_BIGQUERY_SCHEMA_JSON",
       "javascriptTextTransformGcsPath": "PATH_TO_JAVASCRIPT_UDF_FILE",
       "inputFilePattern":"PATH_TO_TEXT_DATA",
       "outputTable":"BIGQUERY_TABLE",
       "outputDeadletterTable":"BIGQUERY_UNPROCESSED_TABLE",
       "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/Stream_GCS_Text_to_BigQuery_Flex",
   }
}

Remplacez les éléments suivants :

  • PROJECT_ID : ID du projet Google Cloud dans lequel vous souhaitez exécuter le job Dataflow
  • JOB_NAME : nom de job unique de votre choix
  • LOCATION : région dans laquelle vous souhaitez déployer votre job Dataflow, par exemple us-central1
  • VERSION : version du modèle que vous souhaitez utiliser

    Vous pouvez utiliser les valeurs suivantes :

  • STAGING_LOCATION : emplacement des fichiers locaux de préproduction (par exemple, gs://your-bucket/staging)
  • JAVASCRIPT_FUNCTION : Nom de la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser.

    Par exemple, si le code de votre fonction JavaScript est myTransform(inJson) { /*...do stuff...*/ }, le nom de la fonction est myTransform. Pour obtenir des exemples de fonctions JavaScript définies par l'utilisateur, consultez la page Exemples de fonctions définies par l'utilisateur.

  • PATH_TO_BIGQUERY_SCHEMA_JSON : chemin d'accès Cloud Storage au fichier JSON contenant la définition du schéma
  • PATH_TO_JAVASCRIPT_UDF_FILE : URI Cloud Storage du fichier .js contenant la fonction JavaScript définie par l'utilisateur que vous souhaitez utiliser (par exemple, gs://my-bucket/my-udfs/my_file.js).
  • PATH_TO_TEXT_DATA : chemin d'accès Cloud Storage à votre ensemble de données texte
  • BIGQUERY_TABLE : nom de votre table BigQuery.
  • BIGQUERY_UNPROCESSED_TABLE : nom de votre table BigQuery pour les messages non traités
  • PATH_TO_TEMP_DIR_ON_GCS : chemin d'accès Cloud Storage au répertoire temporaire
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.api.client.json.JsonFactory;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.MultiTemplate;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.coders.FailsafeElementCoder;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.BigQueryStorageApiStreamingOptions;
import com.google.cloud.teleport.v2.templates.TextToBigQueryStreaming.TextToBigQueryStreamingOptions;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.FailsafeJsonToTableRow;
import com.google.cloud.teleport.v2.transforms.ErrorConverters.WriteStringMessageErrors;
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.FailsafeJavascriptUdf;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer;
import com.google.cloud.teleport.v2.transforms.PythonExternalTextTransformer.RowToStringFailsafeElementFn;
import com.google.cloud.teleport.v2.utils.BigQueryIOUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableList;
import java.io.IOException;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.extensions.gcp.util.Transport;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryInsertError;
import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.Flatten;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Watch.Growth;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.commons.lang3.StringUtils;
import org.joda.time.Duration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link TextToBigQueryStreaming} is a streaming version of {@link TextIOToBigQuery} pipeline
 * that reads text files, applies a JavaScript UDF and writes the output to BigQuery. The pipeline
 * continuously polls for new files, reads them row-by-row and processes each record into BigQuery.
 * The polling interval is set at 10 seconds.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/googlecloud-to-googlecloud/README_Stream_GCS_Text_to_BigQuery_Flex.md">README</a>
 * for instructions on how to use or modify this template.
 */
@MultiTemplate({
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream)",
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + "    <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + "    draining."
      },
      skipOptions = {
        "pythonExternalTextTransfromGcsPath",
        "pythonExternalTextTransformFunctionName"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + "    <p>\n"
            + "      Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + "      For example:\n"
            + "    </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Note that your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + "obj.name = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + "obj.coffee = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true),
  @Template(
      name = "Stream_GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.STREAMING,
      displayName = "Cloud Storage Text to BigQuery (Stream) with Python UDF",
      type = Template.TemplateType.XLANG,
      description = {
        "The Text Files on Cloud Storage to BigQuery pipeline is a streaming pipeline that allows you to stream text files stored in Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to BigQuery.\n",
        "The pipeline runs indefinitely and needs to be terminated manually via a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#cancel\">cancel</a> and not a\n"
            + "    <a href=\"https://cloud.google.com/dataflow/docs/guides/stopping-a-pipeline#drain\">drain</a>, due to its use of the\n"
            + "    <code>Watch</code> transform, which is a splittable <code>DoFn</code> that does not support\n"
            + "    draining."
      },
      skipOptions = {
        "javascriptTextTransformGcsPath",
        "javascriptTextTransformFunctionName",
        "javascriptTextTransformReloadIntervalMinutes"
      },
      optionsClass = TextToBigQueryStreamingOptions.class,
      flexContainerName = "text-to-bigquery-streaming-xlang",
      documentation =
          "https://cloud.google.com/dataflow/docs/guides/templates/provided/text-to-bigquery-stream",
      contactInformation = "https://cloud.google.com/support",
      requirements = {
        "Create a JSON file that describes the schema of your output table in BigQuery.\n"
            + "    <p>\n"
            + "      Ensure that there is a top-level JSON array titled <code>fields</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.\n"
            + "      For example:\n"
            + "    </p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\",\n"
            + "      \"mode\": \"REQUIRED\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a Python (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Note that your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "import json\n"
            + "def transform(line): \n"
            + "  values = line.split(',')\n"
            + "\n"
            + "  obj = {\n"
            + "     'location' : values[0],\n"
            + "     'name' : values[1],\n"
            + "     'age' : values[2],\n"
            + "     'color' : values[3],\n"
            + "     'coffee' : values[4]\n"
            + "  }\n"
            + "  jsonString = JSON.dumps(obj);\n"
            + "\n"
            + "  return jsonString;\n"
            + "\n"
            + "</pre>"
      },
      streaming = true,
      supportsAtLeastOnce = true)
})
public class TextToBigQueryStreaming {

  private static final Logger LOG = LoggerFactory.getLogger(TextToBigQueryStreaming.class);

  /** The tag for the main output for the UDF. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the dead-letter output of the udf. */
  private static final TupleTag<FailsafeElement<String, String>> UDF_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The tag for the main output of the json transformation. */
  private static final TupleTag<TableRow> TRANSFORM_OUT = new TupleTag<TableRow>() {};

  /** The tag for the dead-letter output of the json to table row transform. */
  private static final TupleTag<FailsafeElement<String, String>> TRANSFORM_DEADLETTER_OUT =
      new TupleTag<FailsafeElement<String, String>>() {};

  /** The default suffix for error tables if dead letter table is not specified. */
  private static final String DEFAULT_DEADLETTER_TABLE_SUFFIX = "_error_records";

  /** Default interval for polling files in GCS. */
  private static final Duration DEFAULT_POLL_INTERVAL = Duration.standardSeconds(10);

  /** Coder for FailsafeElement. */
  private static final FailsafeElementCoder<String, String> FAILSAFE_ELEMENT_CODER =
      FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

  private static final JsonFactory JSON_FACTORY = Transport.getJsonFactory();

  /**
   * Main entry point for executing the pipeline. This will run the pipeline asynchronously. If
   * blocking execution is required, use the {@link
   * TextToBigQueryStreaming#run(TextToBigQueryStreamingOptions)} method to start the pipeline and
   * invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}
   *
   * @param args The command-line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    // Parse the user options passed from the command-line
    TextToBigQueryStreamingOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(TextToBigQueryStreamingOptions.class);
    BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

    run(options);
  }

  /**
   * Runs the pipeline with the supplied options.
   *
   * @param options The execution parameters to the pipeline.
   * @return The result of the pipeline execution.
   */
  public static PipelineResult run(TextToBigQueryStreamingOptions options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    // Register the coder for pipeline
    FailsafeElementCoder<String, String> coder =
        FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of());

    CoderRegistry coderRegistry = pipeline.getCoderRegistry();
    coderRegistry.registerCoderForType(coder.getEncodedTypeDescriptor(), coder);

    // Determine if we are using Python UDFs or JS UDFs based on the provided options.
    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");
    }

    /*
     * Steps:
     *  1) Read from the text source continuously.
     *  2) Convert to FailsafeElement.
     *  3) Apply Javascript udf transformation.
     *    - Tag records that were successfully transformed and those
     *      that failed transformation.
     *  4) Convert records to TableRow.
     *    - Tag records that were successfully converted and those
     *      that failed conversion.
     *  5) Insert successfully converted records into BigQuery.
     *    - Errors encountered while streaming will be sent to deadletter table.
     *  6) Insert records that failed into deadletter table.
     */

    PCollection<String> sourceRead =
        pipeline.apply(
            TextIO.read()
                .from(options.getInputFilePattern())
                .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()));
    PCollectionTuple transformedOutput;
    if (usePythonUdf) {
      transformedOutput =
          sourceRead
              .apply(
                  "MapToRecord",
                  PythonExternalTextTransformer.FailsafeRowPythonExternalUdf
                      .stringMappingFunction())
              .setRowSchema(PythonExternalTextTransformer.FailsafeRowPythonExternalUdf.ROW_SCHEMA)
              .apply(
                  "InvokeUDF",
                  PythonExternalTextTransformer.FailsafePythonExternalUdf.newBuilder()
                      .setFileSystemPath(options.getPythonExternalTextTransformGcsPath())
                      .setFunctionName(options.getPythonExternalTextTransformFunctionName())
                      .build())
              .apply(
                  ParDo.of(new RowToStringFailsafeElementFn(UDF_OUT, UDF_DEADLETTER_OUT))
                      .withOutputTags(UDF_OUT, TupleTagList.of(UDF_DEADLETTER_OUT)));

    } else {
      transformedOutput =
          pipeline

              // 1) Read from the text source continuously.
              .apply(
                  "ReadFromSource",
                  TextIO.read()
                      .from(options.getInputFilePattern())
                      .watchForNewFiles(DEFAULT_POLL_INTERVAL, Growth.never()))

              // 2) Convert to FailsafeElement.
              .apply(
                  "ConvertToFailsafeElement",
                  MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                      .via(input -> FailsafeElement.of(input, input)))

              // 3) Apply Javascript udf transformation.
              .apply(
                  "ApplyUDFTransformation",
                  FailsafeJavascriptUdf.<String>newBuilder()
                      .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
                      .setFunctionName(options.getJavascriptTextTransformFunctionName())
                      .setReloadIntervalMinutes(
                          options.getJavascriptTextTransformReloadIntervalMinutes())
                      .setSuccessTag(UDF_OUT)
                      .setFailureTag(UDF_DEADLETTER_OUT)
                      .build());
    }

    PCollectionTuple convertedTableRows =
        transformedOutput

            // 4) Convert records to TableRow.
            .get(UDF_OUT)
            .apply(
                "ConvertJSONToTableRow",
                FailsafeJsonToTableRow.<String>newBuilder()
                    .setSuccessTag(TRANSFORM_OUT)
                    .setFailureTag(TRANSFORM_DEADLETTER_OUT)
                    .build());

    WriteResult writeResult =
        convertedTableRows

            // 5) Insert successfully converted records into BigQuery.
            .get(TRANSFORM_OUT)
            .apply(
                "InsertIntoBigQuery",
                BigQueryIO.writeTableRows()
                    .withJsonSchema(GCSUtils.getGcsFileAsString(options.getJSONPath()))
                    .to(options.getOutputTable())
                    .withExtendedErrorInfo()
                    .withoutValidation()
                    .withCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
                    .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                    .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors())
                    .withCustomGcsTempLocation(
                        StaticValueProvider.of(options.getBigQueryLoadingTemporaryDirectory())));

    // Elements that failed inserts into BigQuery are extracted and converted to FailsafeElement
    PCollection<FailsafeElement<String, String>> failedInserts =
        BigQueryIOUtils.writeResultToBigQueryInsertErrors(writeResult, options)
            .apply(
                "WrapInsertionErrors",
                MapElements.into(FAILSAFE_ELEMENT_CODER.getEncodedTypeDescriptor())
                    .via(TextToBigQueryStreaming::wrapBigQueryInsertError));

    // 6) Insert records that failed transformation or conversion into deadletter table
    PCollectionList.of(
            ImmutableList.of(
                transformedOutput.get(UDF_DEADLETTER_OUT),
                convertedTableRows.get(TRANSFORM_DEADLETTER_OUT),
                failedInserts))
        .apply("Flatten", Flatten.pCollections())
        .apply(
            "WriteFailedRecords",
            WriteStringMessageErrors.newBuilder()
                .setErrorRecordsTable(
                    StringUtils.isNotEmpty(options.getOutputDeadletterTable())
                        ? options.getOutputDeadletterTable()
                        : options.getOutputTable() + DEFAULT_DEADLETTER_TABLE_SUFFIX)
                .setErrorRecordsTableSchema(ResourceUtils.getDeadletterTableSchemaJson())
                .build());

    return pipeline.run();
  }

  /**
   * Method to wrap a {@link BigQueryInsertError} into a {@link FailsafeElement}.
   *
   * @param insertError BigQueryInsert error.
   * @return FailsafeElement object.
   * @throws IOException
   */
  static FailsafeElement<String, String> wrapBigQueryInsertError(BigQueryInsertError insertError) {

    FailsafeElement<String, String> failsafeElement;
    try {

      String rowPayload = JSON_FACTORY.toString(insertError.getRow());
      String errorMessage = JSON_FACTORY.toString(insertError.getError());

      failsafeElement = FailsafeElement.of(rowPayload, rowPayload);
      failsafeElement.setErrorMessage(errorMessage);

    } catch (IOException e) {
      throw new RuntimeException(e);
    }

    return failsafeElement;
  }

  /**
   * The {@link TextToBigQueryStreamingOptions} class provides the custom execution options passed
   * by the executor at the command-line.
   */
  public interface TextToBigQueryStreamingOptions
      extends TextIOToBigQuery.Options, BigQueryStorageApiStreamingOptions {
    @TemplateParameter.BigQueryTable(
        order = 1,
        optional = true,
        description = "The dead-letter table name to output failed messages to BigQuery",
        helpText =
            "Table for messages that failed to reach the output table. If a table doesn't exist, it is created during "
                + "pipeline execution. If not specified, `<outputTableSpec>_error_records` is used.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputDeadletterTable();

    void setOutputDeadletterTable(String value);

    // Hide the UseStorageWriteApiAtLeastOnce in the UI, because it will automatically be turned
    // on when pipeline is running on ALO mode and using the Storage Write API
    @TemplateParameter.Boolean(
        order = 2,
        optional = true,
        parentName = "useStorageWriteApi",
        parentTriggerValues = {"true"},
        description = "Use at at-least-once semantics in BigQuery Storage Write API",
        helpText =
            "This parameter takes effect only if `Use BigQuery Storage Write API` is enabled. If"
                + " enabled the at-least-once semantics will be used for Storage Write API, otherwise"
                + " exactly-once semantics will be used.",
        hiddenUi = true)
    @Default.Boolean(false)
    @Override
    Boolean getUseStorageWriteApiAtLeastOnce();

    void setUseStorageWriteApiAtLeastOnce(Boolean value);
  }
}

Étape suivante