Bulk Compress Cloud Storage Files テンプレート

Bulk Compress Cloud Storage Files テンプレートは、Cloud Storage 上のファイルを指定した場所に圧縮するバッチ パイプラインです。このテンプレートは、定期的なアーカイブ プロセスの一環として大量のファイルを圧縮する必要がある場合に役立ちます。サポートされている圧縮モードは、BZIP2DEFLATEGZIP です。出力先の場所に出力されるファイルは、元のファイル名の命名スキーマに従って命名され、ファイル名の末尾に圧縮モードの拡張子が付加されます。付加される拡張子は、.bzip2.deflate.gz のいずれかです。

圧縮処理中に発生したエラーは、CSV 形式(ファイル名, エラー メッセージ)のエラーファイルに出力されます。パイプラインの実行中にエラーが発生しなくてもエラーファイルは作成されますが、ファイル内にエラーレコードはありません。

パイプラインの要件

  • 圧縮形式は、BZIP2DEFLATEGZIP のいずれかにすること。
  • パイプラインの実行前に出力ディレクトリが存在している必要があります。

テンプレートのパラメータ

パラメータ 説明
inputFilePattern 読み込み元の入力ファイルのパターン。例: gs://bucket-name/uncompressed/*.txt
outputDirectory 出力を書き込む場所。例: gs://bucket-name/compressed/
outputFailureFile 圧縮処理中に発生したエラーの書き込みに使用されるエラーログ出力ファイル。たとえば、gs://bucket-name/compressed/failed.csv とします。エラーが発生しなくてもファイルは作成されますが、その中身は空です。このファイルの内容は CSV 形式(ファイル名, エラー)であり、圧縮に失敗したファイルごとに 1 行が使用されます。
compression 一致するファイルを圧縮するために使用する圧縮アルゴリズム。BZIP2DEFLATEGZIP のいずれかにする必要があります。

テンプレートを実行する

コンソールgcloudAPI
  1. Dataflow の [テンプレートからジョブを作成] ページに移動します。
  2. [テンプレートからジョブを作成] に移動
  3. [ジョブ名] フィールドに、固有のジョブ名を入力します。
  4. (省略可)[リージョン エンドポイント] で、プルダウン メニューから値を選択します。デフォルトのリージョンは us-central1 です。

    Dataflow ジョブを実行できるリージョンのリストについては、Dataflow のロケーションをご覧ください。

  5. [Dataflow テンプレート] プルダウン メニューから、[ the Bulk Compress Files on Cloud Storage template] を選択します。
  6. 表示されたパラメータ フィールドに、パラメータ値を入力します。
  7. [ジョブを実行] をクリックします。

シェルまたはターミナルで、テンプレートを実行します。

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Bulk_Compress_GCS_Files \
    --region REGION_NAME \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION

次のように置き換えます。

  • JOB_NAME: 一意の任意のジョブ名
  • REGION_NAME: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム

REST API を使用してテンプレートを実行するには、HTTP POST リクエストを送信します。API とその認証スコープの詳細については、projects.templates.launch をご覧ください。

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Bulk_Compress_GCS_Files
{
   "jobName": "JOB_NAME",
   "parameters": {
       "inputFilePattern": "gs://BUCKET_NAME/uncompressed/*.txt",
       "outputDirectory": "gs://BUCKET_NAME/compressed",
       "outputFailureFile": "gs://BUCKET_NAME/failed/failure.csv",
       "compression": "COMPRESSION"
   },
   "environment": { "zone": "us-central1-f" }
}

次のように置き換えます。

  • PROJECT_ID: Dataflow ジョブを実行する Google Cloud プロジェクト ID
  • JOB_NAME: 一意の任意のジョブ名
  • LOCATION: Dataflow ジョブをデプロイするリージョン(例: us-central1
  • VERSION: 使用するテンプレートのバージョン

    使用できる値は次のとおりです。

    • latest: 最新バージョンのテンプレートを使用します。このテンプレートは、バケット内で日付のない親フォルダ(gs://dataflow-templates-REGION_NAME/latest/)にあります。
    • バージョン名(例: 2023-09-12-00_RC00)。特定のバージョンのテンプレートを使用します。このテンプレートは、バケット内で対応する日付の親フォルダ(gs://dataflow-templates-REGION_NAME/)にあります。
  • BUCKET_NAME: Cloud Storage バケットの名前
  • COMPRESSION: 任意の圧縮アルゴリズム
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.BulkCompressor.Options;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link BulkCompressor} is a batch pipeline that compresses files on matched by an input file
 * pattern and outputs them to a specified file location. This pipeline can be useful when you need
 * to compress large batches of files as part of a periodic archival process. The supported
 * compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>. Files output
 * to the destination location will follow a naming schema of original filename appended with the
 * compression mode extension. The extensions appended will be one of: <code>.bzip2</code>, <code>
 * .deflate</code>, <code>.gz</code> as determined by the compression type.
 *
 * <p>Any errors which occur during the compression process will be output to the failure file in
 * CSV format of filename, error message. If no failures occur during execution, the error file will
 * still be created but will contain no error records.
 *
 * <p><b>Pipeline Requirements</b>
 *
 * <ul>
 *   <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
 *       </code>, <code>GZIP</code>, <code>ZIP</code>.
 *   <li>The output directory must exist prior to pipeline execution.
 * </ul>
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Bulk_Compress_GCS_Files.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Bulk_Compress_GCS_Files",
    category = TemplateCategory.UTILITIES,
    displayName = "Bulk Compress Files on Cloud Storage",
    description = {
      "The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. "
          + "This template can be useful when you need to compress large batches of files as part of a periodic archival process. "
          + "The supported compression modes are: BZIP2, DEFLATE, GZIP. Files output to the destination location will follow a naming schema of original filename appended with the compression mode extension. The extensions appended will be one of: .bzip2, .deflate, .gz.",
      "Any errors which occur during the compression process will be output to the failure file in CSV format of filename, error message. "
          + "If no failures occur while running the pipeline, the error file will still be created but will contain no error records."
    },
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bulk-compress-cloud-storage",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The compression must be in one of the following formats: `BZIP2`, `DEFLATE`, `GZIP`.",
      "The output directory must exist prior to running the pipeline."
    })
