Le modèle de compression groupée des fichiers Cloud Storage est un pipeline par lots qui compresse des fichiers de Cloud Storage vers un emplacement spécifié. Ce modèle peut être utile lorsque vous devez compresser de grands lots de fichiers dans le cadre d'un processus d'archivage périodique. Les modes de compression compatibles sont les suivants : BZIP2, DEFLATE et GZIP.
Les fichiers produits à l'emplacement de destination suivent un schéma de dénomination associant le nom de fichier d'origine à l'extension du mode de compression. Les extensions ajoutées seront les suivantes : .bzip2, .deflate et .gz.
Toute erreur survenant pendant le processus de compression sera inscrite dans le fichier d'échec au format CSV : nom de fichier, message d'erreur. Si aucune erreur ne survient pendant l'exécution du pipeline, le fichier d'erreur sera créé, mais il ne contiendra aucun enregistrement.
Conditions requises pour ce pipeline :
La compression doit être dans l'un des formats suivants : BZIP2, DEFLATE et GZIP.
Le répertoire de sortie doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres
Description
inputFilePattern
Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/uncompressed/*.txt.
outputDirectory
Emplacement de sortie où écrire. Par exemple, gs://bucket-name/compressed/.
outputFailureFile
Fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de compression. Par exemple, gs://bucket-name/compressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et contient une ligne pour chaque fichier dont la compression échoue.
compression
Algorithme de compression utilisé pour compresser les fichiers correspondants. Doit être l'un des suivants : BZIP2, DEFLATE et GZIP.
Exécuter le modèle de fichiers Cloud Storage de compression par lots
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Sélectionnez the Bulk Compress Cloud Storage Files template dans le menu déroulant Modèle Dataflow.
Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Saisissez vos valeurs de paramètres dans les champs fournis.
Cliquez sur Run Job (Exécuter la tâche).
GCLOUD
Exécuter à partir de l'outil de ligne de commande gcloud
Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
Remplacez COMPRESSION par l'algorithme de compression de votre choix.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION
API
Exécuter depuis l'API REST
Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :
Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
Remplacez COMPRESSION par l'algorithme de compression de votre choix.
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link BulkCompressor} is a batch pipeline that compresses files on matched by an input file
* pattern and outputs them to a specified file location. This pipeline can be useful when you need
* to compress large batches of files as part of a perodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>, <code>ZIP
* </code>. Files output to the destination location will follow a naming schema of original
* filename appended with the compression mode extension. The extensions appended will be one of:
* <code>.bzip2</code>, <code>.deflate</code>, <code>.gz</code>, <code>.zip</code> as determined by
* the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file will
* still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
* </code>, <code>GZIP</code>, <code>ZIP</code>.
* <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/bulk-compressor
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkCompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=bulk-compressor-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=${PIPELINE_FOLDER}/test/uncompressed/*,\
* outputDirectory=${PIPELINE_FOLDER}/test/compressed,\
* outputFailureFile=${PIPELINE_FOLDER}/test/failure/failed-${JOB_NAME}.csv,\
* compression=GZIP"
* </pre>
*/
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/uncompressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/compressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the compression process "
+ "(e.g. gs://bucket-name/compressed/failed.txt). The contents will be one line for "
+ "each file which failed compression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@Description("The compression algorithm to use on the matched files.")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the uncompressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}
Décompression groupée de fichiers Cloud Storage
Le modèle de décompression groupée de fichiers Cloud Storage est un pipeline par lots qui décompresse les fichiers de Cloud Storage vers un emplacement spécifié. Cette fonctionnalité est utile lorsque vous souhaitez utiliser des données compressées pour réduire les coûts de bande passante réseau lors d'une migration, tout en optimisant la vitesse de traitement analytique en opérant sur des données non compressées après la migration. Le pipeline gère automatiquement plusieurs modes de compression lors d'une seule exécution et détermine le mode de décompression à utiliser en fonction de l'extension de fichier (.bzip2, .deflate, .gz ou .zip).
Conditions requises pour ce pipeline :
Les fichiers à décompresser doivent être dans l'un des formats suivants : Bzip2, Deflate, Gzip ou Zip.
Le répertoire de sortie doit exister avant l'exécution du pipeline.
Paramètres de modèle
Paramètres
Description
inputFilePattern
Modèle de fichier d'entrée à lire. Par exemple, gs://bucket-name/compressed/*.gz.
outputDirectory
Emplacement de sortie où écrire. Par exemple, gs://bucket-name/decompressed.
outputFailureFile
Le fichier de sortie du journal des erreurs à utiliser pour les échecs d'écriture lors du processus de décompression. Par exemple, gs://bucket-name/decompressed/failed.csv. S'il n'y a pas d'échec, le fichier est quand même créé mais sera vide. Le contenu du fichier est au format CSV (nom de fichier, erreur) et consiste en une ligne pour chaque fichier qui échoue à la décompression.
Exécuter le modèle de décompression groupée de fichiers Cloud Storage
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Sélectionnez the Bulk Decompress Cloud Storage Files template dans le menu déroulant Modèle Dataflow.
Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Saisissez vos valeurs de paramètres dans les champs fournis.
Cliquez sur Run Job (Exécuter la tâche).
GCLOUD
Exécuter à partir de l'outil de ligne de commande gcloud
Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.
Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
Remplacez OUTPUT_FAILURE_FILE_PATH par le chemin d'accès au fichier contenant les informations sur l'échec de votre choix.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH
API
Exécuter depuis l'API REST
Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :
Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez YOUR_BUCKET_NAME par le nom de votre bucket Cloud Storage.
Remplacez OUTPUT_FAILURE_FILE_PATH par le chemin d'accès au fichier contenant les informations sur l'échec de votre choix.
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline decompresses file(s) from Google Cloud Storage and re-uploads them to a destination
* location.
*
* <p><b>Parameters</b>
*
* <p>The {@code --inputFilePattern} parameter specifies a file glob to process. Files found can be
* expressed in the following formats:
*
* <pre>
* --inputFilePattern=gs://bucket-name/compressed-dir/*
* --inputFilePattern=gs://bucket-name/compressed-dir/demo*.gz
* </pre>
*
* <p>The {@code --outputDirectory} parameter can be expressed in the following formats:
*
* <pre>
* --outputDirectory=gs://bucket-name
* --outputDirectory=gs://bucket-name/decompressed-dir
* </pre>
*
* <p>The {@code --outputFailureFile} parameter indicates the file to write the names of the files
* which failed decompression and their associated error messages. This file can then be used for
* subsequent processing by another process outside of Dataflow (e.g. send an email with the
* failures, etc.). If there are no failures, the file will still be created but will be empty. The
* failure file structure contains both the file that caused the error and the error message in CSV
* format. The file will contain one header row and two columns (Filename, Error). The filename
* output to the failureFile will be the full path of the file for ease of debugging.
*
* <pre>
* --outputFailureFile=gs://bucket-name/decompressed-dir/failed.csv
* </pre>
*
* <p>Example Output File:
*
* <pre>
* Filename,Error
* gs://docs-demo/compressedFile.gz, File is malformed or not compressed in BZIP2 format.
* </pre>
*
* <p><b>Example Usage</b>
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkDecompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://${PROJECT_ID}/compressed-dir/*.gz \
* --outputDirectory=gs://${PROJECT_ID}/decompressed-dir \
* --outputFailureFile=gs://${PROJECT_ID}/decompressed-dir/failed.csv"
* </pre>
*/
public class BulkDecompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkDecompressor.class);
/**
* A list of the {@link Compression} values excluding {@link Compression#AUTO} and {@link
* Compression#UNCOMPRESSED}.
*/
@VisibleForTesting
static final Set<Compression> SUPPORTED_COMPRESSIONS =
Stream.of(Compression.values())
.filter(value -> value != Compression.AUTO && value != Compression.UNCOMPRESSED)
.collect(Collectors.toSet());
/** The error msg given when the pipeline matches a file but cannot determine the compression. */
@VisibleForTesting
static final String UNCOMPRESSED_ERROR_MSG =
"Skipping file %s because it did not match any compression mode (%s)";
@VisibleForTesting
static final String MALFORMED_ERROR_MSG =
"The file resource %s is malformed or not in %s compressed format.";
/** The tag used to identify the main output of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<String> DECOMPRESS_MAIN_OUT_TAG = new TupleTag<String>() {};
/** The tag used to identify the dead-letter sideOutput of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<KV<String, String>> DEADLETTER_TAG = new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/compressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/decompressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the decompression process "
+ "(e.g. gs://bucket-name/decompressed/failed.txt). The contents will be one line for "
+ "each file which failed decompression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* BulkDecompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Decompress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Run the pipeline over the work items.
PCollectionTuple decompressOut =
pipeline
.apply("MatchFile(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"DecompressFile(s)",
ParDo.of(new Decompress(options.getOutputDirectory()))
.withOutputTags(DECOMPRESS_MAIN_OUT_TAG, TupleTagList.of(DEADLETTER_TAG)));
decompressOut
.get(DEADLETTER_TAG)
.apply(
"FormatErrors",
MapElements.into(TypeDescriptors.strings())
.via(
kv -> {
StringWriter stringWriter = new StringWriter();
try {
CSVPrinter printer =
new CSVPrinter(
stringWriter,
CSVFormat.DEFAULT
.withEscape('\\')
.withQuoteMode(QuoteMode.NONE)
.withRecordSeparator('\n')
);
printer.printRecord(kv.getKey(), kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}))
// We don't expect error files to be large so we'll create a single
// file for ease of reprocessing by processes outside of Dataflow.
.apply(
"WriteErrorFile",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* Performs the decompression of an object on Google Cloud Storage and uploads the decompressed
* object back to a specified destination location.
*/
@SuppressWarnings("serial")
public static class Decompress extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
Decompress(ValueProvider<String> destinationLocation) {
this.destinationLocation = destinationLocation;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
// Output a record to the failure file if the file doesn't match a known compression.
if (!Compression.AUTO.isCompressed(inputFile.toString())) {
String errorMsg =
String.format(UNCOMPRESSED_ERROR_MSG, inputFile.toString(), SUPPORTED_COMPRESSIONS);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), errorMsg));
} else {
try {
ResourceId outputFile = decompress(inputFile);
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error(e.getMessage());
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
/**
* Decompresses the inputFile using the specified compression and outputs to the main output of
* the {@link Decompress} doFn. Files output to the destination will be first written as temp
* files with a "temp-" prefix within the output directory. If a file fails decompression, the
* filename and the associated error will be output to the dead-letter.
*
* @param inputFile The inputFile to decompress.
* @return A {@link ResourceId} which points to the resulting file from the decompression.
*/
private ResourceId decompress(ResourceId inputFile) throws IOException {
// Remove the compressed extension from the file. Example: demo.txt.gz -> demo.txt
String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
// Resolve the necessary resources to perform the transfer.
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve(Files.getFileExtension(inputFile.toString())
+ "-temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Resolve the compression
Compression compression = Compression.detect(inputFile.toString());
// Perform the copy of the decompressed channel into the destination.
try (ReadableByteChannel readerChannel =
compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temp file to the output file.
FileSystems.rename(
ImmutableList.of(tempFile),
ImmutableList.of(outputFile),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
String msg = e.getMessage();
LOG.error("Error occurred during decompression of {}", inputFile.toString(), e);
throw new IOException(sanitizeDecompressionErrorMsg(msg, inputFile, compression));
}
return outputFile;
}
/**
* The error messages coming from the compression library are not consistent across compression
* modes. Here we'll attempt to unify the messages to inform the user more clearly when we've
* encountered a file which is not compressed or malformed. Note that GZIP and ZIP compression
* modes will not throw an exception when a decompression is attempted on a file which is not
* compressed.
*
* @param errorMsg The error message thrown during decompression.
* @param inputFile The input file which failed decompression.
* @param compression The compression mode used during decompression.
* @return The sanitized error message. If the error was not from a malformed file, the same
* error message passed will be returned.
*/
private String sanitizeDecompressionErrorMsg(
String errorMsg, ResourceId inputFile, Compression compression) {
if (errorMsg != null
&& (errorMsg.contains("not in the BZip2 format")
|| errorMsg.contains("incorrect header check"))) {
errorMsg = String.format(MALFORMED_ERROR_MSG, inputFile.toString(), compression);
}
return errorMsg;
}
}
}
Suppression groupée Datastore
Le modèle de suppression groupée de Datastore est un pipeline qui lit les entités de Datastore avec une requête GQL donnée, puis supprime toutes les entités correspondantes du projet cible sélectionné. Le pipeline peut éventuellement transmettre les entités Datastore encodées en JSON à votre fichier UDF JavaScript, que vous pouvez utiliser pour filtrer les entités en renvoyant des valeurs nulles.
Conditions requises pour ce pipeline :
Datastore doit être configuré dans le projet avant l'exécution du modèle.
Requête GQL qui spécifie les entités à rechercher pour la suppression. Par exemple : "SELECT * FROM MyKind".
datastoreReadProjectId
ID du projet GCP de l'instance Datastore à partir de laquelle vous souhaitez lire les entités (à l'aide de votre requête GQL) utilisées pour la mise en correspondance.
datastoreDeleteProjectId
ID du projet GCP de l'instance Cloud Datastore dans laquelle supprimer les entités correspondantes. Cette valeur peut être identique à datastoreReadProjectId si vous souhaitez lire et supprimer des éléments au sein d'une même instance Datastore.
datastoreReadNamespace
[Facultatif] Espace de noms des entités demandées. Défini comme "" pour l'espace de noms par défaut.
javascriptTextTransformGcsPath
[Facultatif] Un chemin d'accès Cloud Storage contenant tout votre code JavaScript, par exemple : "gs://mybucket/mytransforms/*.js". Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.
javascriptTextTransformFunctionName
[Facultatif] Nom de la fonction à appeler. Si cette fonction renvoie une valeur indéfinie ou nulle pour une entité Datastore donnée, cette entité ne sera pas supprimée. Si votre code JavaScript est le suivant : "function myTransform(inJson) { ...dostuff...}", le nom de votre fonction est "myTransform". Si vous ne souhaitez pas utiliser de fonction définie par l'utilisateur, ne renseignez pas ce champ.
Exécuter le modèle de suppression groupée Datastore
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Sélectionnez the Datastore Bulk Delete template dans le menu déroulant Modèle Dataflow.
Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Saisissez vos valeurs de paramètres dans les champs fournis.
Cliquez sur Run Job (Exécuter la tâche).
GCLOUD
Exécuter à partir de l'outil de ligne de commande gcloud
Remarque : Pour utiliser l'outil de ligne de commande gcloud afin d'exécuter des modèles, vous devez disposer du SDK Cloud version 138.0.0 ou ultérieure.
Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez GQL_QUERY par la requête que vous utiliserez pour rechercher les entités à supprimer.
Remplacez DATASTORE_READ_AND_DELETE_PROJECT_ID par l'ID de projet de votre instance Datastore. Cet exemple lira et supprimera dans la même instance Datastore
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
--parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
API
Exécuter depuis l'API REST
Lors de l'exécution de ce modèle, vous aurez besoin du chemin d'accès Cloud Storage au modèle :
Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec votre ID de projet. Cette requête nécessite une autorisation.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez GQL_QUERY par la requête que vous utiliserez pour rechercher les entités à supprimer.
Remplacez DATASTORE_READ_AND_DELETE_PROJECT_ID par l'ID de projet de votre instance Datastore. Cet exemple lira et supprimera dans la même instance Datastore
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
* Dataflow template which deletes pulled Datastore Entities.
*/
public class DatastoreToDatastoreDelete {
/**
* Custom PipelineOptions.
*/
public interface DatastoreToDatastoreDeleteOptions extends
PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities
* to a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(ReadJsonEntities.newBuilder()
.setGqlQuery(options.getDatastoreReadGqlQuery())
.setProjectId(options.getDatastoreReadProjectId())
.setNamespace(options.getDatastoreReadNamespace())
.build())
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(DatastoreDeleteEntityJson.newBuilder()
.setProjectId(options.getDatastoreDeleteProjectId())
.build());
pipeline.run();
}
}
Générateur de flux de données vers Pub/Sub
Lors du développement des pipelines Dataflow, il est généralement admis d'effectuer une analyse comparative des performances pour des requêtes spécifiques par seconde (RPS).
Ce modèle peut être utilisé pour générer des messages JSON factices basés sur le schéma fourni par l'utilisateur au tarif indiqué dans un sujet Pub/Sub.
La bibliothèque JSON Data Generator utilisée par le pipeline permet d'utiliser différentes fonctions factices pour chaque champ du schéma. Consultez la documentation pour obtenir plus d'informations sur le format des fonctions factices et du schéma.
Conditions requises pour ce pipeline :
Créez un fichier de schéma et stockez-le dans l'emplacement Cloud Storage.
Le sujet Pub/Sub de sortie doit exister avant l'exécution.
Paramètres de modèle
Paramètres
Description
schemaLocation
Emplacement du fichier de schéma. Exemple : gs://mybucket/filename.json.
topic
Nom du sujet Pub/Sub dans lequel le pipeline doit publier des données.
Par exemple : projects/<project-id>/topics/<topic-name>
qps
Nombre de messages à publier par seconde. Exemple : 100.
autoscalingAlgorithm
[Facultatif] Algorithme utilisé pour l'autoscaling des nœuds de calcul. Les valeurs possibles sont THROUGHPUT_BASED pour activer l'autoscaling ou NONE pour le désactiver.
maxNumWorkers
[Facultatif] Nombre maximal de systèmes de calcul. Exemple : 10.
Exécuter le modèle du générateur de flux de données
Cliquez sur Create job from template (Créer une tâche à partir d'un modèle).
Sélectionnez the Streaming Data Generator template dans le menu déroulant Modèle Dataflow.
Saisissez un nom de tâche dans le champ Job Name (Nom de la tâche).Le nom de la tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Saisissez vos valeurs de paramètres dans les champs fournis.
Cliquez sur Run Job (Exécuter la tâche).
GCLOUD
Exécuter à partir de l'outil de ligne de commande gcloud
Remarque : Pour exécuter des modèles à l'aide de l'outil de ligne de commande gcloud, vous devez disposer du SDK Cloud version 284.0.0 ou ultérieure.
Lors de l'exécution de ce modèle, vous avez besoin du chemin d'accès Cloud Storage au modèle :
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez REGION_NAME par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez SCHEMA_LOCATION par le chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
Remplacez PUBSUB_TOPIC par le sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.
Remplacez QPS par le nombre de messages à publier par seconde.
Pour exécuter ce modèle avec une requête API REST, envoyez une requête HTTP POST avec l'ID de votre projet. Cette requête nécessite une autorisation.
Vous devez remplacer les valeurs suivantes dans l'exemple ci-dessous :
Remplacez YOUR_PROJECT_ID par l'ID du projet.
Remplacez LOCATION par le nom de la région Dataflow. Exemple : us-central1.
Remplacez JOB_NAME par le nom de tâche de votre choix. Le nom de tâche doit correspondre à l'expression régulière [a-z]([-a-z0-9]{0,38}[a-z0-9])? pour être valide.
Remplacez SCHEMA_LOCATION par le chemin d'accès au fichier de schéma dans Cloud Storage. Exemple : gs://mybucket/filename.json.
Remplacez PUBSUB_TOPIC par le sujet Pub/Sub de sortie. Exemple : projects/<project-id>/topics/<topic-name>.
Remplacez QPS par le nombre de messages à publier par seconde.
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.ByteStreams;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a specified
* rate to a Pub/Sub topic. The messages are generated according to a schema template which
* instructs the pipeline how to populate the messages with fake data compliant to constraints.
*
* <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
* Use a general rule of 2,500 QPS per core in the worker pool.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The schema file exists.
* <li>The Pub/Sub topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* SCHEMA_LOCATION=gs://<bucket>/<path>/<to>/game-event-schema.json
* PUBSUB_TOPIC=projects/<project-id>/topics/<topic-id>
* QPS=2500
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create a template spec containing the details of image location and metadata in GCS
* as specified in README.md file
*
* # Execute template:
* JOB_NAME=<job-name>
* PROJECT=<project-id>
* TEMPLATE_SPEC_GCSPATH=gs://path/to/template-spec
* SCHEMA_LOCATION=gs://path/to/schema.json
* PUBSUB_TOPIC=projects/$PROJECT/topics/<topic-name>
* QPS=1
*
* gcloud beta dataflow jobs run $JOB_NAME \
* --project=$PROJECT --region=us-central1 --flex-template \
* --gcs-location=$TEMPLATE_SPEC_GCSPATH \
* --parameters autoscalingAlgorithm="THROUGHPUT_BASED",schemaLocation=$SCHEMA_LOCATION,topic=$PUBSUB_TOPIC,qps=$QPS,maxNumWorkers=3
* </pre>
*/
public class StreamingDataGenerator {
/**
* The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface StreamingDataGeneratorOptions extends PipelineOptions {
@Description("Indicates rate of messages per second to be published to Pub/Sub.")
@Required
Long getQps();
void setQps(Long value);
@Description("The path to the schema to generate.")
@Required
String getSchemaLocation();
void setSchemaLocation(String value);
@Description("The Pub/Sub topic to write to.")
@Required
String getTopic();
void setTopic(String value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* StreamingDataGenerator#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
StreamingDataGeneratorOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(StreamingDataGeneratorOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(StreamingDataGeneratorOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Trigger at the supplied QPS
* 2) Generate messages containing fake data
* 3) Write messages to Pub/Sub
*/
pipeline
.apply(
"Trigger",
GenerateSequence.from(0L).withRate(options.getQps(), Duration.standardSeconds(1L)))
.apply("GenerateMessages", ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())))
.apply("WriteToPubsub", PubsubIO.writeMessages().to(options.getTopic()));
return pipeline.run();
}
/**
* The {@link MessageGeneratorFn} class generates {@link PubsubMessage} objects from a supplied
* schema and populating the message with fake data.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*/
static class MessageGeneratorFn extends DoFn<Long, PubsubMessage> {
private final String schemaLocation;
private String schema;
// Not initialized inline or constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;
MessageGeneratorFn(String schemaLocation) {
this.schemaLocation = schemaLocation;
}
@Setup
public void setup() throws IOException {
dataGenerator = new JsonDataGeneratorImpl();
Metadata metadata = FileSystems.matchSingleFileSpec(schemaLocation);
// Copy the schema file into a string which can be used for generation.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ReadableByteChannel readerChannel = FileSystems.open(metadata.resourceId())) {
try (WritableByteChannel writerChannel = Channels.newChannel(byteArrayOutputStream)) {
ByteStreams.copy(readerChannel, writerChannel);
}
}
schema = byteArrayOutputStream.toString();
}
}
@ProcessElement
public void processElement(@Element Long element, @Timestamp Instant timestamp,
OutputReceiver<PubsubMessage> receiver, ProcessContext context)
throws IOException, JsonDataGeneratorException {
// TODO: Add the ability to place eventId and eventTimestamp in the attributes.
byte[] payload;
Map<String, String> attributes = new HashMap<>();
// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toByteArray();
}
receiver.output(new PubsubMessage(payload, attributes));
}
}
}
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Translation issue"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Other"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Easy to understand"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"Solved my problem"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Other"
}]