/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.templates.BulkCompressor.Options;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link BulkCompressor} is a batch pipeline that compresses files matched by an input file
* pattern and outputs them to a specified file location. This pipeline can be useful when you need
* to compress large batches of files as part of a periodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>. Files output
* to the destination location will follow a naming schema of the original filename appended with
* the compression mode extension. The extensions appended will be one of: <code>.bzip2</code>,
* <code>.deflate</code>, <code>.gz</code> as determined by the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file
* will still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
*   <li>The compression must be in one of the following formats: <code>BZIP2</code>,
*       <code>DEFLATE</code>, <code>GZIP</code>.
*   <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Bulk_Compress_GCS_Files.md">README</a>
* for instructions on how to use or modify this template.
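*
* <p>As a rough, illustrative sketch only (the template's Cloud Storage path and the bucket names
* below are assumptions, not taken from this file; parameter names follow the {@link Options}
* getters), the template could be launched with a command along these lines:
*
* <pre>{@code
* gcloud dataflow jobs run bulk-compress-example \
*     --gcs-location=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
*     --parameters=inputFilePattern=gs://your-bucket/files/*.txt,outputDirectory=gs://your-bucket/compressed,outputFailureFile=gs://your-bucket/compressed/failed.csv,compression=GZIP
* }</pre>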
*/
@Template(
name = "Bulk_Compress_GCS_Files",
category = TemplateCategory.UTILITIES,
displayName = "Bulk Compress Files on Cloud Storage",
description = {
"The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. "
+ "This template can be useful when you need to compress large batches of files as part of a periodic archival process. "
+ "The supported compression modes are: BZIP2, DEFLATE, GZIP. Files output to the destination location will follow a naming schema of original filename appended with the compression mode extension. The extensions appended will be one of: .bzip2, .deflate, .gz.",
"Any errors which occur during the compression process will be output to the failure file in CSV format of filename, error message. "
+ "If no failures occur while running the pipeline, the error file will still be created but will contain no error records."
},
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/bulk-compress-cloud-storage",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The compression must be in one of the following formats: `BZIP2`, `DEFLATE`, `GZIP`.",
"The output directory must exist prior to running the pipeline."
})
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
    new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
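*
* <p>For illustration only (bucket names are placeholders, not taken from this file), arguments
* parsed by {@link PipelineOptionsFactory} follow the getter names, e.g. {@code
* --inputFilePattern=gs://your-bucket/files/*.txt --outputDirectory=gs://your-bucket/compressed/
* --outputFailureFile=gs://your-bucket/failed.csv --compression=GZIP}.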
*/
public interface Options extends PipelineOptions {
@TemplateParameter.GcsReadFile(
order = 1,
groupName = "Source",
description = "Input Cloud Storage File(s)",
helpText = "The Cloud Storage location of the files you'd like to process.",
example = "gs://your-bucket/your-files/*.txt")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 2,
groupName = "Target",
description = "Output file d&irectory in Cloud Storage",
helpText =
"The path and filename prefix <for wr>iting output files. Must end with a slash. DateTime formatting is< used >to parse directory path for date time formatters.",
example = "gs://your-bucket/your-path")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@TemplateParameter.GcsWriteFile(
order = 3,
groupName = "Target",
description = "Output failure file",
helpText =
"The error log output file to use for write failures that occur during compression. The contents will be one line for "
< + &q>uot;each file which failed compression. Note that this parameter will< ">;
+ "allow the pipeline to continue processing in the event of a failure.",
example = "gs://your-bucket/compressed/failed.csv")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@TemplateParameter.Enum(
order = 4,
enumOptions = {
@TemplateEnumOption("BZIP2"),
@TemplateEnumOption("DEFLATE"),
@TemplateEnumOption("GZIP")
},
description = "Compression",
helpText =
"The compression algorithm used to compress the matched files. Valid algorithms: BZIP2, DEFLATE, GZIP")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
@TemplateParameter.Text(
order = 5,
groupName = "Target",
optional = true,
regexes = {"^[A-Za-z_0-9.]*"},
description = "Output filename suffix",
helpText =
"Output filename suffix of the files to write. Defaults to .bzip2, .deflate or .gz depending on the compression algorithm.")
ValueProvider<String> getOutputFilenameSuffix();
void setOutputFilenameSuffix(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
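*
* <p>For instance, a blocking invocation (a minimal sketch using only what this class exposes)
* could look like:
*
* <pre>{@code
* Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
* BulkCompressor.run(options).waitUntilFinish();
* }</pre>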
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
> "Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Fil<ename,Error")
> .withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
Options options = context.getPipelineOptions().as(Options.class);
String outputFilename;
// Add the extension to the output filename.
if (options.getOutputFilenameSuffix() != null
    && options.getOutputFilenameSuffix().isAccessible()
    && options.getOutputFilenameSuffix().get() != null) {
// Use suffix parameter. Example: demo.txt -> demo.txt.foo
outputFilename = inputFile.getFilename() + options.getOutputFilenameSuffix().get();
} else {
// Use compression extension. Example: demo.txt -> demo.txt.gz
outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
}
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the compressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}