public class BulkCompressor {

  /** The logger to output status messages to. */
  private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);

  /** The tag used to identify the main output of the {@link Compressor}. */
  private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};

  /** The tag used to identify the dead-letter output of the {@link Compressor}. */
  private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
      new TupleTag<KV<String, String>>() {};

  /**
   * The {@link Options} class provides the custom execution options passed by the executor at the
   * command-line.
   */
  public interface Options extends PipelineOptions {
    @TemplateParameter.GcsReadFile(
        order = 1,
        groupName = "Source",
        description = "Input Cloud Storage File(s)",
        helpText = "The Cloud Storage location of the files you'd like to process.",
        example = "gs://your-bucket/your-files/*.txt")
    @Required
    ValueProvider<String> getInputFilePattern();

    void setInputFilePattern(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFolder(
        order = 2,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters.",
        example = "gs://your-bucket/your-path")
    @Required
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> value);

    @TemplateParameter.GcsWriteFile(
        order = 3,
        groupName = "Target",
        description = "Output failure file",
        helpText =
            "The error log output file to use for write failures that occur during compression. The contents will be one line for "
                + "each file which failed compression. Note that this parameter will "
                + "allow the pipeline to continue processing in the event of a failure.",
        example = "gs://your-bucket/compressed/failed.csv")
    @Required
    ValueProvider<String> getOutputFailureFile();

    void setOutputFailureFile(ValueProvider<String> value);

    @TemplateParameter.Enum(
        order = 4,
        enumOptions = {
          @TemplateEnumOption("BZIP2"),
          @TemplateEnumOption("DEFLATE"),
          @TemplateEnumOption("GZIP")
        },
        description = "Compression",
        helpText =
            "The compression algorithm used to compress the matched files. Valid algorithms: BZIP2, DEFLATE, GZIP")
    @Required
    ValueProvider<Compression> getCompression();

    void setCompression(ValueProvider<Compression> value);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        regexes = {"^[A-Za-z_0-9.]*"},
        description = "Output filename suffix",
        helpText =
            "Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.")
    @Required
    ValueProvider<String> getOutputFilenameSuffix();

    void setOutputFilenameSuffix(ValueProvider<String> value);
  }

  /**
   * The main entry-point for pipeline execution. This method will start the pipeline but will not
   * wait for it's execution to finish. If blocking execution is required, use the {@link
   * BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
   * result.waitUntilFinish()} on the {@link PipelineResult}.
   *
   * @param args The command-line args passed by the executor.
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {

    // Create the pipeline
    Pipeline pipeline = Pipeline.create(options);

    /*
     * Steps:
     *   1) Find all files matching the input pattern
     *   2) Compress the files found and output them to the output directory
     *   3) Write any errors to the failure output file
     */
    PCollectionTuple compressOut =
        pipeline
            .apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
            .apply(
                "Compress File(s)",
                ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
                    .withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));

    compressOut
        .get(DEADLETTER_TAG)
        .apply(
            "Format Errors",
            MapElements.into(TypeDescriptors.strings())
                .via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
        .apply(
            "Write Error File",
            TextIO.write()
                .to(options.getOutputFailureFile())
                .withHeader("Filename,Error")
                .withoutSharding());

    return pipeline.run();
  }

  /**
   * The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
   * compresses each file to an output location. Any compression failures which occur during
   * execution will be output to a separate output for further processing.
   */
  @SuppressWarnings("serial")
  public static class Compressor extends DoFn<MatchResult.Metadata, String> {

    private final ValueProvider<String> destinationLocation;
    private final ValueProvider<Compression> compressionValue;

    Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
      this.destinationLocation = destinationLocation;
      this.compressionValue = compression;
    }

    @ProcessElement
    public void processElement(ProcessContext context) {
      ResourceId inputFile = context.element().resourceId();
      Compression compression = compressionValue.get();
      Options options = context.getPipelineOptions().as(Options.class);
      String outputFilename;

      // Add the extension to the output filename.
      if (options.getOutputFilenameSuffix() != null
          && options.getOutputFilenameSuffix().isAccessible()
          && options.getOutputFilenameSuffix().get() != null) {
        // Use suffix parameter. Example: demo.txt -> demo.txt.foo
        outputFilename = inputFile.getFilename() + options.getOutputFilenameSuffix().get();
      } else {
        // Use compression extension. Example: demo.txt -> demo.txt.gz
        outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
      }

      // Resolve the necessary resources to perform the transfer
      ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
      ResourceId outputFile =
          outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
      ResourceId tempFile =
          outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);

      // Perform the copy of the compressed channel to the destination.
      try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
        try (WritableByteChannel writerChannel =
            compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {

          // Execute the copy to the temporary file
          ByteStreams.copy(readerChannel, writerChannel);
        }

        // Rename the temporary file to the output file
        FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));

        // Output the path to the uncompressed file
        context.output(outputFile.toString());
      } catch (IOException e) {
        LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
        context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
      }
    }
  }
}

次のステップ