Modello di testo di Cloud Storage in BigQuery con UDF Python

La pipeline Cloud Storage Text to BigQuery con UDF Python è una pipeline batch che legge i file di testo archiviati in Cloud Storage, li trasforma utilizzando una funzione UDF (definita dall'utente) di Python e aggiunge il risultato a una tabella BigQuery.

Requisiti della pipeline

  • Crea un file JSON che descriva lo schema BigQuery.

    Assicurati che esista un array JSON di primo livello denominato BigQuery Schema e che i suoi contenuti rispettino il pattern {"name": "COLUMN_NAME", "type": "DATA_TYPE"}.

    Il modello batch di testo Cloud Storage a BigQuery non supporta l'importazione dei dati nei campi STRUCT (Record) della tabella BigQuery di destinazione.

    Il seguente JSON descrive uno schema BigQuery di esempio:

      "BigQuery Schema": [
          "name": "name",
          "type": "STRING"
          "name": "age",
          "type": "INTEGER"
  • Crea un file Python (.py) con la tua funzione UDF che fornisca la logica per trasformare le righe di testo. La funzione deve restituire una stringa JSON.

    Ad esempio, questa funzione suddivide ogni riga di un file CSV e restituisce una stringa JSON dopo aver trasformato i valori.

    import json
    def process(value):
      data = value.split(',')
      obj = { 'name': data[0], 'age': int(data[1]) }
      return json.dumps(obj)

Parametri del modello

Parametro Descrizione
JSONPath Il percorso gs:// del file JSON che definisce lo schema BigQuery, archiviato in Cloud Storage. Ad esempio, gs://path/to/my/schema.json.
pythonExternalTextTransformGcsPath L'URI di Cloud Storage del file di codice Python che definisce la funzione definita dall'utente (UDF) che vuoi utilizzare. Ad esempio, gs://my-bucket/my-udfs/
pythonExternalTextTransformFunctionName Il nome della funzione definita dall'utente (UDF) di Python che vuoi utilizzare.
inputFilePattern Il percorso gs:// del testo in Cloud Storage che vuoi elaborare. Ad esempio gs://path/to/my/text/data.txt.
outputTable Il nome della tabella BigQuery che vuoi creare per archiviare i dati elaborati. Se riutilizzi una tabella BigQuery esistente, i dati vengono aggiunti alla tabella di destinazione. Ad esempio,
bigQueryLoadingTemporaryDirectory La directory temporanea per il processo di caricamento di BigQuery. Ad esempio, gs://my-bucket/my-files/temp_dir.
useStorageWriteApi Facoltativo: se true, la pipeline utilizza l' API BigQuery Storage Write. Il valore predefinito è false. Per ulteriori informazioni, consulta Utilizzare l'API Storage Write.
useStorageWriteApiAtLeastOnce (Facoltativo) Quando utilizzi l'API Storage Write, specifica la semantica di scrittura. Per utilizzare la semantica almeno una volta, imposta questo parametro su true. Per utilizzare la semantica esattamente una volta, imposta il parametro su false. Questo parametro si applica solo quando useStorageWriteApi è true. Il valore predefinito è false.

Funzione definita dall'utente

Se vuoi, puoi estendere questo modello scrivendo una funzione definita dall'utente (UDF). Il modello chiama la UDF per ogni elemento di input. I payload degli elementi vengono serializzati come stringhe JSON. Per ulteriori informazioni, consulta Creare funzioni predefinite dall'utente per i modelli Dataflow.

Specifiche della funzione

La UDF ha la seguente specifica:

  • Input: una riga di testo di un file di input Cloud Storage.
  • Output: una stringa JSON che corrisponde allo schema della tabella di destinazione BigQuery.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Text Files on Cloud Storage to BigQuery with Python UDF (Batch) template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates-REGION_NAME/VERSION/flex/GCS_Text_to_BigQuery_Xlang \
    --region REGION_NAME \
    --parameters \

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PYTHON_FUNCTION: il nome della funzione definita dall'utente (UDF) di Python che vuoi utilizzare.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • PATH_TO_PYTHON_UDF_FILE: l'URI Cloud Storage del file di codice Python che definisce la funzione definita dall'utente (UDF) che vuoi utilizzare. Ad esempio, gs://my-bucket/my-udfs/
  • PATH_TO_TEXT_DATA: il percorso Cloud Storage al tuo set di dati di testo
  • BIGQUERY_TABLE: il nome della tabella BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
        "pythonExternalTextTransformFunctionName": "PYTHON_FUNCTION",
        "pythonExternalTextTransformGcsPath": "PATH_TO_PYTHON_UDF_FILE",
        "bigQueryLoadingTemporaryDirectory": "PATH_TO_TEMP_DIR_ON_GCS"
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/GCS_Text_to_BigQuery_Xlang",

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • PYTHON_FUNCTION: il nome della funzione definita dall'utente (UDF) di Python che vuoi utilizzare.
  • PATH_TO_BIGQUERY_SCHEMA_JSON: il percorso di Cloud Storage al file JSON contenente la definizione dello schema
  • PATH_TO_PYTHON_UDF_FILE: l'URI Cloud Storage del file di codice Python che definisce la funzione definita dall'utente (UDF) che vuoi utilizzare. Ad esempio, gs://my-bucket/my-udfs/
  • PATH_TO_TEXT_DATA: il percorso Cloud Storage al tuo set di dati di testo
  • BIGQUERY_TABLE: il nome della tabella BigQuery
  • PATH_TO_TEMP_DIR_ON_GCS: il percorso di Cloud Storage alla directory temporanea
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.util.StreamUtils;
import org.apache.beam.sdk.values.PCollection;
import org.json.JSONArray;
import org.json.JSONObject;

 * Templated pipeline to read text from TextIO, apply a javascript UDF to it, and write it to GCS.
 * <p>Check out <a
 * href="">README</a>
 * for instructions on how to use or modify this template.
      name = "GCS_Text_to_BigQuery_Flex",
      category = TemplateCategory.BATCH,
      displayName = "Text Files on Cloud Storage to BigQuery with BigQuery Storage API support",
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a JavaScript User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
      documentation =
      flexContainerName = "text-to-bigquery",
      contactInformation = "",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + " = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + " = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
      name = "GCS_Text_to_BigQuery_Xlang",
      category = TemplateCategory.BATCH,
      displayName =
          "Text Files on Cloud Storage to BigQuery with BigQuery Storage API & Python UDF support",
      type = Template.TemplateType.XLANG,
      description =
          "The Cloud Storage Text to BigQuery pipeline is a batch pipeline that allows you to read text files stored in "
              + "Cloud Storage, transform them using a Python User Defined Function (UDF) that you provide, and append the result to a BigQuery table.",
      optionsClass = TextIOToBigQuery.Options.class,
      skipOptions = {
      optionalOptions = {"javascriptTextTransformGcsPath", "javascriptTextTransformFunctionName"},
      documentation =
      flexContainerName = "text-to-bigquery-xlang",
      contactInformation = "",
      requirements = {
        "Create a JSON file that describes your {{bigquery_name_short}} schema.\n"
            + "    <p>Ensure that there is a top-level JSON array titled <code>BigQuery Schema</code> and that its\n"
            + "      contents follow the pattern <code>{\"name\": \"COLUMN_NAME\", \"type\": \"DATA_TYPE\"}</code>.</p>\n"
            + "    <p>The following JSON describes an example BigQuery schema:</p>\n"
            + "<pre class=\"prettyprint lang-json\">\n"
            + "{\n"
            + "  \"BigQuery Schema\": [\n"
            + "    {\n"
            + "      \"name\": \"location\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"color\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"coffee\",\n"
            + "      \"type\": \"STRING\"\n"
            + "    }\n"
            + "  ]\n"
            + "}\n"
            + "</pre>",
        "Create a JavaScript (<code>.js</code>) file with your UDF function that supplies the logic\n"
            + "    to transform the lines of text. Your function must return a JSON string.\n"
            + "    <p>For example, this function splits each line of a CSV file and returns a JSON string after\n"
            + "      transforming the values.</p>\n"
            + "<pre class=\"prettyprint\" suppresswarning>\n"
            + "function transform(line) {\n"
            + "var values = line.split(',');\n"
            + "\n"
            + "var obj = new Object();\n"
            + "obj.location = values[0];\n"
            + " = values[1];\n"
            + "obj.age = values[2];\n"
            + "obj.color = values[3];\n"
            + " = values[4];\n"
            + "var jsonString = JSON.stringify(obj);\n"
            + "\n"
            + "return jsonString;\n"
            + "}</pre>"
public class TextIOToBigQuery {

  /** Options supported by {@link TextIOToBigQuery}. */
  public interface Options
      extends DataflowPipelineOptions,
          BigQueryStorageApiBatchOptions {
        order = 1,
        groupName = "Source",
        optional = false,
        description = "The GCS location of the text you'd like to process",
        helpText = "The gs:// path to the text in Cloud Storage you'd like to process.",
        example = "gs://your-bucket/your-file.txt")
    String getInputFilePattern();

    void setInputFilePattern(String value);

        order = 2,
        optional = false,
        description = "JSON file with BigQuery Schema description",
        helpText =
            "The gs:// path to the JSON file that defines your BigQuery schema, stored in Cloud Storage.",
        example = "gs://your-bucket/your-schema.json")
    String getJSONPath();

    void setJSONPath(String value);

        order = 3,
        optional = false,
        groupName = "Target",
        description = "Output table to write to",
        helpText =
            "The location of the BigQuery table to use to store the processed data. If you reuse an existing table, it is overwritten.",
        example = "<PROJECT_ID>:<DATASET_NAME>.<TABLE_NAME>")
    String getOutputTable();

    void setOutputTable(String value);

        order = 4,
        optional = false,
        description = "GCS path to javascript fn for transforming output",
        helpText =
            "The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) you want to use.",
        example = "gs://your-bucket/your-transforms/*.js")
    String getJavascriptTextTransformGcsPath();

    void setJavascriptTextTransformGcsPath(String jsTransformPath);

        order = 5,
        optional = false,
        regexes = {"[a-zA-Z0-9_]+"},
        description = "UDF Javascript Function Name",
        helpText =
            "The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is `myTransform(inJson) { /* stuff...*/ }`, then the function name is `myTransform`. For sample JavaScript UDFs, see UDF Examples (",
        example = "transform_udf1")
    String getJavascriptTextTransformFunctionName();

    void setJavascriptTextTransformFunctionName(String javascriptTextTransformFunctionName);

        order = 6,
        optional = false,
        description = "Temporary directory for BigQuery loading process",
        helpText = "Temporary directory for BigQuery loading process.",
        example = "gs://your-bucket/your-files/temp-dir")
    String getBigQueryLoadingTemporaryDirectory();

    void setBigQueryLoadingTemporaryDirectory(String directory);

  private static final String BIGQUERY_SCHEMA = "BigQuery Schema";

  private static final String NAME = "name";
  private static final String TYPE = "type";
  private static final String MODE = "mode";
  private static final String RECORD_TYPE = "RECORD";
  private static final String FIELDS_ENTRY = "fields";

  public static void main(String[] args) {

    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options, () -> writeToBQTransform(options));

   * Create the pipeline with the supplied options.
   * @param options The execution parameters to the pipeline.
   * @param writeToBQ the transform that outputs {@link TableRow}s to BigQuery.
   * @return The result of the pipeline execution.
  static PipelineResult run(Options options, Supplier<Write<TableRow>> writeToBQ) {

    Pipeline pipeline = Pipeline.create(options);

    boolean useJavascriptUdf = !Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath());
    boolean usePythonUdf = !Strings.isNullOrEmpty(options.getPythonExternalTextTransformGcsPath());
    if (useJavascriptUdf && usePythonUdf) {
      throw new IllegalArgumentException(
          "Either javascript or Python gcs path must be provided, but not both.");

    PCollection<String> source =
        pipeline.apply("Read from source",;
    PCollection<TableRow> udfOut;

    if (usePythonUdf) {
      udfOut =
                  ParDo.of(new PythonExternalTextTransformer.RowToTableRowElementFn()));
    } else {
      udfOut =
                      new SimpleFunction<String, TableRow>() {
                        public TableRow apply(String json) {
                          return BigQueryConverters.convertJsonToTableRow(json);

    udfOut.apply("Insert into Bigquery", writeToBQ.get());


  /** Create the {@link Write} transform that outputs the collection to BigQuery. */
  static Write<TableRow> writeToBQTransform(Options options) {
    return BigQueryIO.writeTableRows()

  /** Parse BigQuery schema from a Json file. */
  private static TableSchema parseSchema(String jsonPath) {
    TableSchema tableSchema = new TableSchema();
    List<TableFieldSchema> fields = new ArrayList<>();

    JSONObject jsonSchema = parseJson(jsonPath);

    JSONArray bqSchemaJsonArray = jsonSchema.getJSONArray(BIGQUERY_SCHEMA);

    for (int i = 0; i < bqSchemaJsonArray.length(); i++) {
      JSONObject inputField = bqSchemaJsonArray.getJSONObject(i);

    return tableSchema;

   * Convert a JSONObject from the Schema JSON to a TableFieldSchema. In case of RECORD, it handles
   * it recursively.
   * @param inputField Input field to convert.
   * @return TableFieldSchema instance to populate the schema.
  private static TableFieldSchema convertToTableFieldSchema(JSONObject inputField) {
    TableFieldSchema field =
        new TableFieldSchema()

    if (inputField.has(MODE)) {

    if (inputField.getString(TYPE) != null && inputField.getString(TYPE).equals(RECORD_TYPE)) {
      List<TableFieldSchema> nestedFields = new ArrayList<>();
      JSONArray fieldsArr = inputField.getJSONArray(FIELDS_ENTRY);
      for (int i = 0; i < fieldsArr.length(); i++) {
        JSONObject nestedJSON = fieldsArr.getJSONObject(i);

    return field;

   * Parses a JSON file and returns a JSONObject containing the necessary source, sink, and schema
   * information.
   * @param pathToJson the JSON file location so we can download and parse it
   * @return the parsed JSONObject
  private static JSONObject parseJson(String pathToJson) {
    try {
      // accessing GCS needs to be done after the pipeline create call, otherwise FileSystems
      // doesn't know about GCS.
      ReadableByteChannel readableByteChannel =
, false));
      String json =
          new String(
      return new JSONObject(json);
    } catch (Exception e) {
      throw new RuntimeException(e);

