/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.cloud.teleport.v2.transforms.AvroConverters.AvroOptions;
import com.google.cloud.teleport.v2.transforms.CsvConverters.CsvPipelineOptions;
import com.google.cloud.teleport.v2.transforms.ParquetConverters.ParquetOptions;
import java.io.IOException;
import java.util.EnumMap;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link FileFormatConversion} pipeline takes in an input file, converts it to a desired format
* and saves it to Cloud Storage. Supported file transformations are:
*
* <ul>
* <li>Csv to Avro
* <li>Csv to Parquet
* <li>Avro to Parquet
* <li>Parquet to Avro
* </ul>
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>Input file exists in Google Cloud Storage.
* <li>Google Cloud Storage output bucket exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Set vars for execution
* export INPUT_FILE_FORMAT=Csv
* export OUTPUT_FILE_FORMAT=Avro
* export AVRO_SCHEMA_PATH=gs://path/to/avro/schema
* export HEADERS=false
* export DELIMITER=","
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create an image spec in GCS that contains the path to the image
* {
* "docker_template_spec": {
* "docker_image": $TARGET_GCR_IMAGE
* }
* }
*
* # Execute template:
* API_ROOT_URL="https://dataflow.googleapis.com"
* TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
* JOB_NAME="csv-to-avro-`date +%Y%m%d-%H%M%S-%N`"
*
* time curl -X POST -H "Content-Type: application/json" \
* -H "Authorization: Bearer $(gcloud auth print-access-token)" \
* "${TEMPLATES_LAUNCH_API}"`
* `"?validateOnly=false"`
* `"&dynamicTemplate.gcsPath=${BUCKET_NAME}/path/to/image-spec"`
* `"&dynamicTemplate.stagingLocation=${BUCKET_NAME}/staging" \
* -d '
* {
* "jobName":"'$JOB_NAME'",
* "parameters": {
* "inputFileFormat":"'$INPUT_FILE_FORMAT'",
* "outputFileFormat":"'$OUTPUT_FILE_FORMAT'",
* "inputFileSpec":"'$BUCKET_NAME/path/to/input-file'",
* "outputBucket":"'$BUCKET_NAME/path/to/output-location/'",
* "containsHeaders":"'$HEADERS'",
* "schema":"'$AVRO_SCHEMA_PATH'",
* "outputFilePrefix":"output-file",
* "numShards":"3",
* "delimiter":"'$DELIMITER'"
* }
* }
* '
* </pre>
*/
public class FileFormatConversion {
/** Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(FileFormatConversion.class);
private static EnumMap<ValidFileFormats, String> validFileFormats =
new EnumMap<ValidFileFormats, String>(ValidFileFormats.class);
/**
* The {@link FileFormatConversionOptions} provides the custom execution options passed by the
* executor at the command-line.
*/
public interface FileFormatConversionOptions
extends PipelineOptions, CsvPipelineOptions, AvroOptions, ParquetOptions {
@Description("Input file format.")
@Required
String getInputFileFormat();
void setInputFileFormat(String inputFileFormat);
@Description("Output file format.")
@Required
String getOutputFileFormat();
void setOutputFileFormat(String outputFileFormat);
}
/** The {@link ValidFileFormats} enum contains all valid file formats. */
public enum ValidFileFormats {
CSV,
AVRO,
PARQUET
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
FileFormatConversionOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(FileFormatConversionOptions.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options.
*
* @param options The execution options.
* @return The pipeline result.
* @throws RuntimeException thrown if incorrect file formats are passed.
*/
public static PipelineResult run(FileFormatConversionOptions options) {
String inputFileFormat = options.getInputFileFormat().toUpperCase();
String outputFileFormat = options.getOutputFileFormat().toUpperCase();
validFileFormats.put(ValidFileFormats.CSV, "CSV");
validFileFormats.put(ValidFileFormats.AVRO, "AVRO");
validFileFormats.put(ValidFileFormats.PARQUET, "PARQUET");
try {
if (inputFileFormat.equals(outputFileFormat)) {
LOG.error("Input and output file format cannot be the same.");
throw new IOException();
}
if (!validFileFormats.containsValue(inputFileFormat)
|| !validFileFormats.containsValue(outputFileFormat)) {
LOG.error("Invalid input or output file format.");
throw new IOException();
}
} catch (IOException e) {
throw new RuntimeException("Provide correct input/output file format.");
}
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
pipeline.apply(
inputFileFormat + " to " + outputFileFormat,
FileFormatConversionFactory.FileFormat.newBuilder()
.setOptions(options)
.setInputFileFormat(inputFileFormat)
.setOutputFileFormat(outputFileFormat)
.build());
return pipeline.run();
}
}
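Once the Flex Template image spec above has been created in GCS, the template can also be launched with gcloud rather than the raw REST call shown in the javadoc. The following is a minimal sketch that assumes the same variable names and parameter values as the Example Usage block above; the exact flag set depends on your gcloud version, and parameter values containing commas (such as the delimiter) need gcloud's alternate-delimiter escaping for the --parameters flag (see "gcloud topic escaping"), so the delimiter is omitted here.

# Launch the file format conversion Flex Template with gcloud.
# Assumes PROJECT, BUCKET_NAME, and the exported execution vars from the
# Example Usage block above; the image-spec path is the one created there.
gcloud dataflow flex-template run "csv-to-avro-`date +%Y%m%d-%H%M%S`" \
  --project=${PROJECT} \
  --region=us-central1 \
  --template-file-gcs-location=gs://${BUCKET_NAME}/path/to/image-spec \
  --parameters \
  "inputFileFormat=${INPUT_FILE_FORMAT},\
outputFileFormat=${OUTPUT_FILE_FORMAT},\
inputFileSpec=gs://${BUCKET_NAME}/path/to/input-file,\
outputBucket=gs://${BUCKET_NAME}/path/to/output-location/,\
containsHeaders=${HEADERS},\
schema=${AVRO_SCHEMA_PATH}"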
Bulk Compress Cloud Storage Files
The Bulk Compress Cloud Storage Files template is a batch pipeline that compresses files on Cloud Storage to a specified location. This template can be useful when you need to compress large batches of files as part of a periodic archival process. The supported compression modes are BZIP2, DEFLATE, and GZIP.
Files output to the destination location follow a naming scheme in which the compression-mode extension is appended to the original file name. The appended extension is one of .bzip2, .deflate, or .gz.
Any errors that occur during the compression process are written to a failure file in CSV format (file name, error message). Even if no errors occur during pipeline execution, the failure file is still created, but it contains no error records.
Pipeline requirements:
The compression must be one of the following formats: BZIP2, DEFLATE, or GZIP.
The output directory must exist before you run the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/uncompressed/*.txt.
outputDirectory: The output location to write to. For example, gs://bucket-name/compressed/.
outputFailureFile: The error log output file to use for write failures that occur during the compression process. For example, gs://bucket-name/compressed/failed.csv. If there are no failures, the file is still created but is empty. The file contents are in CSV format (file name, error), one line per file that failed compression.
compression: The compression algorithm used to compress the matched files. Must be one of BZIP2, DEFLATE, or GZIP.
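As a concrete launch sketch, the template can be run with the gcloud command below once it has been staged to Cloud Storage (the Example Usage block in the source that follows stages it under ${PIPELINE_FOLDER}/template). The Google-hosted path gs://dataflow-templates/latest/Bulk_Compress_GCS_Files shown here is an assumption; verify it against the published template list, or substitute your own staged template path.

# Run the Bulk Compress template with the parameters described above.
gcloud dataflow jobs run bulk-compress-`date +%Y%m%d-%H%M%S` \
  --gcs-location=gs://dataflow-templates/latest/Bulk_Compress_GCS_Files \
  --region=us-central1 \
  --parameters \
  "inputFilePattern=gs://bucket-name/uncompressed/*.txt,\
outputDirectory=gs://bucket-name/compressed/,\
outputFailureFile=gs://bucket-name/compressed/failed.csv,\
compression=GZIP"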
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import java.io.IOException;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * The {@link BulkCompressor} is a batch pipeline that compresses files matched by an input file
 * pattern and outputs them to a specified file location. This pipeline can be useful when you need
 * to compress large batches of files as part of a periodic archival process. The supported
* compression modes are: <code>BZIP2</code>, <code>DEFLATE</code>, <code>GZIP</code>, <code>ZIP
* </code>. Files output to the destination location will follow a naming schema of original
* filename appended with the compression mode extension. The extensions appended will be one of:
* <code>.bzip2</code>, <code>.deflate</code>, <code>.gz</code>, <code>.zip</code> as determined by
* the compression type.
*
* <p>Any errors which occur during the compression process will be output to the failure file in
* CSV format of filename, error message. If no failures occur during execution, the error file will
* still be created but will contain no error records.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>The compression must be in one of the following formats: <code>BZIP2</code>, <code>DEFLATE
* </code>, <code>GZIP</code>, <code>ZIP</code>.
* <li>The output directory must exist prior to pipeline execution.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT_ID=PROJECT ID HERE
* PIPELINE_FOLDER=gs://${PROJECT_ID}/dataflow/pipelines/bulk-compressor
*
* # Set the runner
* RUNNER=DataflowRunner
*
* # Build the template
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkCompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=${PIPELINE_FOLDER}/staging \
* --tempLocation=${PIPELINE_FOLDER}/temp \
* --templateLocation=${PIPELINE_FOLDER}/template \
* --runner=${RUNNER}"
*
* # Execute the template
* JOB_NAME=bulk-compressor-$USER-`date +"%Y%m%d-%H%M%S%z"`
*
* gcloud dataflow jobs run ${JOB_NAME} \
* --gcs-location=${PIPELINE_FOLDER}/template \
* --zone=us-east1-d \
* --parameters \
* "inputFilePattern=${PIPELINE_FOLDER}/test/uncompressed/*,\
* outputDirectory=${PIPELINE_FOLDER}/test/compressed,\
* outputFailureFile=${PIPELINE_FOLDER}/test/failure/failed-${JOB_NAME}.csv,\
* compression=GZIP"
* </pre>
*/
public class BulkCompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkCompressor.class);
/** The tag used to identify the main output of the {@link Compressor}. */
private static final TupleTag<String> COMPRESSOR_MAIN_OUT = new TupleTag<String>() {};
/** The tag used to identify the dead-letter output of the {@link Compressor}. */
private static final TupleTag<KV<String, String>> DEADLETTER_TAG =
new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/uncompressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/compressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the compression process "
+ "(e.g. gs://bucket-name/compressed/failed.txt). The contents will be one line for "
+ "each file which failed compression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
@Description("The compression algorithm to use on the matched files.")
@Required
ValueProvider<Compression> getCompression();
void setCompression(ValueProvider<Compression> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
 * wait for its execution to finish. If blocking execution is required, use the {@link
* BulkCompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
 * Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Compress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
PCollectionTuple compressOut =
pipeline
.apply("Match File(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"Compress File(s)",
ParDo.of(new Compressor(options.getOutputDirectory(), options.getCompression()))
.withOutputTags(COMPRESSOR_MAIN_OUT, TupleTagList.of(DEADLETTER_TAG)));
compressOut
.get(DEADLETTER_TAG)
.apply(
"Format Errors",
MapElements.into(TypeDescriptors.strings())
.via(kv -> String.format("%s,%s", kv.getKey(), kv.getValue())))
.apply(
"Write Error File",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* The {@link Compressor} accepts {@link MatchResult.Metadata} from the FileSystems API and
* compresses each file to an output location. Any compression failures which occur during
* execution will be output to a separate output for further processing.
*/
@SuppressWarnings("serial")
public static class Compressor extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
private final ValueProvider<Compression> compressionValue;
Compressor(ValueProvider<String> destinationLocation, ValueProvider<Compression> compression) {
this.destinationLocation = destinationLocation;
this.compressionValue = compression;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
Compression compression = compressionValue.get();
// Add the compression extension to the output filename. Example: demo.txt -> demo.txt.gz
String outputFilename = inputFile.getFilename() + compression.getSuggestedSuffix();
// Resolve the necessary resources to perform the transfer
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve("temp-" + outputFilename, StandardResolveOptions.RESOLVE_FILE);
// Perform the copy of the compressed channel to the destination.
try (ReadableByteChannel readerChannel = FileSystems.open(inputFile)) {
try (WritableByteChannel writerChannel =
compression.writeCompressed(FileSystems.create(tempFile, MimeTypes.BINARY))) {
// Execute the copy to the temporary file
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temporary file to the output file
FileSystems.rename(ImmutableList.of(tempFile), ImmutableList.of(outputFile));
// Output the path to the compressed file
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error("Error occurred during compression of {}", inputFile.toString(), e);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
}
Bulk Decompress Cloud Storage Files
The Bulk Decompress Cloud Storage Files template is a batch pipeline that decompresses files on Cloud Storage to a specified location. This functionality is useful when you want to use compressed data to minimize network bandwidth costs during a migration, but maximize analytical processing speed by working on uncompressed data after the migration. The pipeline automatically handles multiple compression modes within a single run and determines the decompression mode to use from the file extension (.bzip2, .deflate, .gz, .zip).
Note: The Bulk Decompress Cloud Storage Files template works on individual compressed files, not on compressed folders.
Pipeline requirements:
The files to decompress must be in one of the following formats: Bzip2, Deflate, Gzip, or Zip.
The output directory must exist before you run the pipeline.
Template parameters
inputFilePattern: The input file pattern to read from. For example, gs://bucket-name/compressed/*.gz.
outputDirectory: The output location to write to. For example, gs://bucket-name/decompressed.
outputFailureFile: The error log output file to use for write failures that occur during the decompression process. For example, gs://bucket-name/decompressed/failed.csv. If there are no failures, the file is still created but is empty. The file contents are in CSV format (file name, error), one line per file that failed decompression.
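A matching launch sketch for this template is shown below; as with the compressor, the Google-hosted path gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files is an assumption to verify against the published template list (or replace with your own staged template path).

# Run the Bulk Decompress template with the parameters described above.
gcloud dataflow jobs run bulk-decompress-`date +%Y%m%d-%H%M%S` \
  --gcs-location=gs://dataflow-templates/latest/Bulk_Decompress_GCS_Files \
  --region=us-central1 \
  --parameters \
  "inputFilePattern=gs://bucket-name/compressed/*.gz,\
outputDirectory=gs://bucket-name/decompressed,\
outputFailureFile=gs://bucket-name/decompressed/failed.csv"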
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.io.ByteStreams;
import com.google.common.io.Files;
import java.io.IOException;
import java.io.StringWriter;
import java.nio.channels.ReadableByteChannel;
import java.nio.channels.WritableByteChannel;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import javax.annotation.Nullable;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.Compression;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.io.fs.MoveOptions;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.apache.commons.csv.CSVFormat;
import org.apache.commons.csv.CSVPrinter;
import org.apache.commons.csv.QuoteMode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* This pipeline decompresses file(s) from Google Cloud Storage and re-uploads them to a destination
* location.
*
* <p><b>Parameters</b>
*
* <p>The {@code --inputFilePattern} parameter specifies a file glob to process. Files found can be
* expressed in the following formats:
*
* <pre>
* --inputFilePattern=gs://bucket-name/compressed-dir/*
* --inputFilePattern=gs://bucket-name/compressed-dir/demo*.gz
* </pre>
*
* <p>The {@code --outputDirectory} parameter can be expressed in the following formats:
*
* <pre>
* --outputDirectory=gs://bucket-name
* --outputDirectory=gs://bucket-name/decompressed-dir
* </pre>
*
* <p>The {@code --outputFailureFile} parameter indicates the file to write the names of the files
* which failed decompression and their associated error messages. This file can then be used for
* subsequent processing by another process outside of Dataflow (e.g. send an email with the
* failures, etc.). If there are no failures, the file will still be created but will be empty. The
* failure file structure contains both the file that caused the error and the error message in CSV
* format. The file will contain one header row and two columns (Filename, Error). The filename
* output to the failureFile will be the full path of the file for ease of debugging.
*
* <pre>
* --outputFailureFile=gs://bucket-name/decompressed-dir/failed.csv
* </pre>
*
* <p>Example Output File:
*
* <pre>
* Filename,Error
* gs://docs-demo/compressedFile.gz, File is malformed or not compressed in BZIP2 format.
* </pre>
*
* <p><b>Example Usage</b>
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.BulkDecompressor \
* -Dexec.cleanupDaemonThreads=false \
* -Dexec.args=" \
* --project=${PROJECT_ID} \
* --stagingLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/staging \
* --tempLocation=gs://${PROJECT_ID}/dataflow/pipelines/${PIPELINE_FOLDER}/temp \
* --runner=DataflowRunner \
* --inputFilePattern=gs://${PROJECT_ID}/compressed-dir/*.gz \
* --outputDirectory=gs://${PROJECT_ID}/decompressed-dir \
* --outputFailureFile=gs://${PROJECT_ID}/decompressed-dir/failed.csv"
* </pre>
*/
public class BulkDecompressor {
/** The logger to output status messages to. */
private static final Logger LOG = LoggerFactory.getLogger(BulkDecompressor.class);
/**
* A list of the {@link Compression} values excluding {@link Compression#AUTO} and {@link
* Compression#UNCOMPRESSED}.
*/
@VisibleForTesting
static final Set<Compression> SUPPORTED_COMPRESSIONS =
Stream.of(Compression.values())
.filter(value -> value != Compression.AUTO && value != Compression.UNCOMPRESSED)
.collect(Collectors.toSet());
/** The error msg given when the pipeline matches a file but cannot determine the compression. */
@VisibleForTesting
static final String UNCOMPRESSED_ERROR_MSG =
"Skipping file %s because it did not match any compression mode (%s)";
@VisibleForTesting
static final String MALFORMED_ERROR_MSG =
"The file resource %s is malformed or not in %s compressed format.";
/** The tag used to identify the main output of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<String> DECOMPRESS_MAIN_OUT_TAG = new TupleTag<String>() {};
/** The tag used to identify the dead-letter sideOutput of the {@link Decompress} DoFn. */
@VisibleForTesting
static final TupleTag<KV<String, String>> DEADLETTER_TAG = new TupleTag<KV<String, String>>() {};
/**
* The {@link Options} class provides the custom execution options passed by the executor at the
* command-line.
*/
public interface Options extends PipelineOptions {
@Description("The input file pattern to read from (e.g. gs://bucket-name/compressed/*.gz)")
@Required
ValueProvider<String> getInputFilePattern();
void setInputFilePattern(ValueProvider<String> value);
@Description("The output location to write to (e.g. gs://bucket-name/decompressed)")
@Required
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> value);
@Description(
"The output file to write failures during the decompression process "
+ "(e.g. gs://bucket-name/decompressed/failed.txt). The contents will be one line for "
+ "each file which failed decompression. Note that this parameter will "
+ "allow the pipeline to continue processing in the event of a failure.")
@Required
ValueProvider<String> getOutputFailureFile();
void setOutputFailureFile(ValueProvider<String> value);
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
 * wait for its execution to finish. If blocking execution is required, use the {@link
* BulkDecompressor#run(Options)} method to start the pipeline and invoke {@code
* result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args The command-line args passed by the executor.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
 * Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
/*
* Steps:
* 1) Find all files matching the input pattern
* 2) Decompress the files found and output them to the output directory
* 3) Write any errors to the failure output file
*/
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
// Run the pipeline over the work items.
PCollectionTuple decompressOut =
pipeline
.apply("MatchFile(s)", FileIO.match().filepattern(options.getInputFilePattern()))
.apply(
"DecompressFile(s)",
ParDo.of(new Decompress(options.getOutputDirectory()))
.withOutputTags(DECOMPRESS_MAIN_OUT_TAG, TupleTagList.of(DEADLETTER_TAG)));
decompressOut
.get(DEADLETTER_TAG)
.apply(
"FormatErrors",
MapElements.into(TypeDescriptors.strings())
.via(
kv -> {
StringWriter stringWriter = new StringWriter();
try {
CSVPrinter printer =
new CSVPrinter(
stringWriter,
CSVFormat.DEFAULT
.withEscape('\\')
.withQuoteMode(QuoteMode.NONE)
.withRecordSeparator('\n'));
printer.printRecord(kv.getKey(), kv.getValue());
} catch (IOException e) {
throw new RuntimeException(e);
}
return stringWriter.toString();
}))
// We don't expect error files to be large so we'll create a single
// file for ease of reprocessing by processes outside of Dataflow.
.apply(
"WriteErrorFile",
TextIO.write()
.to(options.getOutputFailureFile())
.withHeader("Filename,Error")
.withoutSharding());
return pipeline.run();
}
/**
* Performs the decompression of an object on Google Cloud Storage and uploads the decompressed
* object back to a specified destination location.
*/
@SuppressWarnings("serial")
public static class Decompress extends DoFn<MatchResult.Metadata, String> {
private final ValueProvider<String> destinationLocation;
Decompress(ValueProvider<String> destinationLocation) {
this.destinationLocation = destinationLocation;
}
@ProcessElement
public void processElement(ProcessContext context) {
ResourceId inputFile = context.element().resourceId();
// Output a record to the failure file if the file doesn't match a known compression.
if (!Compression.AUTO.isCompressed(inputFile.toString())) {
String errorMsg =
String.format(UNCOMPRESSED_ERROR_MSG, inputFile.toString(), SUPPORTED_COMPRESSIONS);
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), errorMsg));
} else {
try {
ResourceId outputFile = decompress(inputFile);
context.output(outputFile.toString());
} catch (IOException e) {
LOG.error(e.getMessage());
context.output(DEADLETTER_TAG, KV.of(inputFile.toString(), e.getMessage()));
}
}
}
/**
* Decompresses the inputFile using the specified compression and outputs to the main output of
* the {@link Decompress} doFn. Files output to the destination will be first written as temp
* files with a "temp-" prefix within the output directory. If a file fails decompression, the
* filename and the associated error will be output to the dead-letter.
*
* @param inputFile The inputFile to decompress.
* @return A {@link ResourceId} which points to the resulting file from the decompression.
*/
private ResourceId decompress(ResourceId inputFile) throws IOException {
// Remove the compressed extension from the file. Example: demo.txt.gz -> demo.txt
String outputFilename = Files.getNameWithoutExtension(inputFile.toString());
// Resolve the necessary resources to perform the transfer.
ResourceId outputDir = FileSystems.matchNewResource(destinationLocation.get(), true);
ResourceId outputFile =
outputDir.resolve(outputFilename, StandardResolveOptions.RESOLVE_FILE);
ResourceId tempFile =
outputDir.resolve(
Files.getFileExtension(inputFile.toString()) + "-temp-" + outputFilename,
StandardResolveOptions.RESOLVE_FILE);
// Resolve the compression
Compression compression = Compression.detect(inputFile.toString());
// Perform the copy of the decompressed channel into the destination.
try (ReadableByteChannel readerChannel =
compression.readDecompressed(FileSystems.open(inputFile))) {
try (WritableByteChannel writerChannel = FileSystems.create(tempFile, MimeTypes.TEXT)) {
ByteStreams.copy(readerChannel, writerChannel);
}
// Rename the temp file to the output file.
FileSystems.rename(
ImmutableList.of(tempFile),
ImmutableList.of(outputFile),
MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
} catch (IOException e) {
String msg = e.getMessage();
LOG.error("Error occurred during decompression of {}", inputFile.toString(), e);
throw new IOException(sanitizeDecompressionErrorMsg(msg, inputFile, compression));
}
return outputFile;
}
/**
* The error messages coming from the compression library are not consistent across compression
* modes. Here we'll attempt to unify the messages to inform the user more clearly when we've
* encountered a file which is not compressed or malformed. Note that GZIP and ZIP compression
* modes will not throw an exception when a decompression is attempted on a file which is not
* compressed.
*
* @param errorMsg The error message thrown during decompression.
* @param inputFile The input file which failed decompression.
* @param compression The compression mode used during decompression.
* @return The sanitized error message. If the error was not from a malformed file, the same
* error message passed will be returned (if not null) or an empty string will be returned
* (if null).
*/
private String sanitizeDecompressionErrorMsg(
@Nullable String errorMsg, ResourceId inputFile, Compression compression) {
if (errorMsg != null
&& (errorMsg.contains("not in the BZip2 format")
|| errorMsg.contains("incorrect header check"))) {
errorMsg = String.format(MALFORMED_ERROR_MSG, inputFile.toString(), compression);
}
return errorMsg == null ? "" : errorMsg;
}
}
}
Bulk Delete Entities in Datastore [Deprecated]
This template is deprecated and will be removed in Q1 2022.
Migrate to the Bulk Delete Entities in Firestore template.
The Bulk Delete Entities in Datastore template is a pipeline that reads entities from Datastore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Datastore entities to a JavaScript UDF, which can filter out entities by returning null values.
Pipeline requirements:
Datastore must be set up in the project before you run the template.
If reading and deleting from separate Datastore instances, the Dataflow worker service account must have permission to read from one instance and delete from the other.
Template parameters
datastoreReadGqlQuery: A GQL query that specifies which entities to match for deletion. Using a keys-only query can improve performance. For example: 'SELECT __key__ FROM MyKind'.
datastoreReadProjectId: The GCP project ID of the Datastore instance from which you want to read the entities (using your GQL query) that are used for matching.
datastoreDeleteProjectId: The GCP project ID of the Datastore instance from which to delete the matching entities. This can be the same as datastoreReadProjectId if you are reading and deleting within the same Datastore instance.
datastoreReadNamespace: (Optional) The namespace of the requested entities. Set as "" for the default namespace.
datastoreHintNumWorkers: (Optional) A hint for the expected number of workers in the Datastore ramp-up throttling step. Default is 500.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. If this function returns a value of undefined or null for a given Datastore entity, that entity is not deleted.
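A minimal launch sketch using the parameters above follows. The template path gs://dataflow-templates/latest/Datastore_to_Datastore_Delete, the project ID, and the kind MyKind are placeholders and assumptions for illustration; the optional UDF parameters are included only to show how a filtering function would be wired in.

# Delete all entities of a (hypothetical) kind MyKind within a single project,
# optionally filtering entities through a JavaScript UDF.
gcloud dataflow jobs run datastore-bulk-delete-`date +%Y%m%d-%H%M%S` \
  --gcs-location=gs://dataflow-templates/latest/Datastore_to_Datastore_Delete \
  --region=us-central1 \
  --parameters \
  "datastoreReadGqlQuery=SELECT __key__ FROM MyKind,\
datastoreReadProjectId=my-project-id,\
datastoreDeleteProjectId=my-project-id,\
javascriptTextTransformGcsPath=gs://my-bucket/my-udfs/my_file.js,\
javascriptTextTransformFunctionName=myTransform"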
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/** Dataflow template which deletes pulled Datastore Entities. */
public class DatastoreToDatastoreDelete {
public static <T> ValueProvider<T> selectProvidedInput(
ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
}
/** Custom PipelineOptions. */
public interface DatastoreToDatastoreDeleteOptions
extends PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities to
* a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
ReadJsonEntities.newBuilder()
.setGqlQuery(
selectProvidedInput(
options.getDatastoreReadGqlQuery(), options.getFirestoreReadGqlQuery()))
.setProjectId(
selectProvidedInput(
options.getDatastoreReadProjectId(), options.getFirestoreReadProjectId()))
.setNamespace(
selectProvidedInput(
options.getDatastoreReadNamespace(), options.getFirestoreReadNamespace()))
.build())
.apply(
TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(
DatastoreDeleteEntityJson.newBuilder()
.setProjectId(
selectProvidedInput(
options.getDatastoreDeleteProjectId(),
options.getFirestoreDeleteProjectId()))
.setHintNumWorkers(
selectProvidedInput(
options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
.build());
pipeline.run();
}
}
Bulk Delete Entities in Firestore
The Bulk Delete Entities in Firestore template is a pipeline that reads entities from Firestore with a given GQL query and then deletes all matching entities in the selected target project. The pipeline can optionally pass the JSON-encoded Firestore entities to a JavaScript UDF, which can filter out entities by returning null values.
Pipeline requirements:
Firestore must be set up in the project before you run the template.
If reading and deleting from separate Firestore instances, the Dataflow worker service account must have permission to read from one instance and delete from the other.
Template parameters
firestoreReadGqlQuery: A GQL query that specifies which entities to match for deletion. Using a keys-only query can improve performance. For example: 'SELECT __key__ FROM MyKind'.
firestoreReadProjectId: The GCP project ID of the Firestore instance from which you want to read the entities (using your GQL query) that are used for matching.
firestoreDeleteProjectId: The GCP project ID of the Firestore instance from which to delete the matching entities. This can be the same as firestoreReadProjectId if you are reading and deleting within the same Firestore instance.
firestoreReadNamespace: (Optional) The namespace of the requested entities. Set as "" for the default namespace.
firestoreHintNumWorkers: (Optional) A hint for the expected number of workers in the Firestore ramp-up throttling step. Default is 500.
javascriptTextTransformGcsPath: (Optional) The Cloud Storage URI of the .js file that defines the JavaScript user-defined function (UDF) you want to use. For example, gs://my-bucket/my-udfs/my_file.js.
javascriptTextTransformFunctionName: (Optional) The name of the JavaScript user-defined function (UDF) that you want to use. For example, if your JavaScript function code is myTransform(inJson) { /*...do stuff...*/ }, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples. If this function returns a value of undefined or null for a given Firestore entity, that entity is not deleted.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteEntityJson;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreDeleteOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/** Dataflow template which deletes pulled Datastore Entities. */
public class DatastoreToDatastoreDelete {
public static <T> ValueProvider<T> selectProvidedInput(
ValueProvider<T> datastoreInput, ValueProvider<T> firestoreInput) {
return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
}
/** Custom PipelineOptions. */
public interface DatastoreToDatastoreDeleteOptions
extends PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
DatastoreDeleteOptions {}
/**
* Runs a pipeline which reads in Entities from datastore, passes in the JSON encoded Entities to
* a Javascript UDF, and deletes all the Entities.
*
* <p>If the UDF returns value of undefined or null for a given Entity, then that Entity will not
* be deleted.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToDatastoreDeleteOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(DatastoreToDatastoreDeleteOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
ReadJsonEntities.newBuilder()
.setGqlQuery(
selectProvidedInput(
options.getDatastoreReadGqlQuery(), options.getFirestoreReadGqlQuery()))
.setProjectId(
selectProvidedInput(
options.getDatastoreReadProjectId(), options.getFirestoreReadProjectId()))
.setNamespace(
selectProvidedInput(
options.getDatastoreReadNamespace(), options.getFirestoreReadNamespace()))
.build())
.apply(
TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(
DatastoreDeleteEntityJson.newBuilder()
.setProjectId(
selectProvidedInput(
options.getDatastoreDeleteProjectId(),
options.getFirestoreDeleteProjectId()))
.setHintNumWorkers(
selectProvidedInput(
options.getDatastoreHintNumWorkers(), options.getFirestoreHintNumWorkers()))
.build());
pipeline.run();
}
}
Streaming Data Generator to Pub/Sub/BigQuery/Cloud Storage
The Streaming Data Generator template is used to generate either an unlimited or a fixed number of synthetic records or messages, based on a user-provided schema, at a specified rate. Compatible destinations include Pub/Sub topics, BigQuery tables, and Cloud Storage buckets.
The following are some possible use cases:
Simulate large-scale real-time event publishing to a Pub/Sub topic to measure and determine the number and size of consumers required to process the published events.
Generate synthetic data into a BigQuery table or a Cloud Storage bucket to evaluate performance benchmarks or serve as a proof of concept.
Pipeline requirements:
The output destination must exist before you run the pipeline. Depending on the sink type, the destination must be a Pub/Sub topic, a BigQuery table, or a Cloud Storage bucket.
If the output encoding is Avro or Parquet, create an Avro schema file and store it in a Cloud Storage location.
Template parameters
schemaLocation: The location of the schema file. For example, gs://mybucket/filename.json.
qps: The number of messages to be published per second. For example, 100.
sinkType: (Optional) The output sink type. Possible values are PUBSUB, BIGQUERY, and GCS. Default is PUBSUB.
outputType: (Optional) The output encoding type. Possible values are JSON, AVRO, and PARQUET. Default is JSON.
avroSchemaLocation: (Optional) The location of the AVRO schema file. Required when outputType is AVRO or PARQUET. For example, gs://mybucket/filename.avsc.
topic: (Optional) The name of the Pub/Sub topic to which the pipeline publishes data. Required when sinkType is Pub/Sub. For example, projects/my-project-ID/topics/my-topic-ID.
outputTableSpec: (Optional) The name of the output BigQuery table. Required when sinkType is BigQuery. For example, my-project-ID:my_dataset_name.my-table-name.
writeDisposition: (Optional) The BigQuery write disposition. Possible values are WRITE_APPEND, WRITE_EMPTY, or WRITE_TRUNCATE. Default is WRITE_APPEND.
outputDeadletterTable: (Optional) The name of the output BigQuery table to hold failed records. If not provided, the pipeline creates a table named {output_table_name}_error_records during execution. For example, my-project-ID:my_dataset_name.my-table-name.
outputDirectory: (Optional) The path of the output Cloud Storage location. Required when sinkType is Cloud Storage. For example, gs://mybucket/pathprefix/.
outputFilenamePrefix: (Optional) The file name prefix of the output files written to Cloud Storage. Default is output-.
windowDuration: (Optional) The window interval at which output is written to Cloud Storage. Default is 1m (that is, 1 minute).
numShards: (Optional) The maximum number of output shards. Required when sinkType is Cloud Storage, and should be set to 1 or higher.
messagesLimit: (Optional) The maximum number of output messages. Default is 0, which means unlimited.
autoscalingAlgorithm: (Optional) The algorithm used for autoscaling the workers. Possible values are THROUGHPUT_BASED to enable autoscaling and NONE to disable it.
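The schemaLocation file consumed by this template is a json-data-generator template: a JSON document whose values are generator functions that are expanded for every message (see the library link in the source below). The sketch below creates and stages a minimal schema; the specific function names (uuid, integer, random) are assumptions taken from that library's documentation and should be verified there.

# Write a minimal json-data-generator schema and stage it to Cloud Storage.
# The generator function names used here are assumptions; consult the
# json-data-generator README for the full supported list.
cat > game-event-schema.json <<'EOF'
{
  "eventId": "{{uuid()}}",
  "playerScore": {{integer(0, 100)}},
  "eventType": "{{random("login", "logout", "purchase")}}"
}
EOF
gsutil cp game-event-schema.json gs://my-bucket/game-event-schema.json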
/*
* Copyright (C) 2020 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Preconditions.checkNotNull;
import com.github.vincentrussell.json.datagenerator.JsonDataGenerator;
import com.github.vincentrussell.json.datagenerator.JsonDataGeneratorException;
import com.github.vincentrussell.json.datagenerator.impl.JsonDataGeneratorImpl;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.GCSUtils;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import javax.annotation.Nonnull;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.FixedWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PDone;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link StreamingDataGenerator} is a streaming pipeline which generates messages at a
 * specified rate to a Pub/Sub topic, a BigQuery table, or GCS. The messages are generated
 * according to a schema template which instructs the pipeline how to populate the messages with
 * fake data compliant with the schema's constraints.
*
* <p>The number of workers executing the pipeline must be large enough to support the supplied QPS.
* Use a general rule of 2,500 QPS per core in the worker pool.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* SCHEMA_LOCATION=gs://<bucket>/<path>/<to>/game-event-schema.json
* PUBSUB_TOPIC=projects/<project-id>/topics/<topic-id>
* QPS=2500
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create a template spec containing the details of image location and metadata in GCS
* as specified in README.md file
*
* # Execute template:
* JOB_NAME=<job-name>
* PROJECT=<project-id>
* TEMPLATE_SPEC_GCSPATH=gs://path/to/template-spec
* SCHEMA_LOCATION=gs://path/to/schema.json
* PUBSUB_TOPIC=projects/$PROJECT/topics/<topic-name>
* QPS=1
*
* gcloud beta dataflow flex-template run $JOB_NAME \
* --project=$PROJECT --region=us-central1 --flex-template \
* --template-file-gcs-location=$TEMPLATE_SPEC_GCSPATH \
* --parameters autoscalingAlgorithm="THROUGHPUT_BASED",schemaLocation=$SCHEMA_LOCATION,topic=$PUBSUB_TOPIC,qps=$QPS,maxNumWorkers=3
*
* </pre>
*/
public class StreamingDataGenerator {
private static final Logger logger = LoggerFactory.getLogger(StreamingDataGenerator.class);
/**
* The {@link StreamingDataGeneratorOptions} class provides the custom execution options passed by
* the executor at the command-line.
*/
public interface StreamingDataGeneratorOptions extends PipelineOptions {
@Description("Indicates rate of messages per second to be published to Pub/Sub.")
@Required
Long getQps();
void setQps(Long value);
@Description("The path to the schema to generate.")
@Required
String getSchemaLocation();
void setSchemaLocation(String value);
@Description("The Pub/Sub topic to write to.")
String getTopic();
void setTopic(String value);
@Description(
"Indicates maximum number of messages to be generated. Default is 0 indicating unlimited.")
@Default.Long(0L)
Long getMessagesLimit();
void setMessagesLimit(Long value);
@Description("The message Output type. --outputType must be one of:[JSON,AVRO,PARQUET]")
@Default.Enum("JSON")
OutputType getOutputType();
void setOutputType(OutputType value);
@Description("The path to Avro schema for encoding message into AVRO output type.")
String getAvroSchemaLocation();
void setAvroSchemaLocation(String value);
@Description("The message sink type. Must be one of:[PUBSUB,BIGQUERY,GCS]")
@Default.Enum("PUBSUB")
SinkType getSinkType();
void setSinkType(SinkType value);
@Description(
"Output BigQuery table spec. "
+ "The name should be in the format: "
+ "<project>:<dataset>.<table_name>.")
String getOutputTableSpec();
void setOutputTableSpec(String value);
@Description("Write disposition to use for BigQuery. Default: WRITE_APPEND")
@Default.String("WRITE_APPEND")
String getWriteDisposition();
void setWriteDisposition(String writeDisposition);
@Description(
"The dead-letter table to output to within BigQuery in <project-id>:<dataset>.<table> "
+ "format. If it doesn't exist, it will be created during pipeline execution.")
String getOutputDeadletterTable();
void setOutputDeadletterTable(String outputDeadletterTable);
@Description(
"The window duration in which data will be written. Defaults to 5m."
+ "Allowed formats are: "
+ "Ns (for seconds, example: 5s), "
+ "Nm (for minutes, example: 12m), "
+ "Nh (for hours, example: 2h).")
@Default.String("1m")
String getWindowDuration();
void setWindowDuration(String windowDuration);
@Description("The directory to write output files. Must end with a slash. ")
String getOutputDirectory();
void setOutputDirectory(String outputDirectory);
@Description(
"The filename prefix of the files to write to. Default file prefix is set to \"output-\". ")
@Default.String("output-")
String getOutputFilenamePrefix();
void setOutputFilenamePrefix(String outputFilenamePrefix);
@Description(
"The maximum number of output shards produced while writing to FileSystem. Default number"
+ " is runner defined.")
@Default.Integer(0)
Integer getNumShards();
void setNumShards(Integer numShards);
}
/** Allowed list of message encoding types. */
public enum OutputType {
JSON(".json"),
AVRO(".avro"),
PARQUET(".parquet");
private final String fileExtension;
/** Sets file extension associated with output type. */
OutputType(String fileExtension) {
this.fileExtension = fileExtension;
}
/** Returns file extension associated with output type. */
public String getFileExtension() {
return fileExtension;
}
}
/** Allowed list of sink types. */
public enum SinkType {
PUBSUB,
BIGQUERY,
GCS
}
/**
* The main entry-point for pipeline execution. This method will start the pipeline but will not
 * wait for its execution to finish. If blocking execution is required, use the {@link
* StreamingDataGenerator#run(StreamingDataGeneratorOptions)} method to start the pipeline and
* invoke {@code result.waitUntilFinish()} on the {@link PipelineResult}.
*
* @param args command-line args passed by the executor.
*/
public static void main(String[] args) {
StreamingDataGeneratorOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(StreamingDataGeneratorOptions.class);
run(options);
}
/**
 * Runs the pipeline with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options the execution options.
* @return the pipeline result.
*/
public static PipelineResult run(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to run method cannot be null.");
// Create the pipeline
Pipeline pipeline = Pipeline.create(options);
/*
* Steps:
* 1) Trigger at the supplied QPS
* 2) Generate messages containing fake data
* 3) Write messages to appropriate Sink
*/
PCollection<byte[]> fakeMessages =
pipeline
.apply("Trigger", createTrigger(options))
.apply(
"Generate Fake Messages",
ParDo.of(new MessageGeneratorFn(options.getSchemaLocation())));
if (options.getSinkType().equals(SinkType.GCS)) {
fakeMessages =
fakeMessages.apply(
options.getWindowDuration() + " Window",
Window.into(
FixedWindows.of(DurationUtils.parseDuration(options.getWindowDuration()))));
}
fakeMessages.apply("Write To " + options.getSinkType().name(), createSink(options));
return pipeline.run();
}
/**
* Creates either Bounded or UnBounded Source based on messageLimit pipeline option.
*
* @param options the pipeline options.
*/
private static GenerateSequence createTrigger(@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createTrigger method cannot be null.");
GenerateSequence generateSequence =
GenerateSequence.from(0L)
.withRate(options.getQps(), /* periodLength = */ Duration.standardSeconds(1L));
return options.getMessagesLimit() > 0
? generateSequence.to(options.getMessagesLimit())
: generateSequence;
}
/**
 * The {@link MessageGeneratorFn} class generates fake messages based on the supplied schema.
*
* <p>See <a href="https://github.com/vincentrussell/json-data-generator">json-data-generator</a>
* for instructions on how to construct the schema file.
*/
@VisibleForTesting
static class MessageGeneratorFn extends DoFn<Long, byte[]> {
// Not initialized inline or in the constructor because {@link JsonDataGenerator} is not serializable.
private transient JsonDataGenerator dataGenerator;
private final String schemaLocation;
private String schema;
MessageGeneratorFn(@Nonnull String schemaLocation) {
checkNotNull(
schemaLocation, "schemaLocation argument of MessageGeneratorFn class cannot be null.");
this.schemaLocation = schemaLocation;
}
@Setup
public void setup() throws IOException {
dataGenerator = new JsonDataGeneratorImpl();
schema = GCSUtils.getGcsFileAsString(schemaLocation);
}
@ProcessElement
public void processElement(
@Element Long element,
@Timestamp Instant timestamp,
OutputReceiver<byte[]> receiver,
ProcessContext context)
throws IOException, JsonDataGeneratorException {
byte[] payload;
// Generate the fake JSON according to the schema.
try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) {
dataGenerator.generateTestDataJson(schema, byteArrayOutputStream);
payload = byteArrayOutputStream.toByteArray();
}
receiver.output(payload);
}
}
/**
* Creates appropriate sink based on sinkType pipeline option.
*
* @param options the pipeline options.
*/
@VisibleForTesting
static PTransform<PCollection<byte[]>, PDone> createSink(
@Nonnull StreamingDataGeneratorOptions options) {
checkNotNull(options, "options argument to createSink method cannot be null.");
switch (options.getSinkType()) {
case PUBSUB:
checkArgument(
options.getTopic() != null,
String.format(
"Missing required value --topic for %s sink type", options.getSinkType().name()));
return StreamingDataGeneratorWriteToPubSub.Writer.builder(options).build();
case BIGQUERY:
checkArgument(
options.getOutputTableSpec() != null,
String.format(
"Missing required value --outputTableSpec in format"
+ " <project>:<dataset>.<table_name> for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToBigQuery.builder(options).build();
case GCS:
checkArgument(
options.getOutputDirectory() != null,
String.format(
"Missing required value --outputDirectory in format gs:// for %s sink type",
options.getSinkType().name()));
return StreamingDataGeneratorWriteToGcs.builder(options).build();
default:
throw new IllegalArgumentException("Unsupported Sink.");
}
}
}