Modello di conversione del formato file (Avro, Parquet, CSV)

Il modello di conversione del formato file è una pipeline batch che converte i file archiviati su Cloud Storage da un formato supportato a un altro.

Sono supportate le seguenti conversioni di formato:

  • Da CSV ad Avro
  • Da CSV a Parquet
  • Da Avro a Parquet
  • Da Parquet ad Avro

Requisiti della pipeline

  • Il bucket Cloud Storage di output deve esistere prima dell'esecuzione della pipeline.

Parametri del modello

Parametro Descrizione
inputFileFormat Il formato del file di input. Deve essere uno dei seguenti: [csv, avro, parquet].
outputFileFormat Il formato del file di output. Deve essere uno dei seguenti: [avro, parquet].
inputFileSpec Il pattern del percorso Cloud Storage per i file di input. Ad esempio, gs://bucket-name/path/*.csv
outputBucket La cartella Cloud Storage in cui scrivere i file di output. Questo percorso deve terminare con una barra. Ad esempio, gs://bucket-name/output/
schema Il percorso Cloud Storage del file dello schema Avro. Ad esempio, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Facoltativo) I file CSV di input contengono un record di intestazione (true/false). Il valore predefinito è false. Obbligatorio solo per la lettura dei file CSV.
csvFormat (Facoltativo) La specifica del formato CSV da utilizzare per l'analisi dei record. Il valore predefinito è Default. Per ulteriori dettagli, consulta Apache Commons CSV Format.
delimiter (Facoltativo) Il delimitatore di campo utilizzato dai file CSV di input.
outputFilePrefix (Facoltativo) Il prefisso del file di output. Il valore predefinito è output.
numShards (Facoltativo) Il numero di frammenti del file di output.

Esegui il modello

  1. Vai alla pagina Crea job da modello di Dataflow.
  2. Vai a Crea job da modello
  3. Nel campo Nome job, inserisci un nome univoco per il job.
  4. (Facoltativo) Per Endpoint a livello di regione, seleziona un valore dal menu a discesa. La regione predefinita è us-central1.

    Per un elenco delle regioni in cui puoi eseguire un job Dataflow, consulta Località di Dataflow.

  5. Nel menu a discesa Modello di flusso di dati, seleziona the Convert file formats template.
  6. Nei campi dei parametri forniti, inserisci i valori dei parametri.
  7. Fai clic su Esegui job.

Nella shell o nel terminale, esegui il modello:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • REGION_NAME: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • INPUT_FORMAT: il formato del file di input; deve essere uno dei seguenti: [csv, avro, parquet]
  • OUTPUT_FORMAT: il formato dei file di output; deve essere uno dei seguenti [avro, parquet]
  • INPUT_FILES: il pattern del percorso per i file di input
  • OUTPUT_FOLDER: la cartella Cloud Storage per i file di output
  • SCHEMA: il percorso del file dello schema Avro

Per eseguire il modello utilizzando l'API REST, invia una richiesta POST HTTP. Per ulteriori informazioni sull'API e sui relativi ambiti di autorizzazione, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/File_Format_Conversion",
   }
}

