The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on
Cloud Storage to a specified location. This template can be useful when you need to compress
large batches of files as part of a periodic archival process. The supported compression modes are:
BZIP2, DEFLATE, GZIP.
Files output to the destination location follow a naming scheme of the original filename appended
with the compression-mode extension. The appended extension will be one of:
.bzip2, .deflate, .gz.
Any errors that occur during the compression process are output to the failure file in CSV format
(filename, error message). If no failures occur while running the pipeline, the error file is still
created but contains no error records.
Requirements for this pipeline:
The compression must be in one of the following formats: BZIP2, DEFLATE, GZIP.
The output directory must exist prior to running the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt.
outputDirectory: The output location to write to. For example, gs://bucket-name/compressed/.
outputFailureFile: The error log output file to use for write failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails compression.
compression: The compression algorithm used to compress the matched files. Must be one of: BZIP2, DEFLATE, GZIP.
Executing the Bulk Compress Cloud Storage Files template
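You can run the template with the gcloud command-line tool, in the same style as the Datastore example later on this page. A minimal sketch follows; the template path gs://dataflow-templates/latest/Bulk_Compress_GCS_Files and the bucket paths are assumptions to verify and replace with your own values:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://BUCKET_NAME/compressed,\
outputFailureFile=gs://BUCKET_NAME/failed/failure.csv,\
compression=GZIP
The template source follows.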
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The {@link BulkCompressor} is a batch pipeline that compresses files matched by an input file
 * pattern and outputs them to a specified file location. This pipeline can be useful when you need
 * to compress large batches of files as part of a periodic archival process. The supported
 * compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>. Files output
 * to the destination location will follow a naming schema of original filename appended with the
 * compression mode extension. The extensions appended will be one of: <code>.bzip2</code>,
 * <code>.deflate</code>, <code>.gz</code> as determined by the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file will
* still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
 * <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
 * </code>, <code>GZIP</code>.
* <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/bulk-compressor
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkCompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=bulk-compressor-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=${PIPELINE_FOLDER}/test/uncompressed/*,\
* outputDirectory=${PIPELINE_FOLDER}/test/compressed,\
* outputFailureFile=${PIPELINE_FOLDER}/test/failure/failed-${JOB_NAME}.csv,\
* compression=GZIP"
* </pre>
*/
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/uncompressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/compressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the compression process "
+ "(e.g. gs://bucket-name/compressed/failed.txt). The contents will be one line for "
+ "each file which failed compression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@Description("The compression algorithm to use on the matched files.")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the compressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}
Bulk Decompress Cloud Storage Files
The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on
Cloud Storage to a specified location. This functionality is useful when you want to use
compressed data to minimize network bandwidth costs during a migration, but would like to maximize
analytical processing speed by operating on uncompressed data after migration. The pipeline
automatically handles multiple compression modes during a single run and determines the
decompression mode to use based on the file extension
(.bzip2, .deflate, .gz, .zip).
Requirements for this pipeline:
The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, Zip.
The output directory must exist prior to running the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz.
outputDirectory: The output location to write to. For example, gs://bucket-name/decompressed.
outputFailureFile: The error log output file to use for write failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. If there are no failures, the file is still created but will be empty. The file contents are in CSV format (Filename, Error) and consist of one line for each file that fails decompression.
Executing the Bulk Decompress Cloud Storage Files template
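As with the compression template, a gcloud invocation might look like the sketch below; the template path gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files and the bucket paths are assumptions to verify and replace with your own values:
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
    --parameters \
inputFilePattern=gs://BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://BUCKET_NAME/decompressed,\
outputFailureFile=gs://BUCKET_NAME/decompressed/failed.csv
The template source follows.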
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline decompresses file(s) from Google Cloud Storage and re-uploads them to a destination
* location.
*
* <p><b>Parameters</b>
*
* <p>The {@code --inputFilePattern} parameter specifies a file glob to process. Files found can be
* expressed in the following formats:
*
* <pre>
* --inputFilePattern=gs://bucket-name/compressed-dir/*
* --inputFilePattern=gs://bucket-name/compressed-dir/demo*.gz
* </pre>
*
* <p>The {@code --outputDirectory} parameter can be expressed in the following formats:
*
* <pre>
* --outputDirectory=gs://bucket-name
* --outputDirectory=gs://bucket-name/decompressed-dir
* </pre>
*
* <p>The {@code --outputFailureFile} parameter indicates the file to write the names of the files
* which failed decompression and their associated error messages. This file can then be used for
* subsequent processing by another process outside of Dataflow (e.g. send an email with the
* failures, etc.). If there are no failures, the file will still be created but will be empty. The
* failure file structure contains both the file that caused the error and the error message in CSV
* format. The file will contain one header row and two columns (Filename, Error). The filename
* output to the failureFile will be the full path of the file for ease of debugging.
*
* <pre>
* --outputFailureFile=gs://bucket-name/decompressed-dir/failed.csv
* </pre>
*
* <p>Example Output File:
*
* <pre>
* Filename,Error
* gs://docs-demo/compressedFile.gz, File is malformed or not compressed in BZIP2 format.
* </pre>
*
* <p><b>Example Usage</b>
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkDecompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://${PROJECT_ID}/compressed-dir/*.gz \
* --outputDirectory=gs://${PROJECT_ID}/decompressed-dir \
* --outputFailureFile=gs://${PROJECT_ID}/decompressed-dir/failed.csv"
* </pre>
*/
public class BulkDecompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkDecompressor.class);
/**
* A list of the {@link Compression} values excluding {@link Compression#AUTO} and {@link
* Compression#UNCOMPRESSED}.
*/
@VisibleForTesting
static final Set<Compression> SUPPORTED_COMPRESSIONS =
Stream.of(Compression.values())
.filter(value -> value != Compression.AUTO && value != Compression.UNCOMPRESSED)
.collect(Collectors.toSet());
/** The error msg given when the pipeline matches a file but cannot determine the compression. */
@VisibleForTesting
static final String UNCOMPRESSED_ERROR_MSG =
"Skipping file %s because it did not match any compression mode (%s)";
@VisibleForTesting
static final String MALFORMED_ERROR_MSG =
"The file resource %s is malformed or not in %s compressed format.";
/** The tag used to identify the main output of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<String> DECOMPRESS_MAIN_OUT_TAG = new TupleTag<String>() {};
/** The tag used to identify the dead-letter sideOutput of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<KV<String, String>> DEADLETTER_TAG = new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/compressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/decompressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the decompression process "
+ "(e.g. gs://bucket-name/decompressed/failed.txt). The contents will be one line for "
+ "each file which failed decompression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* BulkDecompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Decompress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Run the pipeline over the work items.
PCollectionTuple decompressOut =
pipeline
.apply("MatchFile(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"DecompressFile(s)",
ParDo.of(new Decompress(options.getOutputDirectory()))
.withOutputTags(DECOMPRESS_MAIN_OUT_TAG, TupleTagList.of(DEADLETTER_TAG)));
decompressOut
.get(DEADLETTER_TAG)
.apply(
"FormatErrors",
MapElements.into(TypeDescriptors.strings())
.via(
kv -> {
StringWriter stringWriter = new StringWriter();
try {
CSVPrinter printer =
new CSVPrinter(
stringWriter,
CSVFormat.DEFAULT
.withEscape('\\')
.withQuoteMode(QuoteMode.NONE)
.withRecordSeparator('\n')
);
printer.printRecord(kv.getKey(), kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}))
// We don't expect error files to be large so we'll create a single
// file for ease of reprocessing by processes outside of Dataflow.
.apply(
"WriteErrorFile",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* Performs the decompression of an object on Google Cloud Storage and uploads the decompressed
* object back to a specified destination location.
*/
@SuppressWarnings("serial")
public static class Decompress extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
Decompress(ValueProvider<String> destinationLocation) {
this.destinationLocation = destinationLocation;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
// Output a record to the failure file if the file doesn't match a known compression.
if (!Compression.AUTO.isCompressed(inputFile.toString())) {
String errorMsg =
String.format(UNCOMPRESSED_ERROR_MSG, inputFile.toString(), SUPPORTED_COMPRESSIONS);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), errorMsg));
} else {
try {
ResourceId outputFile = decompress(inputFile);
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error(e.getMessage());
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
/**
* Decompresses the inputFile using the specified compression and outputs to the main output of
* the {@link Decompress} doFn. Files output to the destination will be first written as temp
* files with a "temp-" prefix within the output directory. If a file fails decompression, the
* filename and the associated error will be output to the dead-letter.
*
* @param inputFile The inputFile to decompress.
* @return A {@link ResourceId} which points to the resulting file from the decompression.
*/
private ResourceId decompress(ResourceId inputFile) throws IOException {
// Remove the compressed extension from the file. Example: demo.txt.gz -> demo.txt
String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
// Resolve the necessary resources to perform the transfer.
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve(Files.getFileExtension(inputFile.toString())
+ "-temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Resolve the compression
Compression compression = Compression.detect(inputFile.toString());
// Perform the copy of the decompressed channel into the destination.
try (ReadableByteChannel readerChannel =
compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temp file to the output file.
FileSystems.rename(
ImmutableList.of(tempFile),
ImmutableList.of(outputFile),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
String msg = e.getMessage();
LOG.error("Error occurred during decompression of {}", inputFile.toString(), e);
throw new IOException(sanitizeDecompressionErrorMsg(msg, inputFile, compression));
}
return outputFile;
}
/**
* The error messages coming from the compression library are not consistent across compression
* modes. Here we'll attempt to unify the messages to inform the user more clearly when we've
* encountered a file which is not compressed or malformed. Note that GZIP and ZIP compression
* modes will not throw an exception when a decompression is attempted on a file which is not
* compressed.
*
* @param errorMsg The error message thrown during decompression.
* @param inputFile The input file which failed decompression.
* @param compression The compression mode used during decompression.
* @return The sanitized error message. If the error was not from a malformed file, the same
* error message passed will be returned.
*/
private String sanitizeDecompressionErrorMsg(
String errorMsg, ResourceId inputFile, Compression compression) {
if (errorMsg != null
&& (errorMsg.contains("not in the BZip2 format")
|| errorMsg.contains("incorrect header check"))) {
errorMsg = String.format(MALFORMED_ERROR_MSG, inputFile.toString(), compression);
}
return errorMsg;
}
}
}
Datastore Bulk Delete
The Datastore Bulk Delete template is a pipeline that reads entities from Datastore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Datastore entities to your JavaScript UDF, which you can use to filter out entities by returning null values.
Requirements for this pipeline:
Datastore must be set up in the project prior to running the template.
If reading and deleting from separate Datastore instances, the Dataflow Controller Service Account must have permission to read from one instance and delete from the other.
Template parameters
datastoreReadGqlQuery: GQL query that specifies which entities to match for deletion. For example: "SELECT * FROM MyKind".
datastoreReadProjectId: GCP project ID of the Datastore instance from which you want to read the entities (using your GQL query) that are used for matching.
datastoreDeleteProjectId: GCP project ID of the Datastore instance from which to delete matching entities. This can be the same as datastoreReadProjectId if you want to read and delete within the same Datastore instance.
datastoreReadNamespace: [Optional] Namespace of the requested entities. Set as "" for the default namespace.
javascriptTextTransformGcsPath: [Optional] A Cloud Storage path that contains all your JavaScript code. For example: gs://mybucket/mytransforms/*.js. If you don't want to use a UDF, leave this field blank.
javascriptTextTransformFunctionName: [Optional] Name of the function to be called. If this function returns a value of undefined or null for a given Datastore entity, that entity is not deleted. If your JavaScript code is "function myTransform(inJson) { ...dostuff...}", then your function name is "myTransform". If you don't want to use a UDF, leave this field blank. A minimal example UDF is sketched after this list.
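For illustration, a filtering UDF might look like the following sketch. The "status" property and its values are hypothetical, and the property path assumes the JSON entity encoding the pipeline passes to the function; inspect a sample entity before relying on specific property paths.
/**
 * Hypothetical UDF: request deletion only for entities whose "status"
 * property equals "inactive"; all other entities are skipped.
 */
function myTransform(inJson) {
  var entity = JSON.parse(inJson);
  var props = entity.properties || {};
  if (props.status && props.status.stringValue === 'inactive') {
    return inJson; // returned entities are deleted
  }
  return null; // returning null (or undefined) skips deletion
}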
You must replace the following values in this example:
Replace JOB_NAME with a job name of your choice.
Replace GQL_QUERY with the query you'll use to match entities for deletion.
Replace DATASTORE_READ_AND_DELETE_PROJECT_ID with your Datastore instance project id. This example will both read and delete from the same Datastore instance.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
--parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
Execute from the REST API
When executing this template, you'll need the Cloud Storage path to the template:
gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
To execute this template with a REST API request, send an HTTP POST request with your project ID. This request requires authorization.
You must replace the following values in this example:
Replace JOB_NAME with a job name of your choice.
Replace GQL_QUERY with the query you'll use to match entities for deletion.
Replace DATASTORE_READ_AND_DELETE_PROJECT_ID with your Datastore instance project id. This example will both read and delete from the same Datastore instance.
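Replace YOUR_PROJECT_ID with your project ID.
A sketch of the request is shown below. It uses the Dataflow REST API's templates.launch method, and the body values mirror the gcloud example above:
POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
   "jobName": "JOB_NAME",
   "parameters": {
       "datastoreReadGqlQuery": "GQL_QUERY",
       "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
       "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
   }
}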
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
* Dataflow template which deletes pulled Datastore Entities.
*/
public class DatastoreToDatastoreDelete {
/**
* Custom PipelineOptions.
*/
public interface DatastoreToDatastoreDeleteOptions extends
PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities
* to a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(ReadJsonEntities.newBuilder()
.setGqlQuery(options.getDatastoreReadGqlQuery())
.setProjectId(options.getDatastoreReadProjectId())
.setNamespace(options.getDatastoreReadNamespace())
.build())
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(DatastoreDeleteEntityJson.newBuilder()
.setProjectId(options.getDatastoreDeleteProjectId())
.build());
pipeline.run();
}
}
Streaming Data Generator to Pub/Sub/BigQuery/Cloud Storage
The Streaming Data Generator template generates either an unlimited or a fixed number of synthetic records or messages, based on a user-provided schema, at the specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.
The following are a few possible use cases:
Simulate large-scale real-time event publishing to a Pub/Sub topic to measure and determine the number and size of consumers required to process published events.
Generate synthetic data to a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.
Supported sinks and encoding formats
The following table describes which sinks and encoding formats are supported by this template:
Sink           JSON   Avro   Parquet
Pub/Sub        Yes    Yes    No
BigQuery       Yes    No     No
Cloud Storage  Yes    Yes    Yes
The JSON Data Generator library used by the pipeline allows various faker functions
to be used for each schema field. For more information on the faker functions and schema format, see the
json-data-generator documentation.
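For example, a schema file like the following sketch produces one JSON record per generated message. Note that the schema is a template rather than strict JSON, and the faker functions shown (uuid, firstName, integer, random) are illustrative; check the json-data-generator documentation for the exact set of supported functions and their signatures.
{
  "id": "{{uuid()}}",
  "username": "{{firstName()}}",
  "score": {{integer(0, 100)}},
  "color": "{{random("red", "green", "blue")}}"
}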
Requirements for this pipeline:
Create a message schema file and store this file in a Cloud Storage location.
The output target must exist prior to execution. The target must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket, depending on the sink type.
If the output encoding is Avro or Parquet, then create an Avro schema file and store it in a Cloud Storage location, as sketched below.
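For Avro or Parquet output, the Avro schema file must describe the same fields your message schema generates. A minimal sketch, assuming the four example fields above:
{
  "type": "record",
  "name": "FakeMessage",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "username", "type": "string"},
    {"name": "score", "type": "int"},
    {"name": "color", "type": "string"}
  ]
}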
Template parameters
schemaLocation: Location of the schema file. For example: gs://mybucket/filename.json.
qps: Number of messages to be published per second. For example: 100.
sinkType: [Optional] Output sink type. Possible values are PUBSUB, BIGQUERY, GCS. Default is PUBSUB.
outputType: [Optional] Output encoding type. Possible values are JSON, AVRO, PARQUET. Default is JSON.
avroSchemaLocation: [Optional] Location of the Avro schema file. Mandatory when outputType is AVRO or PARQUET. For example: gs://mybucket/filename.avsc.
topic: [Optional] Name of the Pub/Sub topic to which the pipeline should publish data. Mandatory when sinkType is PUBSUB. For example: projects/<project-id>/topics/<topic-name>.
outputTableSpec: [Optional] Name of the output BigQuery table. Mandatory when sinkType is BIGQUERY. For example: your-project:your-dataset.your-table-name.
writeDisposition: [Optional] BigQuery write disposition. Possible values are WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default is WRITE_APPEND.
outputDeadletterTable: [Optional] Name of the output BigQuery table to hold failed records. If not provided, the pipeline creates a table named {output_table_name}_error_records during execution. For example: your-project:your-dataset.your-table-name.
outputDirectory: [Optional] Path of the output Cloud Storage location. Mandatory when sinkType is GCS. For example: gs://mybucket/pathprefix/.
outputFilenamePrefix: [Optional] The filename prefix of the output files written to Cloud Storage. Default is output-.
windowDuration: [Optional] Window interval at which output is written to Cloud Storage. Default is 1m (in other words, 1 minute).
numShards: [Optional] Maximum number of output shards. Mandatory when sinkType is GCS; must be set to 1 or a higher number.
messagesLimit: [Optional] Maximum number of output messages. Default is 0, indicating unlimited.
autoscalingAlgorithm: [Optional] Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable.
maxNumWorkers: [Optional] Maximum number of worker machines. For example: 10.
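If you run the Google-hosted Flex Template rather than building your own image (as the source's example usage below does), an invocation would look like the following sketch. The template path shown is an assumption; verify the published location before use.
gcloud dataflow flex-template run JOB_NAME \
    --template-file-gcs-location gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --region us-central1 \
    --parameters \
schemaLocation=gs://BUCKET_NAME/schema.json,\
qps=100,\
topic=projects/PROJECT_ID/topics/TOPIC_NAME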
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
* specified rate to either a Pub/Sub topic or BigQuery/GCS. The messages are generated according to
* a schema template which instructs the pipeline how to populate the messages with fake data
* compliant with constraints.
*
* <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
* Use a general rule of 2,500 QPS per core in the worker pool.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* SCHEMA_LOCATION=gs://<bucket>/<path>/<to>/game-event-schema.json
* PUBSUB_TOPIC=projects/<project-id>/topics/<topic-id>
* QPS=2500
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create a template spec containing the details of image location and metadata in GCS
* as specified in README.md file
*
* # Execute template:
* JOB_NAME=<job-name>
* PROJECT=<project-id>
* TEMPLATE_SPEC_GCSPATH=gs://path/to/template-spec
* SCHEMA_LOCATION=gs://path/to/schema.json
* PUBSUB_TOPIC=projects/$PROJECT/topics/<topic-name>
* QPS=1
*
* gcloud beta dataflow flex-template run $JOB_NAME \
* --project=$PROJECT --region=us-central1 --flex-template \
* --template-file-gcs-location=$TEMPLATE_SPEC_GCSPATH \
* --parameters autoscalingAlgorithm="THROUGHPUT_BASED",schemaLocation=$SCHEMA_LOCATION,topic=$PUBSUB_TOPIC,qps=$QPS,maxNumWorkers=3
*
* </pre>
*/
public class StreamingDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
/**
* The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by
* the executor at the command-line.
*/
public interface StreamingDataGeneratorOptions extends PipelineOptions {
@Description("Indicates rate of messages per second to be published to Pub/Sub.")
@Required
Long getQps();
void setQps(Long value);
@Description("The path to the schema to generate.")
@Required
String getSchemaLocation();
void setSchemaLocation(String value);
@Description("The Pub/Sub topic to write to.")
String getTopic();
void setTopic(String value);
@Description(
"Indicates maximum number of messages to be generated. Default is 0 indicating unlimited.")
@Default.Long(0L)
Long getMessagesLimit();
void setMessagesLimit(Long value);
@Description("The message Output type. --outputType must be one of:[JSON,AVRO,PARQUET]")
@Default.Enum("JSON")
OutputType getOutputType();
void setOutputType(OutputType value);
@Description("The path to Avro schema for encoding message into AVRO output type.")
String getAvroSchemaLocation();
void setAvroSchemaLocation(String value);
@Description("The message sink type. Must be one of:[PUBSUB,BIGQUERY,GCS]")
@Default.Enum("PUBSUB")
SinkType getSinkType();
void setSinkType(SinkType value);
@Description(
"Output BigQuery table spec. "
+ "The name should be in the format: "
+ "<project>:<dataset>.<table_name>.")
String getOutputTableSpec();
void setOutputTableSpec(String value);
@Description("Write disposition to use for BigQuery. Default: WRITE_APPEND")
@Default.String("WRITE_APPEND")
String getWriteDisposition();
void setWriteDisposition(String writeDisposition);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String outputDeadletterTable);
@Description(
"The window duration in which data will be written. Defaults to 5m."
+ "Allowed formats are: "
+ "Ns (for seconds, example: 5s), "
+ "Nm (for minutes, example: 12m), "
+ "Nh (for hours, example: 2h).")
@Default.String("1m")
String getWindowDuration();
void setWindowDuration(String windowDuration);
@Description("The directory to write output files. Must end with a slash. ")
String getOutputDirectory();
void setOutputDirectory(String outputDirectory);
@Description(
"The filename prefix of the files to write to. Default file prefix is set to \"output-\". ")
@Default.String("output-")
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String outputFilenamePrefix);
@Description(
"The maximum number of output shards produced while writing to FileSystem. Default number"
+ " is runner defined.")
@Default.Integer(0)
Integer getNumShards();
void setNumShards(Integer numShards);
}
/** Allowed list of message encoding types. */
public enum OutputType {
JSON(".json"),
AVRO(".avro"),
PARQUET(".parquet");
private final String fileExtension;
/** Sets file extension associated with output type. */
OutputType(String fileExtension) {
this.fileExtension = fileExtension;
}
/** Returns file extension associated with output type. */
public String getFileExtension() {
return fileExtension;
}
}
/** Allowed list of sink types. */
public enum SinkType {
PUBSUB,
BIGQUERY,
GCS
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args command-line args passed by the executor.
*/
public static void main(String[] args) {
StreamingDataGeneratorOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StreamingDataGeneratorOptions.class);
run(options);
}
/**
* Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options the execution options.
* @return the pipeline result.
*/
public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to run method cannot be null.");
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Trigger at the supplied QPS
* 2) Generate messages containing fake data
* 3) Write messages to appropriate Sink
*/
PCollection<byte[]> fakeMessages =
pipeline
.apply("Trigger", createTrigger(options))
.apply(
"Generate Fake Messages",
ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())));
if (options.getSinkType().equals(SinkType.GCS)) {
fakeMessages =
fakeMessages.apply(
options.getWindowDuration() + " Window",
Window.into(
FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
}
fakeMessages.apply("Write To " + options.getSinkType().name(), createSink(options));
return pipeline.run();
}
/**
* Creates either Bounded or UnBounded Source based on messageLimit pipeline option.
*
* @param options the pipeline options.
*/
private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createTrigger method cannot be null.");
GenerateSequence generateSequence =
GenerateSequence.from(0L)
.withRate(options.getQps(), /* periodLength = */ Duration.standardSeconds(1L));
return options.getMessagesLimit() > 0
? generateSequence.to(options.getMessagesLimit())
: generateSequence;
}
/**
* The {@link MessageGeneratorFn} class generates fake messages based on the supplied schema.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*/
@VisibleForTesting
static class MessageGeneratorFn extends DoFn<Long, byte[]> {
// Not initialized inline or in the constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;
private final String schemaLocation;
private String schema;
MessageGeneratorFn(@Nonnull String schemaLocation) {
checkNotNull(schemaLocation,
"schemaLocation argument of MessageGeneratorFn class cannot be null.");
this.schemaLocation = schemaLocation;
}
@Setup
public void setup() throws IOException {
dataGenerator = new JsonDataGeneratorImpl();
schema = SchemaUtils.getGcsFileAsString(schemaLocation);
}
@ProcessElement
public void processElement(
@Element Long element,
@Timestamp Instant timestamp,
OutputReceiver<byte[]> receiver,
ProcessContext context)
throws IOException, JsonDataGeneratorException {
byte[] payload;
// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toByteArray();
}
receiver.output(payload);
}
}
/**
* Creates appropriate sink based on sinkType pipeline option.
*
* @param options the pipeline options.
*/
@VisibleForTesting
static PTransform<PCollection<byte[]>, PDone> createSink(
@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createSink method cannot be null.");
switch (options.getSinkType()) {
case PUBSUB:
checkArgument(
options.getTopic() != null,
String.format(
"Missing required value --topic for %s sink type", options.getSinkType().name()));
return StreamingDataGeneratorWriteToPubSub.Writer.builder(options).build();
case BIGQUERY:
checkArgument(
options.getOutputTableSpec() != null,
String.format(
"Missing required value --outputTableSpec in format"
+ " <project>:<dataset>.<table_name> for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
case GCS:
checkArgument(
options.getOutputDirectory() != null,
String.format(
"Missing required value --outputDirectory in format gs:// for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToGcs.builder(options).build();
default:
throw new IllegalArgumentException("Unsupported Sink.");
}
}
}