Template Konversi Format File (Avro, Parquet, CSV)

Template Konversi Format File adalah pipeline batch yang mengonversi file yang disimpan di Cloud Storage dari satu format yang didukung ke format lainnya.

Konversi format berikut didukung:

  • CSV ke Avro
  • CSV ke Parquet
  • Avro ke Parquet
  • Parquet ke Avro

Persyaratan pipeline

  • Bucket Cloud Storage output harus ada sebelum menjalankan pipeline.

Parameter template

Parameter Deskripsi
inputFileFormat Format file input. Harus berupa salah satu dari [csv, avro, parquet].
outputFileFormat Format file output. Harus berupa salah satu dari [avro, parquet].
inputFileSpec Pola jalur Cloud Storage untuk file input. Contoh, gs://bucket-name/path/*.csv
outputBucket Folder Cloud Storage untuk menulis file output. Jalur ini harus diakhiri dengan garis miring. Contoh, gs://bucket-name/output/
schema Jalur Cloud Storage ke file skema Avro. Contoh, gs://bucket-name/schema/my-schema.avsc
containsHeaders (Opsional) File CSV input berisi data header (true/false). Nilai defaultnya adalah false. Hanya diperlukan saat membaca file CSV.
csvFormat (Opsional) Spesifikasi format CSV yang akan digunakan untuk mengurai data. Nilai defaultnya adalah Default. Lihat Format CSV Apache Commons untuk mengetahui detail selengkapnya.
delimiter (Opsional) Pemisah kolom yang digunakan oleh file CSV input.
outputFilePrefix (Opsional) Awalan file output. Nilai defaultnya adalah output.
numShards (Opsional) Jumlah shard file output.

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Convert file formats template.
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow flex-template run JOB_NAME \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/flex/File_Format_Conversion \
    --parameters \
inputFileFormat=INPUT_FORMAT,\
outputFileFormat=OUTPUT_FORMAT,\
inputFileSpec=INPUT_FILES,\
schema=SCHEMA,\
outputBucket=OUTPUT_FOLDER

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_FORMAT: format file dari file input; harus salah satu dari [csv, avro, parquet]
  • OUTPUT_FORMAT: format file file output; harus salah satu dari [avro, parquet]
  • INPUT_FILES: pola jalur untuk file input
  • OUTPUT_FOLDER: folder Cloud Storage Anda untuk file output
  • SCHEMA: jalur ke file skema Avro

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
   "launch_parameter": {
      "jobName": "JOB_NAME",
      "parameters": {
          "inputFileFormat": "INPUT_FORMAT",
          "outputFileFormat": "OUTPUT_FORMAT",
          "inputFileSpec": "INPUT_FILES",
          "schema": "SCHEMA",
          "outputBucket": "OUTPUT_FOLDER"
      },
      "containerSpecGcsPath": "gs://dataflow-templates-LOCATION/VERSION/flex/File_Format_Conversion",
   }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • INPUT_FORMAT: format file dari file input; harus salah satu dari [csv, avro, parquet]
  • OUTPUT_FORMAT: format file file output; harus salah satu dari [avro, parquet]
  • INPUT_FILES: pola jalur untuk file input
  • OUTPUT_FOLDER: folder Cloud Storage Anda untuk file output
  • SCHEMA: jalur ke file skema Avro
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.v2.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.FileFormatConversion.FileFormatConversionOptions;
import com.google.cloud.teleport.v2.transforms.AvroConverters.AvroOptions;
import com.google.cloud.teleport.v2.transforms.CsvConverters.CsvPipelineOptions;
import com.google.cloud.teleport.v2.transforms.ParquetConverters.ParquetOptions;
import java.util.EnumMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link FileFormatConversion} pipeline takes in an input file, converts it to a desired format
 * and saves it to Cloud Storage. Supported file transformations are:
 *
 * <ul>
 *   <li>Csv to Avro
 *   <li>Csv to Parquet
 *   <li>Avro to Parquet
 *   <li>Parquet to Avro
 * </ul>
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>Input file exists in Google Cloud Storage.
 *   <li>Google Cloud Storage output bucket exists.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/file-format-conversion/README_File_Format_Conversion.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "File_Format_Conversion",
    category = TemplateCategory.UTILITIES,
    displayName = "Convert file formats between Avro, Parquet & CSV",
    description = {
      "The File Format Conversion template is a batch pipeline that converts files stored on Cloud Storage from one supported format to another.\n",
      "The following format conversions are supported:\n"
          + "- CSV to Avro\n"
          + "- CSV to Parquet\n"
          + "- Avro to Parquet\n"
          + "- Parquet to Avro"
    },
    optionsClass = FileFormatConversionOptions.class,
    optionalOptions = {"deadletterTable"},
    flexContainerName = "file-format-conversion",
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/file-format-conversion",
    contactInformation = "https://cloud.google.com/support",
    requirements = {"The output Cloud Storage bucket must exist before running the pipeline."})
