O modelo "Compactação em massa de arquivos do Cloud Storage" é um pipeline em lote que compacta arquivos do Cloud Storage para um local específico. Esse modelo pode ser útil para compactar grandes lotes de arquivos como parte de um processo de arquivamento periódico. Os modos de compactação compatíveis são: BZIP2, DEFLATE e GZIP.
A saída de arquivos para o local de destino seguirá o esquema de nomeação do arquivo original anexado à extensão do modo de compactação. As extensões anexadas podem ser dos tipos a seguir: .bzip2, .deflate e .gz.
Qualquer erro ocorrido durante o processo de compactação é enviado para o arquivo de falha em formato CSV com o nome do arquivo e a mensagem de erro. Se nenhuma falha ocorrer durante a execução do pipeline, o arquivo de erro ainda será criado, mas não conterá registros de erro.
Requirements for this pipeline:
The compression must be in one of the following formats: BZIP2, DEFLATE, or GZIP.
The output directory must exist before running the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt.
outputDirectory: The output location to write to. For example, gs://bucket-name/compressed/.
outputFailureFile: The error log output file to use for failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. The file is created even if there are no failures, but it will be empty. The contents are in CSV format (filename, error) with one line for each file that fails compression; see the sample excerpt after this list.
compression: The compression algorithm used to compress the matched files. Must be one of: BZIP2, DEFLATE, or GZIP.
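For illustration only, a hypothetical excerpt of the failure file (the header row matches the one the pipeline writes; the file path and error message below are invented):

Filename,Error
gs://bucket-name/uncompressed/file1.txt,Error occurred during compression of the file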
Como executar o modelo "Compactação em massa de arquivos do Cloud Storage"
Selecione the Bulk Compress Cloud Storage Files template no menu suspenso Modelo do Dataflow.
Digite o nome de um job no campo Nome do job.
O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Digite os valores de parâmetro nos campos fornecidos.
Clique em Executar job.
GCLOUD
Run in the gcloud command-line tool
Note: To run templates using the gcloud command-line tool, you must have Cloud SDK version 138.0.0 or higher.
Replace the following values in this example:
Replace YOUR_PROJECT_ID with your project ID.
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
Replace COMPRESSION with your choice of compression algorithm.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/uncompressed/*.txt,\
outputDirectory=gs://YOUR_BUCKET_NAME/compressed,\
outputFailureFile=gs://YOUR_BUCKET_NAME/failed/failure.csv,\
compression=COMPRESSION
API
Run using the REST API
When executing this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
Replace COMPRESSION with your choice of compression algorithm.
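The request itself is not reproduced on this page; the sketch below assumes the standard Dataflow projects.templates.launch REST endpoint, with YOUR_PROJECT_ID used as in the gcloud example:

POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files
{
    "jobName": "JOB_NAME",
    "parameters": {
        "inputFilePattern": "gs://YOUR_BUCKET_NAME/uncompressed/*.txt",
        "outputDirectory": "gs://YOUR_BUCKET_NAME/compressed",
        "outputFailureFile": "gs://YOUR_BUCKET_NAME/failed/failure.csv",
        "compression": "COMPRESSION"
    }
}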
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link BulkCompressor} is a batch pipeline that compresses files matched by an input file
* pattern and outputs them to a specified file location. This pipeline can be useful when you need
* to compress large batches of files as part of a periodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>, <code>ZIP
* </code>. Files output to the destination location will follow a naming schema of original
* filename appended with the compression mode extension. The extensions appended will be one of:
* <code>.bzip2</code>, <code>.deflate</code>, <code>.gz</code>, <code>.zip</code> as determined by
* the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file will
* still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
* </code>, <code>GZIP</code>, <code>ZIP</code>.
* <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/bulk-compressor
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkCompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=bulk-compressor-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=${PIPELINE_FOLDER}/test/uncompressed/*,\
* outputDirectory=${PIPELINE_FOLDER}/test/compressed,\
* outputFailureFile=${PIPELINE_FOLDER}/test/failure/failed-${JOB_NAME}.csv,\
* compression=GZIP"
* </pre>
*/
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/uncompressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/compressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the compression process "
+ "(e.g. gs://bucket-name/compressed/failed.txt). The contents will be one line for "
+ "each file which failed compression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@Description("The compression algorithm to use on the matched files.")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the uncompressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}
Bulk Decompress Cloud Storage Files
The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on Cloud Storage to a specified location. This functionality is useful when you want to use compressed data to minimize network bandwidth costs during a migration, but want to maximize analytical processing speed by operating on uncompressed data after the migration. The pipeline automatically handles multiple compression modes during a single run and determines the decompression mode to use based on the file extension (.bzip2, .deflate, .gz, .zip).
Requirements for this pipeline:
The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, or Zip.
The output directory must exist before running the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz.
outputDirectory: The output location to write to. For example, gs://bucket-name/decompressed.
outputFailureFile: The error log output file to use for failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. The file is created even if there are no failures, but it will be empty. The contents are in CSV format (filename, error) with one line for each file that fails decompression.
Running the Bulk Decompress Cloud Storage Files template
GCLOUD
Run in the gcloud command-line tool
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
Replace OUTPUT_FAILURE_FILE_PATH with your choice of path to the file that holds the failure information.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
--parameters \
inputFilePattern=gs://YOUR_BUCKET_NAME/compressed/*.gz,\
outputDirectory=gs://YOUR_BUCKET_NAME/decompressed,\
outputFailureFile=OUTPUT_FAILURE_FILE_PATH
API
Run using the REST API
When executing this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace YOUR_BUCKET_NAME with the name of your Cloud Storage bucket.
Replace OUTPUT_FAILURE_FILE_PATH with your choice of path to the file that holds the failure information.
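As with the compression template, the request body is not reproduced here; a sketch assuming the same projects.templates.launch endpoint (YOUR_PROJECT_ID is your project ID):

POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files
{
    "jobName": "JOB_NAME",
    "parameters": {
        "inputFilePattern": "gs://YOUR_BUCKET_NAME/compressed/*.gz",
        "outputDirectory": "gs://YOUR_BUCKET_NAME/decompressed",
        "outputFailureFile": "OUTPUT_FAILURE_FILE_PATH"
    }
}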
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline decompresses file(s) from Google Cloud Storage and re-uploads them to a destination
* location.
*
* <p><b>Parameters</b>
*
* <p>The {@code --inputFilePattern} parameter specifies a file glob to process. Files found can be
* expressed in the following formats:
*
* <pre>
* --inputFilePattern=gs://bucket-name/compressed-dir/*
* --inputFilePattern=gs://bucket-name/compressed-dir/demo*.gz
* </pre>
*
* <p>The {@code --outputDirectory} parameter can be expressed in the following formats:
*
* <pre>
* --outputDirectory=gs://bucket-name
* --outputDirectory=gs://bucket-name/decompressed-dir
* </pre>
*
* <p>The {@code --outputFailureFile} parameter indicates the file to write the names of the files
* which failed decompression and their associated error messages. This file can then be used for
* subsequent processing by another process outside of Dataflow (e.g. send an email with the
* failures, etc.). If there are no failures, the file will still be created but will be empty. The
* failure file structure contains both the file that caused the error and the error message in CSV
* format. The file will contain one header row and two columns (Filename, Error). The filename
* output to the failureFile will be the full path of the file for ease of debugging.
*
* <pre>
* --outputFailureFile=gs://bucket-name/decompressed-dir/failed.csv
* </pre>
*
* <p>Example Output File:
*
* <pre>
* Filename,Error
* gs://docs-demo/compressedFile.gz, File is malformed or not compressed in BZIP2 format.
* </pre>
*
* <p><b>Example Usage</b>
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkDecompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://${PROJECT_ID}/compressed-dir/*.gz \
* --outputDirectory=gs://${PROJECT_ID}/decompressed-dir \
* --outputFailureFile=gs://${PROJECT_ID}/decompressed-dir/failed.csv"
* </pre>
*/
public class BulkDecompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkDecompressor.class);
/**
* A list of the {@link Compression} values excluding {@link Compression#AUTO} and {@link
* Compression#UNCOMPRESSED}.
*/
@VisibleForTesting
static final Set<Compression> SUPPORTED_COMPRESSIONS =
Stream.of(Compression.values())
.filter(value -> value != Compression.AUTO && value != Compression.UNCOMPRESSED)
.collect(Collectors.toSet());
/** The error msg given when the pipeline matches a file but cannot determine the compression. */
@VisibleForTesting
static final String UNCOMPRESSED_ERROR_MSG =
"Skipping file %s because it did not match any compression mode (%s)";
@VisibleForTesting
static final String MALFORMED_ERROR_MSG =
"The file resource %s is malformed or not in %s compressed format.";
/** The tag used to identify the main output of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<String> DECOMPRESS_MAIN_OUT_TAG = new TupleTag<String>() {};
/** The tag used to identify the dead-letter sideOutput of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<KV<String, String>> DEADLETTER_TAG = new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/compressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/decompressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the decompression process "
+ "(e.g. gs://bucket-name/decompressed/failed.txt). The contents will be one line for "
+ "each file which failed decompression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* BulkDecompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Decompress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Run the pipeline over the work items.
PCollectionTuple decompressOut =
pipeline
.apply("MatchFile(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"DecompressFile(s)",
ParDo.of(new Decompress(options.getOutputDirectory()))
.withOutputTags(DECOMPRESS_MAIN_OUT_TAG, TupleTagList.of(DEADLETTER_TAG)));
decompressOut
.get(DEADLETTER_TAG)
.apply(
"FormatErrors",
MapElements.into(TypeDescriptors.strings())
.via(
kv -> {
StringWriter stringWriter = new StringWriter();
try {
CSVPrinter printer =
new CSVPrinter(
stringWriter,
CSVFormat.DEFAULT
.withEscape('\\')
.withQuoteMode(QuoteMode.NONE)
.withRecordSeparator('\n')
);
printer.printRecord(kv.getKey(), kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}))
// We don't expect error files to be large so we'll create a single
// file for ease of reprocessing by processes outside of Dataflow.
.apply(
"WriteErrorFile",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* Performs the decompression of an object on Google Cloud Storage and uploads the decompressed
* object back to a specified destination location.
*/
@SuppressWarnings("serial")
public static class Decompress extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
Decompress(ValueProvider<String> destinationLocation) {
this.destinationLocation = destinationLocation;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
// Output a record to the failure file if the file doesn't match a known compression.
if (!Compression.AUTO.isCompressed(inputFile.toString())) {
String errorMsg =
String.format(UNCOMPRESSED_ERROR_MSG, inputFile.toString(), SUPPORTED_COMPRESSIONS);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), errorMsg));
} else {
try {
ResourceId outputFile = decompress(inputFile);
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error(e.getMessage());
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
/**
* Decompresses the inputFile using the specified compression and outputs to the main output of
* the {@link Decompress} doFn. Files output to the destination will be first written as temp
* files with a "temp-" prefix within the output directory. If a file fails decompression, the
* filename and the associated error will be output to the dead-letter.
*
* @param inputFile The inputFile to decompress.
* @return A {@link ResourceId} which points to the resulting file from the decompression.
*/
private ResourceId decompress(ResourceId inputFile) throws IOException {
// Remove the compressed extension from the file. Example: demo.txt.gz -> demo.txt
String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
// Resolve the necessary resources to perform the transfer.
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve(Files.getFileExtension(inputFile.toString())
+ "-temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Resolve the compression
Compression compression = Compression.detect(inputFile.toString());
// Perform the copy of the decompressed channel into the destination.
try (ReadableByteChannel readerChannel =
compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temp file to the output file.
FileSystems.rename(
ImmutableList.of(tempFile),
ImmutableList.of(outputFile),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
String msg = e.getMessage();
LOG.error("Error occurred during decompression of {}", inputFile.toString(), e);
throw new IOException(sanitizeDecompressionErrorMsg(msg, inputFile, compression));
}
return outputFile;
}
/**
* The error messages coming from the compression library are not consistent across compression
* modes. Here we'll attempt to unify the messages to inform the user more clearly when we've
* encountered a file which is not compressed or malformed. Note that GZIP and ZIP compression
* modes will not throw an exception when a decompression is attempted on a file which is not
* compressed.
*
* @param errorMsg The error message thrown during decompression.
* @param inputFile The input file which failed decompression.
* @param compression The compression mode used during decompression.
* @return The sanitized error message. If the error was not from a malformed file, the same
* error message passed will be returned.
*/
private String sanitizeDecompressionErrorMsg(
String errorMsg, ResourceId inputFile, Compression compression) {
if (errorMsg != null
&& (errorMsg.contains("not in the BZip2 format")
|| errorMsg.contains("incorrect header check"))) {
errorMsg = String.format(MALFORMED_ERROR_MSG, inputFile.toString(), compression);
}
return errorMsg;
}
}
}
Datastore Bulk Delete
The Datastore Bulk Delete template is a pipeline that reads Entities from Datastore with a given GQL query and then deletes all matching Entities in the selected target project. Optionally, the pipeline can pass the JSON-encoded Datastore Entities to your JavaScript UDF, which you can use to filter out Entities by returning null values.
Requirements for this pipeline:
Datastore must be set up in the project before running the template.
If reading from and deleting from separate Datastore instances, the Dataflow Controller Service Account must have permission to read from one instance and delete from the other.
Template parameters
datastoreReadGqlQuery: GQL query that specifies which Entities to match for deletion. For example, "SELECT * FROM MyKind".
datastoreReadProjectId: The GCP project ID of the Datastore instance from which you want to read the Entities (using your GQL query) that are used for matching.
datastoreDeleteProjectId: The GCP project ID of the Datastore instance from which to delete the matching Entities. This can be the same as datastoreReadProjectId if you want to read and delete within the same Datastore instance.
datastoreReadNamespace: [Optional] Namespace of the requested Entities. Set as "" for the default namespace.
javascriptTextTransformGcsPath: [Optional] A Cloud Storage path that contains all your JavaScript code. For example, "gs://mybucket/mytransforms/*.js". If you don't want to use a UDF, leave this field blank.
javascriptTextTransformFunctionName: [Optional] The name of the function to call. If this function returns a value of undefined or null for a given Datastore Entity, that Entity is not deleted. If your JavaScript code is "function myTransform(inJson) { ...dostuff...}", the function name is "myTransform". If you don't want to use a UDF, leave this field blank.
Como executar o modelo "Exclusão em massa do Datastore"
Substitua JOB_NAME por um nome de job de sua escolha. O nome do job precisa corresponder à expressão regular [a-z]([-a-z0-9]{0,38}[a-z0-9])? para ser válido.
Substitua GQL_QUERY pela consulta que você usará para corresponder entidades para exclusão.
Substitua DATASTORE_READ_AND_DELETE_PROJECT_ID pelo código do projeto referente à instância do Datastore. Este exemplo será lido e excluído da mesma instância do Datastore.
gcloud dataflow jobs run JOB_NAME \
--gcs-location gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
--parameters \
datastoreReadGqlQuery="GQL_QUERY",\
datastoreReadProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID,\
datastoreDeleteProjectId=DATASTORE_READ_AND_DELETE_PROJECT_ID
API
Run using the REST API
When executing this template, you need the Cloud Storage path to the template:
gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace GQL_QUERY with the query you'll use to match Entities for deletion.
Replace DATASTORE_READ_AND_DELETE_PROJECT_ID with your project ID for the Datastore instance. This example both reads from and deletes from the same Datastore instance.
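The request body is not reproduced here; a sketch assuming the same projects.templates.launch endpoint used by the other classic templates (YOUR_PROJECT_ID is your project ID):

POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/templates:launch?gcsPath=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete
{
    "jobName": "JOB_NAME",
    "parameters": {
        "datastoreReadGqlQuery": "GQL_QUERY",
        "datastoreReadProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID",
        "datastoreDeleteProjectId": "DATASTORE_READ_AND_DELETE_PROJECT_ID"
    }
}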
/*
* Copyright (C) 2018 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/**
* Dataflow template which deletes pulled Datastore Entities.
*/
public class DatastoreToDatastoreDelete {
/**
* Custom PipelineOptions.
*/
public interface DatastoreToDatastoreDeleteOptions extends
PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities
* to a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options = PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(ReadJsonEntities.newBuilder()
.setGqlQuery(options.getDatastoreReadGqlQuery())
.setProjectId(options.getDatastoreReadProjectId())
.setNamespace(options.getDatastoreReadNamespace())
.build())
.apply(TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(DatastoreDeleteEntityJson.newBuilder()
.setProjectId(options.getDatastoreDeleteProjectId())
.build());
pipeline.run();
}
}
Streaming Data Generator to Pub/Sub
When developing Dataflow pipelines, a common requirement is to run a performance benchmark at a specific queries-per-second (QPS) rate.
This template can be used to generate fake JSON messages based on a user-provided schema, at the specified rate, to a Pub/Sub topic.
The JSON Data Generator library used by the pipeline allows various faker functions to be used for each schema field. See the library documentation for more information about the faker functions and the schema format.
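For illustration only, a minimal hypothetical schema of the kind the json-data-generator library expands into fake messages (the field names are invented; check the library documentation for the exact set of available faker functions):

{
  "eventId": "{{uuid()}}",
  "username": "{{username()}}",
  "ipv4": "{{ipv4()}}",
  "score": {{integer(0, 100)}},
  "eventTimestamp": {{timestamp()}}
}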
Requirements for this pipeline:
Create a schema file and store it in a Cloud Storage location.
The output Pub/Sub topic must exist prior to execution.
Template parameters
schemaLocation: Location of the schema file. For example, gs://mybucket/filename.json.
topic: Name of the Pub/Sub topic to which the pipeline should publish data. For example, projects/<project-id>/topics/<topic-name>.
qps: Number of messages to be published per second. For example, 100.
autoscalingAlgorithm: [Optional] Algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling or NONE to disable it.
maxNumWorkers: [Optional] Maximum number of worker machines. For example, 10.
Running the Streaming Data Generator template
GCLOUD
Run in the gcloud command-line tool
Replace REGION_NAME with the Dataflow region name. For example, us-central1.
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace SCHEMA_LOCATION with the path to the schema file in Cloud Storage. For example, gs://mybucket/filename.json.
Replace PUBSUB_TOPIC with the output Pub/Sub topic. For example, projects/<project-id>/topics/<topic-name>.
Replace QPS with the number of messages to be published per second.
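The command itself is missing from this page; the sketch below assumes the template is run as a Flex Template with a spec published at gs://dataflow-templates/latest/flex/Streaming_Data_Generator (an assumed path) and YOUR_PROJECT_ID as your project ID:

gcloud beta dataflow flex-template run JOB_NAME \
    --project=YOUR_PROJECT_ID \
    --region=REGION_NAME \
    --template-file-gcs-location=gs://dataflow-templates/latest/flex/Streaming_Data_Generator \
    --parameters schemaLocation=SCHEMA_LOCATION,topic=PUBSUB_TOPIC,qps=QPS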
API
Run using the REST API
Replace LOCATION with the Dataflow region name. For example, us-central1.
Replace JOB_NAME with a job name of your choice. The job name must match the regular expression [a-z]([-a-z0-9]{0,38}[a-z0-9])? to be valid.
Replace SCHEMA_LOCATION with the path to the schema file in Cloud Storage. For example, gs://mybucket/filename.json.
Replace PUBSUB_TOPIC with the output Pub/Sub topic. For example, projects/<project-id>/topics/<topic-name>.
Replace QPS with the number of messages to be published per second.
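A sketch of the corresponding REST request, assuming the Dataflow flexTemplates.launch endpoint and the same assumed template spec path as above (YOUR_PROJECT_ID is your project ID):

POST https://dataflow.googleapis.com/v1b3/projects/YOUR_PROJECT_ID/locations/LOCATION/flexTemplates:launch
{
    "launchParameter": {
        "jobName": "JOB_NAME",
        "parameters": {
            "schemaLocation": "SCHEMA_LOCATION",
            "topic": "PUBSUB_TOPIC",
            "qps": "QPS"
        },
        "containerSpecGcsPath": "gs://dataflow-templates/latest/flex/Streaming_Data_Generator"
    }
}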
/*
* Copyright (C) 2020 Google Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.SchemaUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
* specified rate to a Pub/Sub topic. The messages are generated according to a schema template
* which instructs the pipeline how to populate the messages with fake data compliant to
* constraints.
*
* <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
* Use a general rule of 2,500 QPS per core in the worker pool.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* SCHEMA_LOCATION=gs://<bucket>/<path>/<to>/game-event-schema.json
* PUBSUB_TOPIC=projects/<project-id>/topics/<topic-id>
* QPS=2500
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create a template spec containing the details of image location and metadata in GCS
* as specified in README.md file
*
* # Execute template:
* JOB_NAME=<job-name>
* PROJECT=<project-id>
* TEMPLATE_SPEC_GCSPATH=gs://path/to/template-spec
* SCHEMA_LOCATION=gs://path/to/schema.json
* PUBSUB_TOPIC=projects/$PROJECT/topics/<topic-name>
* QPS=1
*
* gcloud beta dataflow flex-template run $JOB_NAME \
* --project=$PROJECT --region=us-central1 --flex-template \
* --template-file-gcs-location=$TEMPLATE_SPEC_GCSPATH \
* --parameters autoscalingAlgorithm="THROUGHPUT_BASED",schemaLocation=$SCHEMA_LOCATION,topic=$PUBSUB_TOPIC,qps=$QPS,maxNumWorkers=3
*
* </pre>
*/
public class StreamingDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
/**
* The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by
* the executor at the command-line.
*/
public interface StreamingDataGeneratorOptions extends PipelineOptions {
@Description("Indicates rate of messages per second to be published to Pub/Sub.")
@Required
Long getQps();
void setQps(Long value);
@Description("The path to the schema to generate.")
@Required
String getSchemaLocation();
void setSchemaLocation(String value);
@Description("The Pub/Sub topic to write to.")
String getTopic();
void setTopic(String value);
@Description(
"Indicates maximum number of messages to be generated. Default is 0 indicating unlimited.")
@Default.Long(0L)
Long getMessagesLimit();
void setMessagesLimit(Long value);
@Description("The message Output type. --outputType must be one of:[JSON,AVRO,PARQUET]")
@Default.Enum("JSON")
OutputType getOutputType();
void setOutputType(OutputType value);
@Description("The path to Avro schema for encoding message into AVRO output type.")
String getAvroSchemaLocation();
void setAvroSchemaLocation(String value);
@Description("The message sink type. Must be one of:[PUBSUB,BIGQUERY]")
@Default.Enum("PUBSUB")
SinkType getSinkType();
void setSinkType(SinkType value);
@Description(
"Output BigQuery table spec. "
+ "The name should be in the format: "
+ "<project>:<dataset>.<table_name>.")
String getOutputTableSpec();
void setOutputTableSpec(String value);
@Description("Write disposition to use for BigQuery. Default: WRITE_APPEND")
@Default.String("WRITE_APPEND")
String getWriteDisposition();
void setWriteDisposition(String writeDisposition);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String outputDeadletterTable);
@Description(
"The window duration in which data will be written. Defaults to 5m."
+ "Allowed formats are: "
+ "Ns (for seconds, example: 5s), "
+ "Nm (for minutes, example: 12m), "
+ "Nh (for hours, example: 2h).")
@Default.String("1m")
String getWindowDuration();
void setWindowDuration(String windowDuration);
@Description("The directory to write output files. Must end with a slash. ")
String getOutputDirectory();
void setOutputDirectory(String outputDirectory);
@Description(
"The filename prefix of the files to write to. Default file prefix is set to \"output-\". ")
@Default.String("output-")
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String outputFilenamePrefix);
@Description(
"The maximum number of output shards produced while writing to FileSystem. Default number"
+ " is runner defined.")
@Default.Integer(0)
Integer getNumShards();
void setNumShards(Integer numShards);
}
/** Allowed list of message encoding types. */
public enum OutputType {
JSON(".json"),
AVRO(".avro"),
PARQUET(".parquet");
private final String fileExtension;
/** Sets file extension associated with output type. */
OutputType(String fileExtension) {
this.fileExtension = fileExtension;
}
/** Returns file extension associated with output type. */
public String getFileExtension() {
return fileExtension;
}
}
/** Allowed list of sink types. */
public enum SinkType {
PUBSUB,
BIGQUERY,
GCS
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
* wait for its execution to finish. If blocking execution is required, use the {@link
* StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args command-line args passed by the executor.
*/
public static void main(String[] args) {
StreamingDataGeneratorOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StreamingDataGeneratorOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options the execution options.
* @return the pipeline result.
*/
public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to run method cannot be null.");
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Trigger at the supplied QPS
* 2) Generate messages containing fake data
* 3) Write messages to appropriate Sink
*/
PCollection<byte[]> fakeMessages =
pipeline
.apply("Trigger", createTrigger(options))
.apply(
"Generate Fake Messages",
ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())));
if (options.getSinkType().equals(SinkType.GCS)) {
fakeMessages =
fakeMessages.apply(
options.getWindowDuration() + " Window",
Window.into(
FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
}
fakeMessages.apply("Write To " + options.getSinkType().name(), createSink(options));
return pipeline.run();
}
/**
* Creates either Bounded or UnBounded Source based on messageLimit pipeline option.
*
* @param options the pipeline options.
*/
private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createTrigger method cannot be null.");
GenerateSequence generateSequence =
GenerateSequence.from(0L)
.withRate(options.getQps(), /* periodLength = */ Duration.standardSeconds(1L));
return options.getMessagesLimit() > 0
? generateSequence.to(options.getMessagesLimit())
: generateSequence;
}
/**
* The {@link MessageGeneratorFn} class generates fake messages based on supplied schema
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*/
@VisibleForTesting
static class MessageGeneratorFn extends DoFn<Long, byte[]> {
// Not initialized inline or constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;
private final String schemaLocation;
private String schema;
MessageGeneratorFn(@Nonnull String schemaLocation) {
checkNotNull(schemaLocation,
"schemaLocation argument of MessageGeneratorFn class cannot be null.");
this.schemaLocation = schemaLocation;
}
@Setup
public void setup() throws IOException {
dataGenerator = new JsonDataGeneratorImpl();
schema = SchemaUtils.getGcsFileAsString(schemaLocation);
}
@ProcessElement
public void processElement(
@Element Long element,
@Timestamp Instant timestamp,
OutputReceiver<byte[]> receiver,
ProcessContext context)
throws IOException, JsonDataGeneratorException {
byte[] payload;
// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toByteArray();
}
receiver.output(payload);
}
}
/**
* Creates appropriate sink based on sinkType pipeline option.
*
* @param options the pipeline options.
*/
@VisibleForTesting
static PTransform<PCollection<byte[]>, PDone> createSink(
@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createSink method cannot be null.");
switch (options.getSinkType()) {
case PUBSUB:
checkArgument(
options.getTopic() != null,
String.format(
"Missing required value --topic for %s sink type", options.getSinkType().name()));
return StreamingDataGeneratorWriteToPubSub.Writer.builder(options).build();
case BIGQUERY:
checkArgument(
options.getOutputTableSpec() != null,
String.format(
"Missing required value --outputTableSpec in format"
+ " <project>:<dataset>.<table_name> for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
case GCS:
checkArgument(
options.getOutputDirectory() != null,
String.format(
"Missing required value --outputDirectory in format gs:// for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToGcs.builder(options).build();
default:
throw new IllegalArgumentException("Unsupported Sink.");
}
}
}