Sostituisci quanto segue:

  • PROJECT_ID: l'ID del progetto Google Cloud in cui vuoi eseguire il job Dataflow
  • JOB_NAME: un nome di job univoco a tua scelta
  • LOCATION: la regione in cui vuoi eseguire il deployment del job Dataflow, ad esempio us-central1
  • VERSION: la versione del modello che vuoi utilizzare

    Puoi utilizzare i seguenti valori:

  • INPUT_FORMAT: il formato del file di input; deve essere uno dei seguenti: [csv, avro, parquet]
  • OUTPUT_FORMAT: il formato dei file di output; deve essere uno dei seguenti [avro, parquet]
  • INPUT_FILES: il pattern del percorso per i file di input
  • OUTPUT_FOLDER: la cartella Cloud Storage per i file di output
  • SCHEMA: il percorso del file dello schema Avro
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.FileFormatConversion.FileFormatConversionOptions;
import com.google.cloud.teleport.v2.transforms.AvroConverters.AvroOptions;
import com.google.cloud.teleport.v2.transforms.CsvConverters.CsvPipelineOptions;
import com.google.cloud.teleport.v2.transforms.ParquetConverters.ParquetOptions;
import java.util.EnumMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link FileFormatConversion} pipeline takes in an input file, converts it to a desired format
 * and saves it to Cloud Storage. Supported file transformations are:
 *
 * <ul>
 *   <li>Csv to Avro
 *   <li>Csv to Parquet
 *   <li>Avro to Parquet
 *   <li>Parquet to Avro
 * </ul>
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>Input file exists in Google Cloud Storage.
 *   <li>Google Cloud Storage output bucket exists.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/file-format-conversion/README_File_Format_Conversion.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "File_Format_Conversion",
    category = TemplateCategory.UTILITIES,
    displayName = "Convert file formats between Avro, Parquet & CSV",
    description = {
      "The File Format Conversion template is a batch pipeline that converts files stored on Cloud Storage from one supported format to another.\n",
      "The following format conversions are supported:\n"
          + "- CSV to Avro\n"
          + "- CSV to Parquet\n"
          + "- Avro to Parquet\n"
          + "- Parquet to Avro"
    },
    optionsClass = FileFormatConversionOptions.class,
    optionalOptions = {"deadletterTable"},
    flexContainerName = "file-format-conversion",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/file-format-conversion",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The output Cloud Storage bucket must exist before running the pipeline."})
public class FileFormatConversion {

  /** Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(FileFormatConversionFactory.class);

  private static EnumMap<ValidFileFormats, String> validFileFormats =
      new EnumMap<ValidFileFormats, String>(ValidFileFormats.class);

  /**
   * The {@link FileFormatConversionOptions} provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface FileFormatConversionOptions
      extends PipelineOptions, CsvPipelineOptions, AvroOptions, ParquetOptions {
    @TemplateParameter.Enum(
        order = 1,
        enumOptions = {
          @TemplateEnumOption("avro"),
          @TemplateEnumOption("csv"),
          @TemplateEnumOption("parquet")
        },
        description = "File format of the input files.",
        helpText = "File format of the input files. Needs to be either avro, parquet or csv.")
    @Required
    String getInputFileFormat();

    void setInputFileFormat(String inputFileFormat);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("parquet")},
        description = "File format of the output files.",
        helpText = "File format of the output files. Needs to be either avro or parquet.")
    @Required
    String getOutputFileFormat();

    void setOutputFileFormat(String outputFileFormat);
  }

  /** The {@link ValidFileFormats} enum contains all valid file formats. */
  public enum ValidFileFormats {
    CSV,
    AVRO,
    PARQUET
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    FileFormatConversionOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FileFormatConversionOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   * @throws RuntimeException thrown if incorrect file formats are passed.
   */
  public static PipelineResult run(FileFormatConversionOptions options) {
    String inputFileFormat = options.getInputFileFormat().toUpperCase();
    String outputFileFormat = options.getOutputFileFormat().toUpperCase();

    validFileFormats.put(ValidFileFormats.CSV, "CSV");
    validFileFormats.put(ValidFileFormats.AVRO, "AVRO");
    validFileFormats.put(ValidFileFormats.PARQUET, "PARQUET");

    if (!validFileFormats.containsValue(inputFileFormat)) {
      throw new IllegalArgumentException("Invalid input file format.");
    }
    if (!validFileFormats.containsValue(outputFileFormat)) {
      throw new IllegalArgumentException("Invalid output file format.");
    }
    if (inputFileFormat.equals(outputFileFormat)) {
      throw new IllegalArgumentException("Input and output file format cannot be the same.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(
        inputFileFormat + " to " + outputFileFormat,
        FileFormatConversionFactory.FileFormat.newBuilder()
            .setOptions(options)
            .setInputFileFormat(inputFileFormat)
            .setOutputFileFormat(outputFileFormat)
            .build());

    return pipeline.run();
  }
}

Passaggi successivi