[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Translation issue"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Other"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Easy to understand"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"Solved my problem"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Other"
}]
Plantillas de utilidad proporcionadas por Google
Google proporciona un conjunto de plantillas de código abierto de Dataflow. Para obtener información general sobre las plantillas, consulta la página Descripción general. Para obtener una lista de todas las plantillas proporcionadas por Google, consulta la página Plantillas que proporciona Google.
En esta página, se documentan las siguientes plantillas de utilidad:
La plantilla de compresión masiva de archivos de Cloud Storage es una canalización por lotes que comprime archivos en Cloud Storage en una ubicación especificada. Esta plantilla puede ser útil cuando necesites comprimir grandes lotes de archivos como parte de un proceso de archivo periódico. Los modos de compresión compatibles son BZIP2, DEFLATE y GZIP.
Los archivos que se envían a la ubicación de destino seguirán un esquema de nombres que consta del nombre de archivo original anexado a la extensión del modo de compresión. Las extensiones anexadas serán .bzip2, .deflate y .gz.
Cualquier error que ocurra durante el proceso de compresión se escribirá en el archivo de falla en formato CSV de nombre de archivo, mensaje de error. Si no se producen fallas mientras se ejecuta la canalización, el archivo de error se creará, pero no tendrá registros de errores.
Requisitos para esta canalización:
La compresión debe tener uno de los siguientes formatos: BZIP2, DEFLATE y GZIP.
El directorio de salida debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro
Descripción
inputFilePattern
El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/uncompressed/*.txt.
outputDirectory
La ubicación de salida para escribir. Por ejemplo, gs://bucket-name/compressed/.
outputFailureFile
El archivo de salida del registro de errores para escribir fallas que ocurran durante el proceso de compresión. Por ejemplo, gs://bucket-name/compressed/failed.csv. Si no se encuentran fallas, el archivo se creará, pero estará vacío. El contenido del archivo tiene el formato CSV (nombre de archivo, error) y consta de una línea por cada archivo con errores de compresión.
compression
El algoritmo de compresión que se utiliza para comprimir los archivos coincidentes. El modo debe ser uno de los siguientes: BZIP2, DEFLATE o GZIP.
Ejecuta la plantilla de compresión masiva de archivos de Cloud Storage
Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
Selecciona the Bulk Compress Cloud Storage Files template en el menú desplegable Plantilla de Dataflow.
Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Ingresa los valores de tus parámetros en los campos de parámetros provistos.
Haz clic en Run Job (Ejecutar trabajo).
GCLOUD
Ejecútala desde la herramienta de línea de comandos de gcloud
Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
Reemplaza COMPRESSION con el algoritmo de compresión que elijas.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION
API
Ejecútala desde la API de REST
Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:
Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
Reemplaza COMPRESSION con el algoritmo de compresión que elijas.
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link BulkCompressor} is a batch pipeline that compresses files on matched by an input file
* pattern and outputs them to a specified file location. This pipeline can be useful when you need
* to compress large batches of files as part of a perodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>, <code>ZIP
* </code>. Files output to the destination location will follow a naming schema of original
* filename appended with the compression mode extension. The extensions appended will be one of:
* <code>.bzip2</code>, <code>.deflate</code>, <code>.gz</code>, <code>.zip</code> as determined by
* the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file will
* still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
* </code>, <code>GZIP</code>, <code>ZIP</code>.
* <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/bulk-compressor
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkCompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=bulk-compressor-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=${PIPELINE_FOLDER}/test/uncompressed/*,\
* outputDirectory=${PIPELINE_FOLDER}/test/compressed,\
* outputFailureFile=${PIPELINE_FOLDER}/test/failure/failed-${JOB_NAME}.csv,\
* compression=GZIP"
* </pre>
*/
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/uncompressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/compressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the compression process "
+ "(e.g. gs://bucket-name/compressed/failed.txt). The contents will be one line for "
+ "each file which failed compression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@Description("The compression algorithm to use on the matched files.")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the uncompressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}
Descompresión masiva de archivos de Cloud Storage
La plantilla de descompresión masiva de archivos de Cloud Storage es una canalización por lotes que descomprime archivos de Cloud Storage en una ubicación especificada. Esta función es útil cuando deseas utilizar datos comprimidos para minimizar los costos del ancho de banda de la red durante una migración, pero quieres maximizar la velocidad de procesamiento analítico trabajando con datos no comprimidos luego de la migración. La canalización controla de forma automática los diferentes modos de compresión en una sola ejecución y determina el modo de descompresión que utilizará según la extensión de los archivos (.bzip2, .deflate, .gz, .zip).
Requisitos para esta canalización:
Los archivos que se descomprimirán deben tener uno de los siguientes formatos: Bzip2, Deflate, Gzip o Zip.
El directorio de salida debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetro
Descripción
inputFilePattern
El patrón del archivo de entrada para leer. Por ejemplo, gs://bucket-name/compressed/*.gz.
outputDirectory
La ubicación de salida para escribir. Por ejemplo, gs://bucket-name/decompressed.
outputFailureFile
El archivo de salida del registro de errores para escribir fallas que ocurran durante el proceso de descompresión. Por ejemplo, gs://bucket-name/decompressed/failed.csv. Si no se encuentran fallas, el archivo se creará, pero estará vacío. El contenido del archivo tiene el formato CSV (nombre de archivo, error) y consta de una línea por cada archivo con errores de descompresión.
Ejecuta la plantilla de descompresión masiva de archivos de Cloud Storage
Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
Selecciona the Bulk Decompress Cloud Storage Files template en el menú desplegable Plantilla de Dataflow.
Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Ingresa los valores de tus parámetros en los campos de parámetros provistos.
Haz clic en Run Job (Ejecutar trabajo).
GCLOUD
Ejecútala desde la herramienta de línea de comandos de gcloud
Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.
Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
Reemplaza OUTPUT_FAILURE_FILE_PATH con tu ruta de acceso al archivo que contiene la información de falla.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH
API
Ejecútala desde la API de REST
Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:
Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza YOUR_BUCKET_NAME con el nombre del depósito de Cloud Storage.
Reemplaza OUTPUT_FAILURE_FILE_PATH con tu ruta de acceso al archivo que contiene la información de falla.
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline decompresses file(s) from Google Cloud Storage and re-uploads them to a destination
* location.
*
* <p><b>Parameters</b>
*
* <p>The {@code --inputFilePattern} parameter specifies a file glob to process. Files found can be
* expressed in the following formats:
*
* <pre>
* --inputFilePattern=gs://bucket-name/compressed-dir/*
* --inputFilePattern=gs://bucket-name/compressed-dir/demo*.gz
* </pre>
*
* <p>The {@code --outputDirectory} parameter can be expressed in the following formats:
*
* <pre>
* --outputDirectory=gs://bucket-name
* --outputDirectory=gs://bucket-name/decompressed-dir
* </pre>
*
* <p>The {@code --outputFailureFile} parameter indicates the file to write the names of the files
* which failed decompression and their associated error messages. This file can then be used for
* subsequent processing by another process outside of Dataflow (e.g. send an email with the
* failures, etc.). If there are no failures, the file will still be created but will be empty. The
* failure file structure contains both the file that caused the error and the error message in CSV
* format. The file will contain one header row and two columns (Filename, Error). The filename
* output to the failureFile will be the full path of the file for ease of debugging.
*
* <pre>
* --outputFailureFile=gs://bucket-name/decompressed-dir/failed.csv
* </pre>
*
* <p>Example Output File:
*
* <pre>
* Filename,Error
* gs://docs-demo/compressedFile.gz, File is malformed or not compressed in BZIP2 format.
* </pre>
*
* <p><b>Example Usage</b>
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkDecompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://${PROJECT_ID}/compressed-dir/*.gz \
* --outputDirectory=gs://${PROJECT_ID}/decompressed-dir \
* --outputFailureFile=gs://${PROJECT_ID}/decompressed-dir/failed.csv"
* </pre>
*/
public class BulkDecompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkDecompressor.class);
/**
* A list of the {@link Compression} values excluding {@link Compression#AUTO} and {@link
* Compression#UNCOMPRESSED}.
*/
@VisibleForTesting
static final Set<Compression> SUPPORTED_COMPRESSIONS =
Stream.of(Compression.values())
.filter(value -> value != Compression.AUTO && value != Compression.UNCOMPRESSED)
.collect(Collectors.toSet());
/** The error msg given when the pipeline matches a file but cannot determine the compression. */
@VisibleForTesting
static final String UNCOMPRESSED_ERROR_MSG =
"Skipping file %s because it did not match any compression mode (%s)";
@VisibleForTesting
static final String MALFORMED_ERROR_MSG =
"The file resource %s is malformed or not in %s compressed format.";
/** The tag used to identify the main output of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<String> DECOMPRESS_MAIN_OUT_TAG = new TupleTag<String>() {};
/** The tag used to identify the dead-letter sideOutput of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<KV<String, String>> DEADLETTER_TAG = new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/compressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/decompressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the decompression process "
+ "(e.g. gs://bucket-name/decompressed/failed.txt). The contents will be one line for "
+ "each file which failed decompression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* BulkDecompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Decompress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Run the pipeline over the work items.
PCollectionTuple decompressOut =
pipeline
.apply("MatchFile(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"DecompressFile(s)",
ParDo.of(new Decompress(options.getOutputDirectory()))
.withOutputTags(DECOMPRESS_MAIN_OUT_TAG, TupleTagList.of(DEADLETTER_TAG)));
decompressOut
.get(DEADLETTER_TAG)
.apply(
"FormatErrors",
MapElements.into(TypeDescriptors.strings())
.via(
kv -> {
StringWriter stringWriter = new StringWriter();
try {
CSVPrinter printer =
new CSVPrinter(
stringWriter,
CSVFormat.DEFAULT
.withEscape('\\')
.withQuoteMode(QuoteMode.NONE)
.withRecordSeparator('\n')
);
printer.printRecord(kv.getKey(), kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}))
// We don't expect error files to be large so we'll create a single
// file for ease of reprocessing by processes outside of Dataflow.
.apply(
"WriteErrorFile",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* Performs the decompression of an object on Google Cloud Storage and uploads the decompressed
* object back to a specified destination location.
*/
@SuppressWarnings("serial")
public static class Decompress extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
Decompress(ValueProvider<String> destinationLocation) {
this.destinationLocation = destinationLocation;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
// Output a record to the failure file if the file doesn't match a known compression.
if (!Compression.AUTO.isCompressed(inputFile.toString())) {
String errorMsg =
String.format(UNCOMPRESSED_ERROR_MSG, inputFile.toString(), SUPPORTED_COMPRESSIONS);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), errorMsg));
} else {
try {
ResourceId outputFile = decompress(inputFile);
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error(e.getMessage());
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
/**
* Decompresses the inputFile using the specified compression and outputs to the main output of
* the {@link Decompress} doFn. Files output to the destination will be first written as temp
* files with a "temp-" prefix within the output directory. If a file fails decompression, the
* filename and the associated error will be output to the dead-letter.
*
* @param inputFile The inputFile to decompress.
* @return A {@link ResourceId} which points to the resulting file from the decompression.
*/
private ResourceId decompress(ResourceId inputFile) throws IOException {
// Remove the compressed extension from the file. Example: demo.txt.gz -> demo.txt
String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
// Resolve the necessary resources to perform the transfer.
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve(Files.getFileExtension(inputFile.toString())
+ "-temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Resolve the compression
Compression compression = Compression.detect(inputFile.toString());
// Perform the copy of the decompressed channel into the destination.
try (ReadableByteChannel readerChannel =
compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temp file to the output file.
FileSystems.rename(
ImmutableList.of(tempFile),
ImmutableList.of(outputFile),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
String msg = e.getMessage();
LOG.error("Error occurred during decompression of {}", inputFile.toString(), e);
throw new IOException(sanitizeDecompressionErrorMsg(msg, inputFile, compression));
}
return outputFile;
}
/**
* The error messages coming from the compression library are not consistent across compression
* modes. Here we'll attempt to unify the messages to inform the user more clearly when we've
* encountered a file which is not compressed or malformed. Note that GZIP and ZIP compression
* modes will not throw an exception when a decompression is attempted on a file which is not
* compressed.
*
* @param errorMsg The error message thrown during decompression.
* @param inputFile The input file which failed decompression.
* @param compression The compression mode used during decompression.
* @return The sanitized error message. If the error was not from a malformed file, the same
* error message passed will be returned.
*/
private String sanitizeDecompressionErrorMsg(
String errorMsg, ResourceId inputFile, Compression compression) {
if (errorMsg != null
&& (errorMsg.contains("not in the BZip2 format")
|| errorMsg.contains("incorrect header check"))) {
errorMsg = String.format(MALFORMED_ERROR_MSG, inputFile.toString(), compression);
}
return errorMsg;
}
}
}
Borrado masivo de Datastore
La plantilla de borrado masivo de Datastore es una canalización que lee entidades de Datastore con una consulta de GQL determinada y, luego, borra todas las entidades coincidentes en el proyecto de destino seleccionado. De forma opcional, la canalización puede pasar las entidades de Datastore codificadas en JSON a tu UDF de JavaScript, que puedes usar para filtrar entidades mostrando valores nulos.
Requisitos para esta canalización:
Datastore debe configurarse en el proyecto antes de ejecutar la plantilla.
Si se realiza la lectura y el borrado desde instancias de Datastore diferentes, la cuenta de servicio del controlador de Dataflow debe tener permiso para leer desde una instancia y borrar desde la otra.
Parámetros de la plantilla
Parámetro
Descripción
datastoreReadGqlQuery
Consulta de GQL que especifica las entidades que deben coincidir para la eliminación. Por ejemplo, "SELECT * FROM MyKind".
datastoreReadProjectId
ID del proyecto de GCP de la instancia de Datastore desde la que deseas leer las entidades (mediante tu consulta de GQL) que se usan para las coincidencias.
datastoreDeleteProjectId
ID del proyecto de GCP de la instancia de Datastore desde la cual borrar las entidades coincidentes. Esto puede ser igual a datastoreReadProjectId si deseas leer y borrar dentro de la misma instancia de Datastore.
datastoreReadNamespace
Espacio de nombres de las entidades solicitadas [opcional]. Configurado como "" para el espacio de nombres predeterminado.
javascriptTextTransformGcsPath
Ruta de acceso de Cloud Storage que contiene todo el código de JavaScript. Por ejemplo, "gs://mybucket/mytransforms/*.js" [opcional]. Si no quieres usar una UDF, deja este campo en blanco.
javascriptTextTransformFunctionName
[Opcional] Nombre de la función para llamar. Si esta función muestra un valor no definido o nulo para una entidad de Datastore determinada, esa entidad no se borrará. Si tienes el código de JavaScript "function myTransform(inJson) { …dostuff…}", el nombre de la función será "myTransform". Si no quieres usar una UDF, deja este campo en blanco.
Ejecuta la plantilla de borrado masivo de Datastore
Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
Selecciona the Datastore Bulk Delete template en el menú desplegable Plantilla de Dataflow.
Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Ingresa los valores de tus parámetros en los campos de parámetros provistos.
Haz clic en Run Job (Ejecutar trabajo).
GCLOUD
Ejecútala desde la herramienta de línea de comandos de gcloud
Nota: Para usar la herramienta de línea de comandos de gcloud a fin de ejecutar plantillas, debes tener la versión 138.0.0 del SDK de Cloud o una posterior.
Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza GQL_QUERY con la consulta que usarás para borrar las entidades coincidentes.
Reemplaza DATASTORE_READ_AND_DELETE_PROJECT_ID con tu ID del proyecto de la instancia de Datastore. En este ejemplo, se realizan operaciones de lectura y de eliminación desde la misma instancia de Datastore.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
--parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
API
Ejecútala desde la API de REST
Para ejecutar esta plantilla, necesitarás la ruta de acceso de Cloud Storage a la plantilla:
Para ejecutar esta plantilla con una solicitud de la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere de una autorización.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza GQL_QUERY con la consulta que usarás para borrar las entidades coincidentes.
Reemplaza DATASTORE_READ_AND_DELETE_PROJECT_ID con tu ID del proyecto de la instancia de Datastore. En este ejemplo, se realizan operaciones de lectura y de eliminación desde la misma instancia de Datastore.
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
* Dataflow template which deletes pulled Datastore Entities.
*/
public class DatastoreToDatastoreDelete {
/**
* Custom PipelineOptions.
*/
public interface DatastoreToDatastoreDeleteOptions extends
PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities
* to a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(ReadJsonEntities.newBuilder()
.setGqlQuery(options.getDatastoreReadGqlQuery())
.setProjectId(options.getDatastoreReadProjectId())
.setNamespace(options.getDatastoreReadNamespace())
.build())
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(DatastoreDeleteEntityJson.newBuilder()
.setProjectId(options.getDatastoreDeleteProjectId())
.build());
pipeline.run();
}
}
Generador de datos de transmisión a Pub/Sub
Durante el desarrollo de canalizaciones de Dataflow, uno de los requisitos comunes es ejecutar una comparativa de rendimiento cada cierta cantidad de consultas por segundo (QPS).
Esta plantilla se puede usar a fin de generar mensajes JSON falsos basados en el esquema proporcionado por el usuario a la velocidad especificada para un tema de Pub/Sub.
La biblioteca del Generador de datos JSON que usa la canalización permite usar varias funciones de este tipo para cada campo de esquema. Consulta los documentos para obtener más información sobre estas funciones y el formato del esquema.
Requisitos para esta canalización:
Crea un archivo de esquema y almacénalo en la ubicación de Cloud Storage
El tema de Pub/Sub de salida debe existir antes de la ejecución.
Parámetros de la plantilla
Parámetro
Descripción
schemaLocation
Ubicación del archivo de esquema. Por ejemplo: gs://mybucket/filename.json.
topic
Nombre del tema de Pub/Sub en el que la canalización debe publicar datos.
Por ejemplo: projects/<project-id>/topics/<topic-name>
qps
Cantidad de mensajes que se publicarán por segundo. Por ejemplo: 100.
autoscalingAlgorithm
Algoritmo que se usa para el ajuste de escala automático de los trabajadores (opcional). Los valores posibles son THROUGHPUT_BASED, para habilitar el ajuste de escala automático, o NONE, si deseas inhabilitarlo.
maxNumWorkers
Cantidad máxima de máquinas de trabajador (opcional). Por ejemplo: 10.
Ejecuta la plantilla de Generador de datos de transmisión
Haz clic en Crear trabajo a partir de una plantilla (Create job from template).
Selecciona the Streaming Data Generator template en el menú desplegable Plantilla de Dataflow.
Ingresa un nombre para el trabajo en el campo Nombre del trabajo.El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Ingresa los valores de tus parámetros en los campos de parámetros provistos.
Haz clic en Run Job (Ejecutar trabajo).
GCLOUD
Ejecuta desde la herramienta de línea de comandos de gcloud
Nota: Si quieres usar la herramienta de línea de comandos de gcloud para ejecutar plantillas, debes tener la versión 284.0.0 o superior del SDK de Cloud.
Cuando ejecutas esta plantilla, necesitas la ruta de acceso de Cloud Storage a la plantilla:
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza REGION_NAME por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza SCHEMA_LOCATION por la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
Reemplaza PUBSUB_TOPIC por el tema de Pub/Sub de salida. Por ejemplo: projects/<project-id>/topics/<topic-name>.
Reemplaza QPS por la cantidad de mensajes que se publicarán por segundo.
Para ejecutar esta plantilla con una solicitud a la API de REST, envía una solicitud HTTP POST con tu ID del proyecto. Esta solicitud requiere autorización.
En este ejemplo, debes reemplazar los siguientes valores:
Reemplaza YOUR_PROJECT_ID por el ID del proyecto.
Reemplaza LOCATION por el nombre de la región de Dataflow. Por ejemplo: us-central1.
Reemplaza JOB_NAME por un nombre de trabajo a elección. El nombre del trabajo debe coincidir con la expresión regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Reemplaza SCHEMA_LOCATION por la ruta de acceso al archivo de esquema en Cloud Storage. Por ejemplo: gs://mybucket/filename.json.
Reemplaza PUBSUB_TOPIC por el tema de Pub/Sub de salida. Por ejemplo: projects/<project-id>/topics/<topic-name>.
Reemplaza QPS por la cantidad de mensajes que se publicarán por segundo.
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.channels.Channels;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.fs.MatchResult.Metadata;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.vendor.guava.v20_0.com.google.common.io.ByteStreams;
import org.joda.time.Duration;
import org.joda.time.Instant;
/**
* The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a specified
* rate to a Pub/Sub topic. The messages are generated according to a schema template which
* instructs the pipeline how to populate the messages with fake data compliant to constraints.
*
* <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
* Use a general rule of 2,500 QPS per core in the worker pool.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The schema file exists.
* <li>The Pub/Sub topic exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* SCHEMA_LOCATION=gs://<bucket>/<path>/<to>/game-event-schema.json
* PUBSUB_TOPIC=projects/<project-id>/topics/<topic-id>
* QPS=2500
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create a template spec containing the details of image location and metadata in GCS
* as specified in README.md file
*
* # Execute template:
* JOB_NAME=<job-name>
* PROJECT=<project-id>
* TEMPLATE_SPEC_GCSPATH=gs://path/to/template-spec
* SCHEMA_LOCATION=gs://path/to/schema.json
* PUBSUB_TOPIC=projects/$PROJECT/topics/<topic-name>
* QPS=1
*
* gcloud beta dataflow jobs run $JOB_NAME \
* --project=$PROJECT --region=us-central1 --flex-template \
* --gcs-location=$TEMPLATE_SPEC_GCSPATH \
* --parameters autoscalingAlgorithm="THROUGHPUT_BASED",schemaLocation=$SCHEMA_LOCATION,topic=$PUBSUB_TOPIC,qps=$QPS,maxNumWorkers=3
* </pre>
*/
public class StreamingDataGenerator {
/**
* The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface StreamingDataGeneratorOptions extends PipelineOptions {
@Description("Indicates rate of messages per second to be published to Pub/Sub.")
@Required
Long getQps();
void setQps(Long value);
@Description("The path to the schema to generate.")
@Required
String getSchemaLocation();
void setSchemaLocation(String value);
@Description("The Pub/Sub topic to write to.")
@Required
String getTopic();
void setTopic(String value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for it's execution to finish. If blocking execution is required, use the {@link
* StreamingDataGenerator#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
StreamingDataGeneratorOptions options = PipelineOptionsFactory
.fromArgs(args)
.withValidation()
.as(StreamingDataGeneratorOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(StreamingDataGeneratorOptions options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Trigger at the supplied QPS
* 2) Generate messages containing fake data
* 3) Write messages to Pub/Sub
*/
pipeline
.apply(
"Trigger",
GenerateSequence.from(0L).withRate(options.getQps(), Duration.standardSeconds(1L)))
.apply("GenerateMessages", ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())))
.apply("WriteToPubsub", PubsubIO.writeMessages().to(options.getTopic()));
return pipeline.run();
}
/**
* The {@link MessageGeneratorFn} class generates {@link PubsubMessage} objects from a supplied
* schema and populating the message with fake data.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*/
static class MessageGeneratorFn extends DoFn<Long, PubsubMessage> {
private final String schemaLocation;
private String schema;
// Not initialized inline or constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;
MessageGeneratorFn(String schemaLocation) {
this.schemaLocation = schemaLocation;
}
@Setup
public void setup() throws IOException {
dataGenerator = new JsonDataGeneratorImpl();
Metadata metadata = FileSystems.matchSingleFileSpec(schemaLocation);
// Copy the schema file into a string which can be used for generation.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
try (ReadableByteChannel readerChannel = FileSystems.open(metadata.resourceId())) {
try (WritableByteChannel writerChannel = Channels.newChannel(byteArrayOutputStream)) {
ByteStreams.copy(readerChannel, writerChannel);
}
}
schema = byteArrayOutputStream.toString();
}
}
@ProcessElement
public void processElement(@Element Long element, @Timestamp Instant timestamp,
OutputReceiver<PubsubMessage> receiver, ProcessContext context)
throws IOException, JsonDataGeneratorException {
// TODO: Add the ability to place eventId and eventTimestamp in the attributes.
byte[] payload;
Map<String, String> attributes = new HashMap<>();
// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toByteArray();
}
receiver.output(new PubsubMessage(payload, attributes));
}
}
}
[{
"type": "thumb-down",
"id": "hardToUnderstand",
"label":"Hard to understand"
},{
"type": "thumb-down",
"id": "incorrectInformationOrSampleCode",
"label":"Incorrect information or sample code"
},{
"type": "thumb-down",
"id": "missingTheInformationSamplesINeed",
"label":"Missing the information/samples I need"
},{
"type": "thumb-down",
"id": "translationIssue",
"label":"Translation issue"
},{
"type": "thumb-down",
"id": "otherDown",
"label":"Other"
}]
[{
"type": "thumb-up",
"id": "easyToUnderstand",
"label":"Easy to understand"
},{
"type": "thumb-up",
"id": "solvedMyProblem",
"label":"Solved my problem"
},{
"type": "thumb-up",
"id": "otherUp",
"label":"Other"
}]