public class FileFormatConversion {

  /** Logger for class. */
  private static final Logger LOG = LoggerFactory.getLogger(FileFormatConversionFactory.class);

  private static EnumMap<ValidFileFormats, String> validFileFormats =
      new EnumMap<ValidFileFormats, String>(ValidFileFormats.class);

  /**
   * The {@link FileFormatConversionOptions} provides the custom execution options passed by the
   * executor at the command-line.
   */
  public interface FileFormatConversionOptions
      extends PipelineOptions, CsvPipelineOptions, AvroOptions, ParquetOptions {
    @TemplateParameter.Enum(
        order = 1,
        enumOptions = {
          @TemplateEnumOption("avro"),
          @TemplateEnumOption("csv"),
          @TemplateEnumOption("parquet")
        },
        description = "File format of the input files.",
        helpText = "File format of the input files. Needs to be either avro, parquet or csv.")
    @Required
    String getInputFileFormat();

    void setInputFileFormat(String inputFileFormat);

    @TemplateParameter.Enum(
        order = 2,
        enumOptions = {@TemplateEnumOption("avro"), @TemplateEnumOption("parquet")},
        description = "File format of the output files.",
        helpText = "File format of the output files. Needs to be either avro or parquet.")
    @Required
    String getOutputFileFormat();

    void setOutputFileFormat(String outputFileFormat);
  }

  /** The {@link ValidFileFormats} enum contains all valid file formats. */
  public enum ValidFileFormats {
    CSV,
    AVRO,
    PARQUET
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    UncaughtExceptionLogger.register();

    FileFormatConversionOptions options =
        PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(FileFormatConversionOptions.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options.
   *
   * @param options The execution options.
   * @return The pipeline result.
   * @throws RuntimeException thrown if incorrect file formats are passed.
   */
  public static PipelineResult run(FileFormatConversionOptions options) {
    String inputFileFormat = options.getInputFileFormat().toUpperCase();
    String outputFileFormat = options.getOutputFileFormat().toUpperCase();

    validFileFormats.put(ValidFileFormats.CSV, "CSV");
    validFileFormats.put(ValidFileFormats.AVRO, "AVRO");
    validFileFormats.put(ValidFileFormats.PARQUET, "PARQUET");

    if (!validFileFormats.containsValue(inputFileFormat)) {
      throw new IllegalArgumentException("Invalid input file format.");
    }
    if (!validFileFormats.containsValue(outputFileFormat)) {
      throw new IllegalArgumentException("Invalid output file format.");
    }
    if (inputFileFormat.equals(outputFileFormat)) {
      throw new IllegalArgumentException("Input and output file format cannot be the same.");
    }

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    pipeline.apply(
        inputFileFormat + " to " + outputFileFormat,
        FileFormatConversionFactory.FileFormat.newBuilder()
            .setOptions(options)
            .setInputFileFormat(inputFileFormat)
            .setOutputFileFormat(outputFileFormat)
            .build());

    return pipeline.run();
  }
}

Langkah berikutnya