/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.BigQueryToTFRecord.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.BigQueryReadOptions;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.Random;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.annotations.VisibleForTesting;
import org.tensorflow.example.Example;
import org.tensorflow.example.Feature;
import org.tensorflow.example.Features;
/**
* Dataflow template which reads BigQuery data and writes it to GCS as a set of TFRecords. The
* source is a SQL query.
*/
@Template(
name = "Cloud_BigQuery_to_GCS_TensorFlow_Records",
category = TemplateCategory.BATCH,
displayName = "BigQuery to TensorFlow Records",
description =
"A pipeline that reads rows from BigQuery and writes them as TFRecords in Cloud Storage. (NOTE: Nested BigQuery columns are currently not supported and should be unnested within the SQL query.)",
optionsClass = Options.class,
optionsOrder = {BigQueryReadOptions.class, Options.class},
contactInformation = "https://cloud.google.com/support")
public class BigQueryToTFRecord {
private static final String TRAIN = "train/";
private static final String TEST = "test/";
private static final String VAL = "val/";
/**
* The {@link BigQueryToTFRecord#buildFeatureFromIterator(Class, Object, Feature.Builder)} method
* handles {@link GenericData.Array} values that are passed into the {@link
* BigQueryToTFRecord#buildFeature} method when creating a TensorFlow feature from the record.
*/
private static void buildFeatureFromIterator(
Class<?> fieldType, Object field, Feature.Builder feature) {
ByteString byteString;
GenericData.Array f = (GenericData.Array) field;
if (fieldType == Long.class) {
Iterator<Long> longIterator = f.iterator();
while (longIterator.hasNext()) {
Long longValue = longIterator.next();
feature.getInt64ListBuilder().addValue(longValue);
}
} else if (fieldType == double.class) {
Iterator<Double> doubleIterator = f.iterator();
while (doubleIterator.hasNext()) {
double doubleValue = doubleIterator.next();
feature.getFloatListBuilder().addValue((float) doubleValue);
}
} else if (fieldType == String.class) {
Iterator<Utf8> stringIterator = f.iterator();
while (stringIterator.hasNext()) {
String stringValue = stringIterator.next().toString();
byteString = ByteString.copyFromUtf8(stringValue);
feature.getBytesListBuilder().addValue(byteString);
}
} else if (fieldType == boolean.class) {
Iterator<Boolean> booleanIterator = f.iterator();
while (booleanIterator.hasNext()) {
Boolean boolValue = booleanIterator.next();
int boolAsInt = boolValue ? 1 : 0;
feature.getInt64ListBuilder().addValue(boolAsInt);
}
}
}
/**
* The {@link BigQueryToTFRecord#buildFeature} method takes in an individual field and type
* corresponding to a column value from a SchemaAndRecord object returned from a BigQueryIO.read()
* step. The method builds a TensorFlow Feature based on the type of the object, e.g. STRING,
* TIME, INTEGER, etc.
*/
private static Feature buildFeature(Object field, String type) {
Feature.Builder feature = Feature.newBuilder();
ByteString byteString;
switch (type) {
case "STRING":
case "TIME":
case "DATE":
if (field instanceof GenericData.Array) {
buildFeatureFromIterator(String.class, field, feature);
} else {
byteString = ByteString.copyFromUtf8(field.toString());
feature.getBytesListBuilder().addValue(byteString);
}
break;
case "BYTES":
byteString = ByteString.copyFrom((byte[]) field);
feature.getBytesListBuilder().addValue(byteString);
break;
case "INTEGER":
case "INT64":
case "TIMESTAMP":
if (field instanceof GenericData.Array) {
buildFeatureFromIterator(Long.class, field, feature);
} else {
feature.getInt64ListBuilder().addValue((long) field);
}
break;
case "FLOAT":
case "FLOAT64":
if (field instanceof GenericData.Array) {
buildFeatureFromIterator(double.class, field, feature);
} else {
feature.getFloatListBuilder().addValue((float) (double) field);
}
break;
case "BOOLEAN":
case "BOOL":
if (field instanceof GenericData.Array) {
buildFeatureFromIterator(boolean.class, field, feature);
} else {
int boolAsInt = (boolean) field ? 1 : 0;
feature.getInt64ListBuilder().addValue(boolAsInt);
}
break;
default:
throw new RuntimeException("Unsupported type: " + type);
}
return feature.build();
}
/**
* The {@link BigQueryToTFRecord#record2Example(SchemaAndRecord)} method takes in a
* SchemaAndRecord object returned from a BigQueryIO.read() step and builds a TensorFlow Example
* from the record.
*/
@VisibleForTesting
protected static byte[] record2Example(SchemaAndRecord schemaAndRecord) {
Example.Builder example = Example.newBuilder();
Features.Builder features = example.getFeaturesBuilder();
GenericRecord record = schemaAndRecord.getRecord();
for (TableFieldSchema field : schemaAndRecord.getTableSchema().getFields()) {
Object fieldValue = record.get(field.getName());
if (fieldValue != null) {
Feature feature = buildFeature(fieldValue, field.getType());
features.putFeature(field.getName(), feature);
}
}
return example.build().toByteArray();
}
/**
* The {@link BigQueryToTFRecord#concatURI} method takes a Cloud Storage URI and a
* subdirectory name and safely concatenates them. The resulting String is used as a sink for
* TFRecords.
*/
private static String concatURI(String dir, String folder) {
if (dir.endsWith("/")) {
return dir + folder;
} else {
return dir + "/" + folder;
}
}
/**
* The {@link BigQueryToTFRecord#applyTrainTestValSplit} method transforms the PCollection by
* randomly partitioning it into PCollections for each dataset.
*/
static PCollectionList<byte[]> applyTrainTestValSplit(
PCollection<byte[]> input,
ValueProvider<Float> trainingPercentage,
ValueProvider<Float> testingPercentage,
ValueProvider<Float> validationPercentage,
Random rand) {
return input.apply(
Partition.of(
3,
(Partition.PartitionFn<byte[]>)
(number, numPartitions) -> {
Float train = trainingPercentage.get();
Float test = testingPercentage.get();
Float validation = validationPercentage.get();
Double d = rand.nextDouble();
if (train + test + validation != 1) {
throw new RuntimeException(
String.format(
"Train %.2f, Test %.2f, Validation"
+ " %.2f percentages must add up to 100 percent",
train, test, validation));
}
if (d < train) {
return 0;
} else if (d >= train && d < train + test) {
return 1;
} else {
return 2;
}
}));
}
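// Example with hypothetical values: if trainingPercentage=0.8, testingPercentage=0.1 and
// validationPercentage=0.1, a random draw of d=0.85 satisfies d >= 0.8 && d < 0.9, so the
// element is routed to partition 1 (the test set); d=0.95 would land in partition 2 (validation).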
/** Run the pipeline. */
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
/**
* Runs the pipeline to completion with the specified options. This method does not wait until the
* pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
* object to block until the pipeline is finished running if blocking programmatic execution is
* required.
*
* @param options The execution options.
* @return The pipeline result.
*/
public static PipelineResult run(Options options) {
Random rand = new Random(100); // set random seed
Pipeline pipeline = Pipeline.create(options);
PCollection<byte[]> bigQueryToExamples =
pipeline
.apply(
"RecordToExample",
BigQueryIO.read(BigQueryToTFRecord::record2Example)
.fromQuery(options.getReadQuery())
.withCoder(ByteArrayCoder.of())
.withTemplateCompatibility()
.withoutValidation()
.usingStandardSql()
.withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
// Enable BigQuery Storage API
)
.apply("ReshuffleResults", Reshuffle.viaRandomKey());
PCollectionList<byte[]> partitionedExamples =
applyTrainTestValSplit(
bigQueryToExamples,
options.getTrainingPercentage(),
options.getTestingPercentage(),
options.getValidationPercentage(),
rand);
partitionedExamples
.get(0)
.apply(
"WriteTFTrainingRecord",
FileIO.<byte[]>write()
.via(TFRecordIO.sink())
.to(
ValueProvider.NestedValueProvider.of(
options.getOutputDirectory(), dir -> concatURI(dir, TRAIN)))
.withNumShards(0)
.withSuffix(options.getOutputSuffix()));
partitionedExamples
.get(1)
.apply(
"WriteTFTestingRecord",
FileIO.<byte[]>write()
.via(TFRecordIO.sink())
.to(
ValueProvider.NestedValueProvider.of(
options.getOutputDirectory(), dir -> concatURI(dir, TEST)))
.withNumShards(0)
.withSuffix(options.getOutputSuffix()));
partitionedExamples
.get(2)
.apply(
"WriteTFValidationRecord",
FileIO.<byte[]>write()
.via(TFRecordIO.sink())
.to(
ValueProvider.NestedValueProvider.of(
options.getOutputDirectory(), dir -> concatURI(dir, VAL)))
.withNumShards(0)
.withSuffix(options.getOutputSuffix()));
return pipeline.run();
}
/** Define command line arguments. */
public interface Options extends BigQueryReadOptions {
@TemplateParameter.GcsWriteFolder(
order = 1,
description = "Output Cloud Storage directory.",
helpText = "Cloud Storage directory to store output TFRecord files.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getOutputDirectory();
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 2,
optional = true,
regexes = {"^[A-Za-z_0-9.]*"},
description = "The output suffix for TFRecord files",
helpText = "File suffix to append to TFRecord files. Defaults to .tfrecord")
@Default.String(".tfrecord")
ValueProvider<String> getOutputSuffix();
void setOutputSuffix(ValueProvider<String> outputSuffix);
@TemplateParameter.Text(
order = 3,
optional = true,
regexes = {"(^\\.[1-9]*$)|(^[01]*)"},
description = "Percentage of data to be in the training set ",
helpText = "Defaults to 1 or 100%. Should be decimal between 0 and 1 inclusive")
@Default.Float(1)
ValueProvider<Float> getTrainingPercentage();
void setTrainingPercentage(ValueProvider<Float> trainingPercentage);
@TemplateParameter.Text(
order = 4,
optional = true,
regexes = {"(^\\.[1-9]*$)|(^[01]*)"},
description = "Percentage of data to be in the testing set ",
helpText = "Defaults to 0 or 0%. Should be decimal between 0 and 1 inclusive")
@Default.Float(0)
ValueProvider<Float> getTestingPercentage();
void setTestingPercentage(ValueProvider<Float> testingPercentage);
@TemplateParameter.Text(
order = 5,
optional = true,
regexes = {"(^\\.[1-9]*$)|(^[01]*)"},
description = "Percentage of data to be in the validation set ",
helpText = "Defaults to 0 or 0%. Should be decimal between 0 and 1 inclusive")
@Default.Float(0)
ValueProvider<Float> getValidationPercentage();
void setValidationPercentage(ValueProvider<Float> validationPercentage);
}
}
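The run() method above returns without waiting for the job to finish, so a caller that needs a blocking launch must call waitUntilFinish() on the returned PipelineResult. Below is a minimal, hypothetical launcher sketch: the option names are inferred from the getters in Options and BigQueryReadOptions, and the query, output path and split percentages are placeholder values.
package com.google.cloud.teleport.templates;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
/** Hypothetical launcher sketch for BigQueryToTFRecord (placeholder values). */
public class BigQueryToTFRecordLauncher {
  public static void main(String[] args) {
    BigQueryToTFRecord.Options options =
        PipelineOptionsFactory.fromArgs(
                "--readQuery=SELECT word, word_count FROM `bigquery-public-data.samples.shakespeare`",
                "--outputDirectory=gs://my-bucket/tfrecords/",
                "--trainingPercentage=0.8",
                "--testingPercentage=0.1",
                "--validationPercentage=0.1")
            .as(BigQueryToTFRecord.Options.class);
    // run() does not block; wait explicitly for the export to complete.
    PipelineResult result = BigQueryToTFRecord.run(options);
    result.waitUntilFinish();
  }
}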
BigQuery export to Parquet (via Storage API)
The BigQuery export to Parquet template is a batch pipeline that reads data from a BigQuery table and writes it to a Cloud Storage bucket in Parquet format. This template uses the BigQuery Storage API to export the data.
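Before the full source, here is a minimal, hypothetical launch sketch; the parameter names (tableRef, bucket, numShards, fields) come from the BigQueryToParquetOptions interface and the usage example in the class Javadoc, and the table and bucket values are placeholders.
package com.google.cloud.teleport.v2.templates;
/** Hypothetical launcher sketch for BigQueryToParquet (placeholder values). */
public class BigQueryToParquetLauncher {
  public static void main(String[] args) {
    BigQueryToParquet.main(
        new String[] {
          "--tableRef=my-project:my_dataset.my_table", // table to export
          "--bucket=gs://my-bucket/export/", // output path and filename prefix
          "--numShards=5", // optional; 0 lets the runner decide
          "--fields=field1,field2" // optional column projection
        });
  }
}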
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;
import com.google.api.gax.rpc.InvalidArgumentException;
import com.google.api.services.bigquery.model.TableReference;
import com.google.cloud.bigquery.storage.v1beta1.BigQueryStorageClient;
import com.google.cloud.bigquery.storage.v1beta1.ReadOptions.TableReadOptions;
import com.google.cloud.bigquery.storage.v1beta1.Storage.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1beta1.Storage.ReadSession;
import com.google.cloud.bigquery.storage.v1beta1.TableReferenceProto;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.templates.BigQueryToParquet.BigQueryToParquetOptions;
import com.google.common.base.Strings;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryHelpers;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Splitter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* The {@link BigQueryToParquet} pipeline exports data from a BigQuery table to Parquet file(s) in a
* Google Cloud Storage bucket.
*
* <p><b>Pipeline Requirements</b>
*
* <ul>
* <li>BigQuery Table exists.
* <li>Google Cloud Storage bucket exists.
* </ul>
*
* <p><b>Example Usage</b>
*
* <pre>
* # Set the pipeline vars
* PROJECT=my-project
* BUCKET_NAME=my-bucket
* TABLE=${PROJECT}:my-dataset.my-table
*
* # Set containerization vars
* IMAGE_NAME=my-image-name
* TARGET_GCR_IMAGE=gcr.io/${PROJECT}/${IMAGE_NAME}
* BASE_CONTAINER_IMAGE=my-base-container-image
* BASE_CONTAINER_IMAGE_VERSION=my-base-container-image-version
* APP_ROOT=/path/to/app-root
* COMMAND_SPEC=/path/to/command-spec
*
* # Build and upload image
* mvn clean package \
* -Dimage=${TARGET_GCR_IMAGE} \
* -Dbase-container-image=${BASE_CONTAINER_IMAGE} \
* -Dbase-container-image.version=${BASE_CONTAINER_IMAGE_VERSION} \
* -Dapp-root=${APP_ROOT} \
* -Dcommand-spec=${COMMAND_SPEC}
*
* # Create an image spec in GCS that contains the path to the image
* {
* "docker_template_spec": {
* "docker_image": $TARGET_GCR_IMAGE
* }
* }
*
* # Execute template:
* API_ROOT_URL="https://dataflow.googleapis.com"
* TEMPLATES_LAUNCH_API="${API_ROOT_URL}/v1b3/projects/${PROJECT}/templates:launch"
* JOB_NAME="bigquery-to-parquet-`date +%Y%m%d-%H%M%S-%N`"
*
* time curl -X POST -H "Content-Type: application/json" \
* -H "Authorization: Bearer $(gcloud auth print-access-token)" \
* "${TEMPLATES_LAUNCH_API}"`
* `"?validateOnly=false"`
* `"&dynamicTemplate.gcsPath=${BUCKET_NAME}/path/to/image-spec"`
* `"&dynamicTemplate.stagingLocation=${BUCKET_NAME}/staging" \
* -d '
* {
* "jobName":"'$JOB_NAME'",
* "parameters": {
* "tableRef":"'$TABLE'",
* "bucket":"'$BUCKET_NAME/results'",
* "numShards":"5",
* "fields":"field1,field2"
* }
* }
* '
* </pre>
*/
@Template(
name = "BigQuery_to_Parquet",
category = TemplateCategory.BATCH,
displayName = "BigQuery export to Parquet (via Storage API)",
description =
"A pipeline to export a BigQuery table into Parquet files using the BigQuery Storage API.",
optionsClass = BigQueryToParquetOptions.class,
flexContainerName = "bigquery-to-parquet",
contactInformation = "https://cloud.google.com/support")
public class BigQueryToParquet {
/* Logger for class. */
private static final Logger LOG = LoggerFactory.getLogger(BigQueryToParquet.class);
/** File suffix for file to be written. */
private static final String FILE_SUFFIX = ".parquet";
/** Factory to create BigQueryStorageClients. */
static class BigQueryStorageClientFactory {
/**
* Creates BigQueryStorage client for use in extracting table schema.
*
* @return BigQueryStorageClient
*/
static BigQueryStorageClient create() {
try {
return BigQueryStorageClient.create();
} catch (IOException e) {
LOG.error("Error connecting to BigQueryStorage API: " + e.getMessage());
throw new RuntimeException(e);
}
}
}
/** Factory to create ReadSessions. */
static class ReadSessionFactory {
/**
* Creates ReadSession for schema extraction.
*
* @param client BigQueryStorage client used to create ReadSession.
* @param tableString String that represents table to export from.
* @param tableReadOptions TableReadOptions that specify any fields in the table to filter on.
* @return session ReadSession object that contains the schema for the export.
*/
static ReadSession create(
BigQueryStorageClient client, String tableString, TableReadOptions tableReadOptions) {
TableReference tableReference = BigQueryHelpers.parseTableSpec(tableString);
String parentProjectId = "projects/" + tableReference.getProjectId();
TableReferenceProto.TableReference storageTableRef =
TableReferenceProto.TableReference.newBuilder()
.setProjectId(tableReference.getProjectId())
.setDatasetId(tableReference.getDatasetId())
.setTableId(tableReference.getTableId())
.build();
CreateReadSessionRequest.Builder builder =
CreateReadSessionRequest.newBuilder()
.setParent(parentProjectId)
.setReadOptions(tableReadOptions)
.setTableReference(storageTableRef);
try {
return client.createReadSession(builder.build());
} catch (InvalidArgumentException iae) {
LOG.error("Error creating ReadSession: " + iae.getMessage());
throw new RuntimeException(iae);
}
}
}
/**
* The {@link BigQueryToParquetOptions} class provides the custom execution options passed by the
* executor at the command-line.
*/
public interface BigQueryToParquetOptions extends PipelineOptions {
@TemplateParameter.BigQueryTable(
order = 1,
description = "BigQuery table to export",
helpText = "BigQuery table location to export in the format <project>:<dataset>.<table>.",
example = "your-project:your-dataset.your-table-name")
@Required
String getTableRef();
void setTableRef(String tableRef);
@TemplateParameter.GcsWriteFile(
order = 2,
description = "Output Cloud Storage file(s)",
helpText = "Path and filename prefix for writing output files.",
example = "gs://your-bucket/export/")
@Required
String getBucket();
void setBucket(String bucket);
@TemplateParameter.Integer(
order = 3,
optional = true,
description = "Maximum output shards",
helpText =
"The maximum number of output shards produced when writing. A higher number of shards"
+ " means higher throughput for writing to Cloud Storage, but potentially higher"
+ " data aggregation cost across shards when processing output Cloud Storage"
+ " files.")
@Default.Integer(0)
Integer getNumShards();
void setNumShards(Integer numShards);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "List of field names",
helpText = "Comma separated list of fields to select from the table.")
String getFields();
void setFields(String fields);
@TemplateParameter.Text(
order = 5,
optional = true,
description = "Row restrictions/filter.",
helpText =
"Read only rows which match the specified filter, which must be a SQL expression"
+ " compatible with Google standard SQL"
+ " (https://cloud.google.com/bigquery/docs/reference/standard-sql). If no value is"
+ " specified, then all rows are returned.")
String getRowRestriction();
void setRowRestriction(String restriction);
}
/**
* The {@link BigQueryToParquet#getTableSchema(ReadSession)} method gets the Avro schema for the
* table from the {@link ReadSession} object.
*
* @param session ReadSession that contains schema for table, filtered by fields if any.
* @return avroSchema Avro schema for table. If fields are provided then schema will only contain
* those fields.
*/
private static Schema getTableSchema(ReadSession session) {
Schema avroSchema;
avroSchema = new Schema.Parser().parse(session.getAvroSchema().getSchema());
LOG.info("Schema for export is: " + avroSchema.toString());
return avroSchema;
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
BigQueryToParquetOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(BigQueryToParquetOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
private static PipelineResult run(BigQueryToParquetOptions options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
TableReadOptions.Builder builder = TableReadOptions.newBuilder();
/* Add fields to filter export on, if any. */
if (options.getFields() != null) {
builder.addAllSelectedFields(Arrays.asList(options.getFields().split(",\\s*")));
}
TableReadOptions tableReadOptions = builder.build();
BigQueryStorageClient client = BigQueryStorageClientFactory.create();
ReadSession session =
ReadSessionFactory.create(client, options.getTableRef(), tableReadOptions);
// Extract schema from ReadSession
Schema schema = getTableSchema(session);
client.close();
TypedRead<GenericRecord> readFromBQ =
BigQueryIO.read(SchemaAndRecord::getRecord)
.from(options.getTableRef())
.withTemplateCompatibility()
.withMethod(Method.DIRECT_READ)
.withCoder(AvroCoder.of(schema));
if (options.getFields() != null) {
List<String> selectedFields = Splitter.on(",").splitToList(options.getFields());
readFromBQ =
selectedFields.isEmpty() ? readFromBQ : readFromBQ.withSelectedFields(selectedFields);
}
// Add row restrictions/filter if any.
if (!Strings.isNullOrEmpty(options.getRowRestriction())) {
readFromBQ = readFromBQ.withRowRestriction(options.getRowRestriction());
}
/*
* Steps: 1) Read records from BigQuery via BigQueryIO.
* 2) Write records to Google Cloud Storage in Parquet format.
*/
pipeline
/*
* Step 1: Read records via BigQueryIO using supplied schema as a PCollection of
* {@link GenericRecord}.
*/
.apply("ReadFromBigQuery", readFromBQ)
/*
* Step 2: Write records to Google Cloud Storage as one or more Parquet files
* via {@link ParquetIO}.
*/
.apply(
"WriteToParquet",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(schema))
.to(options.getBucket())
.withNumShards(options.getNumShards())
.withSuffix(FILE_SUFFIX));
// Execute the pipeline and return the result.
return pipeline.run();
}
}
BigQuery to Elasticsearch
The BigQuery to Elasticsearch template is a batch pipeline that ingests data from a BigQuery table into Elasticsearch as documents. The template can read either the entire table or specific records by using a supplied query.
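The BigQueryToElasticsearchOptions interface is defined elsewhere in the repository and is not reproduced below, so the parameter names in this minimal sketch (inputTableSpec, connectionUrl, apiKey, index) are illustrative assumptions only; check the options class for the actual flags.
package com.google.cloud.teleport.v2.elasticsearch.templates;
/** Hypothetical launcher sketch; all parameter names and values are assumptions. */
public class BigQueryToElasticsearchLauncher {
  public static void main(String[] args) {
    BigQueryToElasticsearch.main(
        new String[] {
          "--inputTableSpec=my-project:my_dataset.my_table", // assumed flag name
          "--connectionUrl=https://my-es-host:9200", // assumed flag name
          "--apiKey=my-api-key", // assumed flag name
          "--index=my-index" // assumed flag name
        });
  }
}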
/*
* Copyright (C) 2021 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.elasticsearch.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.elasticsearch.options.BigQueryToElasticsearchOptions;
import com.google.cloud.teleport.v2.elasticsearch.transforms.WriteToElasticsearch;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.ReadBigQuery;
import com.google.cloud.teleport.v2.transforms.BigQueryConverters.TableRowToJsonFn;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.ParDo;
/**
* The {@link BigQueryToElasticsearch} pipeline exports data from a BigQuery table to Elasticsearch.
*
* <p>Please refer to <b><a href=
* "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/v2/googlecloud-to-elasticsearch/docs/BigQueryToElasticsearch/README.md">
* README.md</a></b> for further information.
*/
@Template(
name = "BigQuery_to_Elasticsearch",
category = TemplateCategory.BATCH,
displayName = "BigQuery to Elasticsearch",
description =
"A pipeline which sends BigQuery records into an Elasticsearch instance as json documents.",
optionsClass = BigQueryToElasticsearchOptions.class,
flexContainerName = "bigquery-to-elasticsearch",
contactInformation = "https://cloud.google.com/support")
public class BigQueryToElasticsearch {
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
UncaughtExceptionLogger.register();
BigQueryToElasticsearchOptions options =
PipelineOptionsFactory.fromArgs(args)
.withValidation()
.as(BigQueryToElasticsearchOptions.class);
run(options);
}
/**
* Runs the pipeline with the supplied options.
*
* @param options The execution parameters to the pipeline.
* @return The result of the pipeline execution.
*/
private static PipelineResult run(BigQueryToElasticsearchOptions options) {
// Create the pipeline.
Pipeline pipeline = Pipeline.create(options);
/*
* Steps: 1) Read records from BigQuery via BigQueryIO.
* 2) Create json string from Table Row.
* 3) Write records to Elasticsearch.
*
*
* Step #1: Read from BigQuery. If a query is provided then it is used to get the TableRows.
*/
pipeline
.apply(
"ReadFromBigQuery",
ReadBigQuery.newBuilder()
.setOptions(options.as(BigQueryToElasticsearchOptions.class))
.build())
/*
* Step #2: Convert table rows to JSON documents.
*/
.apply("TableRowsToJsonDocument", ParDo.of(new TableRowToJsonFn()))
/*
* Step #3: Write converted records to Elasticsearch
*/
.apply(
"WriteToElasticsearch",
WriteToElasticsearch.newBuilder()
.setOptions(options.as(BigQueryToElasticsearchOptions.class))
.build());
return pipeline.run();
}
}
BigQuery to MongoDB
The BigQuery to MongoDB template is a batch pipeline that reads rows from BigQuery and writes them to MongoDB as documents. Currently, each row is stored as a single document.
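A minimal, hypothetical launch sketch follows; the option names are inferred from the getters used in run() below (getInputTableSpec, getMongoDbUri, getDatabase, getCollection), and all values are placeholders.
package com.google.cloud.teleport.v2.mongodb.templates;
/** Hypothetical launcher sketch for BigQueryToMongoDb (placeholder values). */
public class BigQueryToMongoDbLauncher {
  public static void main(String[] args) {
    BigQueryToMongoDb.main(
        new String[] {
          "--inputTableSpec=my-project:my_dataset.my_table",
          "--mongoDbUri=mongodb://my-host:27017",
          "--database=my-database",
          "--collection=my-collection"
        });
  }
}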
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.mongodb.templates;
import com.google.api.services.bigquery.model.TableRow;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.BigQueryReadOptions;
import com.google.cloud.teleport.v2.mongodb.options.BigQueryToMongoDbOptions.MongoDbOptions;
import com.google.cloud.teleport.v2.mongodb.templates.BigQueryToMongoDb.Options;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.mongodb.MongoDbIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.bson.Document;
/**
* The {@link BigQueryToMongoDb} pipeline is a batch pipeline which reads data from BigQuery and
* outputs the resulting records to MongoDB.
*/
@Template(
name = "BigQuery_to_MongoDB",
category = TemplateCategory.BATCH,
displayName = "BigQuery to MongoDB",
description =
"A batch pipeline which reads data rows from BigQuery and writes them to MongoDB as"
+ " documents.",
optionsClass = Options.class,
flexContainerName = "bigquery-to-mongodb",
contactInformation = "https://cloud.google.com/support")
public class BigQueryToMongoDb {
/**
* Options supported by {@link BigQueryToMongoDb}
*
* <p>Inherits standard configuration options.
*/
public interface Options extends PipelineOptions, MongoDbOptions, BigQueryReadOptions {}
private static class ParseAsDocumentsFn extends DoFn<String, Document> {
@ProcessElement
public void processElement(ProcessContext context) {
context.output(Document.parse(context.element()));
}
}
public static void main(String[] args) {
UncaughtExceptionLogger.register();
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
run(options);
}
public static boolean run(Options options) {
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(BigQueryIO.readTableRows().withoutValidation().from(options.getInputTableSpec()))
.apply(
"bigQueryDataset",
ParDo.of(
new DoFn<TableRow, Document>() {
@ProcessElement
public void process(ProcessContext c) {
Document doc = new Document();
TableRow row = c.element();
row.forEach(
(key, value) -> {
if (!"_id".equals(key)) {
doc.append(key, value);
}
});
c.output(doc);
}
}))
.apply(
MongoDbIO.write()
.withUri(options.getMongoDbUri())
.withDatabase(options.getDatabase())
.withCollection(options.getCollection()));
pipeline.run();
return true;
}
}
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToAvro.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.cloud.teleport.util.PipelineUtils;
import com.google.protobuf.ByteOutput;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS. Currently,
* filtering on the Cloud Bigtable table is not supported.
*/
@Template(
name = "Cloud_Bigtable_to_GCS_Avro",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to Avro Files in Cloud Storage",
description =
"A pipeline which reads in Cloud Bigtable table and writes it to Cloud Storage in Avro format.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class BigtableToAvro {
/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID of the Google Cloud project of the Cloud Bigtable instance that you want to read data from")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 3,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to read")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getOutputDirectory();
@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 5,
description = "Avro file prefix",
helpText = "The prefix of the Avro file name. For example, \"table1-\"")
ValueProvider<String> getFilenamePrefix();
@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
}
/**
* Runs a pipeline to export data from a Cloud Bigtable table to Avro files in GCS.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}
ValueProvider<String> filePathPrefix =
DualInputNestedValueProvider.of(
options.getOutputDirectory(),
options.getFilenamePrefix(),
new SerializableFunction<TranslatorInput<String, String>, String>() {
@Override
public String apply(TranslatorInput<String, String> input) {
return new StringBuilder(input.getX()).append(input.getY()).toString();
}
});
pipeline
.apply("Read from Bigtable", read)
.apply("Transform to Avro", MapElements.via(new BigtableToAvroFn()))
.apply(
"Write to Avro in GCS",
AvroIO.write(BigtableRow.class).to(filePathPrefix).withSuffix(".avro"));
return pipeline.run();
}
/** Translates Bigtable {@link Row} to Avro {@link BigtableRow}. */
static class BigtableToAvroFn extends SimpleFunction<Row, BigtableRow> {
@Override
public BigtableRow apply(Row row) {
ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
List<BigtableCell> cells = new ArrayList<>();
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
for (Column column : family.getColumnsList()) {
ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
for (Cell cell : column.getCellsList()) {
long timestamp = cell.getTimestampMicros();
ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
}
}
}
return new BigtableRow(key, cells);
}
}
/**
* Extracts the byte array from the given {@link ByteString} without copy.
*
* @param byteString A {@link ByteString} from which to extract the array.
* @return an array of byte.
*/
protected static byte[] toByteArray(final ByteString byteString) {
try {
ZeroCopyByteOutput byteOutput = new ZeroCopyByteOutput();
UnsafeByteOperations.unsafeWriteTo(byteString, byteOutput);
return byteOutput.bytes;
} catch (IOException e) {
return byteString.toByteArray();
}
}
private static final class ZeroCopyByteOutput extends ByteOutput {
private byte[] bytes;
@Override
public void writeLazy(byte[] value, int offset, int length) {
if (offset != 0 || length != value.length) {
throw new UnsupportedOperationException();
}
bytes = value;
}
@Override
public void write(byte value) {
throw new UnsupportedOperationException();
}
@Override
public void write(byte[] value, int offset, int length) {
throw new UnsupportedOperationException();
}
@Override
public void write(ByteBuffer value) {
throw new UnsupportedOperationException();
}
@Override
public void writeLazy(ByteBuffer value) {
throw new UnsupportedOperationException();
}
}
}
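A minimal, hypothetical launch sketch for the Bigtable-to-Avro export above; the option names follow the getters in BigtableToAvro.Options and all values are placeholders. Because no templateLocation is set, main() blocks on waitUntilFinish() until the export completes.
package com.google.cloud.teleport.bigtable;
/** Hypothetical launcher sketch for BigtableToAvro (placeholder values). */
public class BigtableToAvroLauncher {
  public static void main(String[] args) {
    BigtableToAvro.main(
        new String[] {
          "--bigtableProjectId=my-project",
          "--bigtableInstanceId=my-instance",
          "--bigtableTableId=my-table",
          "--outputDirectory=gs://my-bucket/bigtable-avro/",
          "--filenamePrefix=table1-"
        });
  }
}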
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.PipelineUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
* Currently, filtering on the Cloud Bigtable table is not supported.
*/
@Template(
name = "Cloud_Bigtable_to_GCS_Parquet",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
description =
"A pipeline which reads in Cloud Bigtable table and writes it to Cloud Storage in Parquet format.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class BigtableToParquet {
/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID of the Google Cloud project of the Cloud Bigtable instance that you want to read data from")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 3,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to export")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse directory path for date & time formatters.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getOutputDirectory();
@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 5,
description = "Parquet file prefix",
helpText = "The prefix of the Parquet file name. For example, \"table1-\"")
@Default.String("output")
ValueProvider<String> getFilenamePrefix();
@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
@TemplateParameter.Integer(
order = 6,
optional = true,
description = "Maximum output shards",
helpText =
"The maximum number of output shards produced when writing. A higher number of "
+ "shards means higher throughput for writing to Cloud Storage, but potentially higher "
+ "data aggregation cost across shards when processing output Cloud Storage files. "
+ "Default value is decided by the runner.")
@Default.Integer(0)
ValueProvider<Integer> getNumShards();
@SuppressWarnings("unused")
void setNumShards(ValueProvider<Integer> numShards);
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
/**
* Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
*
* @param options arguments to the pipeline
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}
/**
* Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
* GenericRecord(s) to GCS in parquet format.
*/
pipeline
.apply("Read from Bigtable", read)
.apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
.setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
.apply(
"Write to Parquet in GCS",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(BigtableRow.getClassSchema()))
.to(options.getOutputDirectory())
.withPrefix(options.getFilenamePrefix())
.withSuffix(".parquet")
.withNumShards(options.getNumShards()));
return pipeline.run();
}
/**
* Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
* GenericRecord}.
*/
static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
@Override
public GenericRecord apply(Row row) {
ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
List<BigtableCell> cells = new ArrayList<>();
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
for (Column column : family.getColumnsList()) {
ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
for (Cell cell : column.getCellsList()) {
long timestamp = cell.getTimestampMicros();
ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
}
}
}
return new GenericRecordBuilder(BigtableRow.getClassSchema())
.set("key", key)
.set("cells", cells)
.build();
}
}
}
Bigtable to Cloud Storage SequenceFile
The Bigtable to Cloud Storage SequenceFile template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in SequenceFile format. You can use this template to copy data from Bigtable to Cloud Storage.
Requirements for this pipeline:
The Bigtable table must exist.
The output Cloud Storage bucket must exist before you run the pipeline.
Template parameters
bigtableProject: The ID of the Google Cloud project of the Bigtable instance that you want to read data from.
Firestore to Cloud Storage Text
The Firestore to Cloud Storage Text template is a batch pipeline that reads Firestore entities and writes them to Cloud Storage as text files. You can provide a function to process each entity as a JSON string; if you do not provide such a function, each line in the output file is a serialized JSON entity.
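A minimal, hypothetical launch sketch for the Firestore variant follows; the option names are inferred from the getters used in DatastoreToText.main() below, the UDF parameters are omitted so each output line is the serialized JSON entity, and all values are placeholders.
package com.google.cloud.teleport.templates;
/** Hypothetical launcher sketch for the Firestore_to_GCS_Text variant (placeholder values). */
public class FirestoreToTextLauncher {
  public static void main(String[] args) {
    DatastoreToText.main(
        new String[] {
          "--firestoreReadGqlQuery=SELECT * FROM MyKind", // assumed GQL query value
          "--firestoreReadProjectId=my-project",
          "--textWritePrefix=gs://my-bucket/firestore-export/output"
        });
  }
}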
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.templates.DatastoreToText.DatastoreToTextOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.DatastoreReadOptions;
import com.google.cloud.teleport.templates.common.DatastoreConverters.ReadJsonEntities;
import com.google.cloud.teleport.templates.common.FirestoreNestedValueProvider;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.templates.common.JavascriptTextTransformer.TransformTextViaJavascript;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
/**
* Dataflow template which copies Datastore Entities to a Text sink. Text is encoded using JSON
* encoded entity in the v1/Entity rest format:
* https://cloud.google.com/datastore/docs/reference/rest/v1/Entity
*/
@Template(
name = "Datastore_to_GCS_Text",
category = TemplateCategory.BATCH,
displayName = "Datastore to Text Files on Cloud Storage [Deprecated]",
description =
"Batch pipeline. Reads Datastore entities and writes them to Cloud Storage as text files.",
optionsClass = DatastoreToTextOptions.class,
skipOptions = {"firestoreReadNamespace", "firestoreReadGqlQuery", "firestoreReadProjectId"},
contactInformation = "https://cloud.google.com/support")
@Template(
name = "Firestore_to_GCS_Text",
category = TemplateCategory.BATCH,
displayName = "Firestore (Datastore mode) to Text Files on Cloud Storage",
description =
"Batch pipeline. Reads Firestore entities and writes them to Cloud Storage as text files.",
optionsClass = DatastoreToTextOptions.class,
skipOptions = {"datastoreReadNamespace", "datastoreReadGqlQuery", "datastoreReadProjectId"},
contactInformation = "https://cloud.google.com/support")
public class DatastoreToText {
public static ValueProvider<String> selectProvidedInput(
ValueProvider<String> datastoreInput, ValueProvider<String> firestoreInput) {
return new FirestoreNestedValueProvider(datastoreInput, firestoreInput);
}
/** Custom PipelineOptions. */
public interface DatastoreToTextOptions
extends PipelineOptions,
DatastoreReadOptions,
JavascriptTextTransformerOptions,
FilesystemWriteOptions {}
/**
* Runs a pipeline which reads in Entities from Datastore, passes in the JSON encoded Entities to
* a Javascript UDF, and writes the JSON to TextIO sink.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
DatastoreToTextOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(DatastoreToTextOptions.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
.apply(
ReadJsonEntities.newBuilder()
.setGqlQuery(
selectProvidedInput(
options.getDatastoreReadGqlQuery(), options.getFirestoreReadGqlQuery()))
.setProjectId(
selectProvidedInput(
options.getDatastoreReadProjectId(), options.getFirestoreReadProjectId()))
.setNamespace(
selectProvidedInput(
options.getDatastoreReadNamespace(), options.getFirestoreReadNamespace()))
.build())
.apply(
TransformTextViaJavascript.newBuilder()
.setFileSystemPath(options.getJavascriptTextTransformGcsPath())
.setFunctionName(options.getJavascriptTextTransformFunctionName())
.build())
.apply(TextIO.write().to(options.getTextWritePrefix()).withSuffix(".json"));
pipeline.run();
}
}
Cloud Spanner to Cloud Storage Avro
The Cloud Spanner to Avro Files on Cloud Storage template is a batch pipeline that exports a whole Cloud Spanner database to Cloud Storage in Avro format. Exporting a Cloud Spanner database creates a folder in the bucket that you select, which contains the exported files.
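A minimal, hypothetical launch sketch follows; it assumes ExportPipeline exposes a standard main(String[]) entry point like the other templates in this collection (the excerpt below is truncated before it). The option names instanceId, databaseId and outputDir come from ExportPipelineOptions, and the values are placeholders.
package com.google.cloud.teleport.spanner;
/** Hypothetical launcher sketch for the Cloud Spanner Avro export (placeholder values). */
public class SpannerExportLauncher {
  public static void main(String[] args) {
    ExportPipeline.main(
        new String[] {
          "--instanceId=my-instance",
          "--databaseId=my-database",
          "--outputDir=gs://my-bucket/spanner-export/"
        });
  }
}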
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.spanner.ExportPipeline.ExportPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
/** Dataflow template that exports a Cloud Spanner database to Avro files in GCS. */
@Template(
name = "Cloud_Spanner_to_GCS_Avro",
category = TemplateCategory.BATCH,
displayName = "Cloud Spanner to Avro Files on Cloud Storage",
description =
"A pipeline to export a Cloud Spanner database to a set of Avro files in Cloud Storage.",
optionsClass = ExportPipelineOptions.class,
contactInformation = "https://cloud.google.com/support")
public class ExportPipeline {
/** Options for Export pipeline. */
public interface ExportPipelineOptions extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
regexes = {"[a-z][a-z0-9\\-]*[a-z0-9]"},
description = "Cloud Spanner instance id",
helpText = "The instance id of the Cloud Spanner database that you want to export.")
ValueProvider<String> getInstanceId();
void setInstanceId(ValueProvider<String> value);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9_\\-]*[a-z0-9]"},
description = "Cloud Spanner database id",
helpText = "The database id of the Cloud Spanner database that you want to export.")
ValueProvider<String> getDatabaseId();
void setDatabaseId(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 3,
description = "Cloud Storage output directory",
helpText =
"The Cloud Storage path where the Avro files should be exported to. A new directory will be created under this path that contains the export.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getOutputDir();
void setOutputDir(ValueProvider<String> value);
@TemplateParameter.GcsWriteFolder(
order = 4,
optional = true,
description = "Cloud Storage temp directory for storing Avro files",
helpText =
"The Cloud Storage path where the temporary Avro files can be created. Ex: gs://your-bucket/your-path")
ValueProvider<String> getAvroTempDirectory();
void setAvroTempDirectory(ValueProvider<String> value);
@TemplateCreationParameter(value = "")
@Description("Test dataflow job identifier for Beam Direct Runner")
@Default.String(value = "")
ValueProvider<String> getTestJobId();
void setTestJobId(ValueProvider<String> jobId);
@TemplateParameter.Text(
order = 6,
optional = true,
description = "Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template. Only used for testing.",
example = "https://batch-spanner.googleapis.com")
@Default.String("https://batch-spanner.googleapis.com")
ValueProvider<String> getSpannerHost();
void setSpannerHost(ValueProvider<String> value);
@TemplateCreationParameter(value = "false")
@Description("If true, wait for job finish")
@Default.Boolean(true)
boolean getWaitUntilFinish();
void setWaitUntilFinish(boolean value);
@TemplateParameter.Text(
order = 7,
optional = true,
regexes = {
"^([0-9]{4})-([0-9]{2})-([0-9]{2})T([0-9]{2}):([0-9]{2}):(([0-9]{2})(\\.[0-9]+)?)Z$"
},
description = "Snapshot time",
helpText =
"Specifies the snapshot time as RFC 3339 format in UTC time without the timezone offset(always ends in 'Z'). Timestamp must be in the past and Maximum timestamp staleness applies. See https://cloud.google.com/spanner/docs/timestamp-bounds#maximum_timestamp_staleness",
example = "1990-12-31T23:59:59Z")
@Default.String(value = "")
ValueProvider<String> getSnapshotTime();
void setSnapshotTime(ValueProvider<String> value);
@TemplateParameter.ProjectId(
order = 8,
optional = true,
description = "Cloud Spanner Project Id",
helpText = "The project id of the Cloud Spanner instance.")
ValueProvider<String> getSpannerProjectId();
void setSpannerProjectId(ValueProvider<String> value);
@TemplateParameter.Boolean(
order = 9,
optional = true,
description = "Export Timestamps as Timestamp-micros type",
helpText =
"If true, Timestamps are exported as timestamp-micros type. Timestamps are exported as ISO8601 strings at nanosecond precision by default.")
@Default.Boolean(false)
ValueProvider<Boolean> getShouldExportTimestampAsLogicalType();
void setShouldExportTimestampAsLogicalType(ValueProvider<Boolean> value);
@TemplateParameter.Text(
order = 10,
optional = true,
regexes = {"^[a-zA-Z0-9_]+(,[a-zA-Z0-9_]+)*$"},
description = "Cloud Spanner table name(s).",
helpText =
"If provided, only this comma separated list of tables are exported. Ancestor tables and tables that are referenced via foreign keys are required. If not explicitly listed, the `shouldExportRelatedTables` flag must be set for a successful export.")
@Default.String(value = "")
ValueProvider<String> getTableNames();
void setTableNames(ValueProvider<String> value);
@TemplateParameter.Boolean(
order = 11,
optional = true,
description = "Export necessary Related Spanner tables.",
helpText =
"Used in conjunction with `tableNames`. If true, add related tables necessary for the export, such as interleaved parent tables and foreign keys tables. If `tableNames` is specified but doesn't include related tables, this option must be set to true for a successful export.")
@Default.Boolean(false)
ValueProvider<Boolean> getShouldExportRelatedTables();
void setShouldExportRelatedTables(ValueProvider<Boolean> value);
@TemplateParameter.Enum(
order = 12,
enumOptions = {"LOW", "MEDIUM", "HIGH"},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of: [HIGH,MEDIUM,LOW].")
ValueProvider<RpcPriority> getSpannerPriority();
void setSpannerPriority(ValueProvider<RpcPriority> value);
}
/**
* Runs a pipeline to export a Cloud Spanner database to Avro files.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
ExportPipelineOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ExportPipelineOptions.class);
Pipeline p = Pipeline.create(options);
SpannerConfig spannerConfig =
SpannerConfig.create()
// Temporary fix explicitly setting SpannerConfig.projectId to the default project
// if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
// which no longer accepts null label values on metrics, and SpannerIO#setup() has
// a bug resulting in the label value being set to the original parameter value,
// with no fallback to the default project.
// TODO: remove NestedValueProvider when this is fixed in Beam.
.withProjectId(
NestedValueProvider.of(
options.getSpannerProjectId(),
(SerializableFunction<String, String>)
input -> input != null ? input : SpannerOptions.getDefaultProjectId()))
.withHost(options.getSpannerHost())
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId())
.withRpcPriority(options.getSpannerPriority());
p.begin()
.apply(
"Run Export",
new ExportTransform(
spannerConfig,
options.getOutputDir(),
options.getTestJobId(),
options.getSnapshotTime(),
options.getTableNames(),
options.getShouldExportRelatedTables(),
options.getShouldExportTimestampAsLogicalType(),
options.getAvroTempDirectory()));
PipelineResult result = p.run();
if (options.getWaitUntilFinish()
&&
/* Only if template location is null, there is a dataflow job to wait for. Else it's
* template generation which doesn't start a dataflow job.
*/
options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
}
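The NestedValueProvider fallback above (the temporary spannerProjectId fix) can be hard to read inline. Below is a self-contained sketch of the same pattern, assuming a hand-rolled ValueProvider that returns null to stand in for an unset parameter and a constant string in place of SpannerOptions.getDefaultProjectId().
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;

/** Sketch of the projectId fallback pattern used in ExportPipeline. */
public class ProjectIdFallbackSketch {
  public static void main(String[] args) {
    // Stand-in for an unset --spannerProjectId parameter.
    ValueProvider<String> spannerProjectId =
        new ValueProvider<String>() {
          @Override
          public String get() {
            return null;
          }

          @Override
          public boolean isAccessible() {
            return true;
          }
        };
    // Same shape as the pipeline code: fall back when the wrapped value is null.
    ValueProvider<String> resolved =
        NestedValueProvider.of(
            spannerProjectId,
            (SerializableFunction<String, String>)
                input -> input != null ? input : "default-project" /* placeholder default */);
    System.out.println(resolved.get()); // prints "default-project"
  }
}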
Cloud Spanner to Cloud Storage Text
The Cloud Spanner to Cloud Storage Text template is a batch pipeline that reads data from a Cloud Spanner table and writes it to Cloud Storage as CSV text files.
Requirements for this pipeline:
The input Spanner table must exist before you run the pipeline.
Template parameters
spannerProjectId: The Google Cloud project ID of the Cloud Spanner database that you want to read data from.
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.templates;
import static com.google.cloud.teleport.util.ValueProviderUtils.eitherOrValueProvider;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.SpannerToText.SpannerToTextOptions;
import com.google.cloud.teleport.templates.common.SpannerConverters;
import com.google.cloud.teleport.templates.common.SpannerConverters.CreateTransactionFnWithTimestamp;
import com.google.cloud.teleport.templates.common.SpannerConverters.SpannerReadOptions;
import com.google.cloud.teleport.templates.common.TextConverters.FilesystemWriteOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.io.gcp.spanner.LocalSpannerIO;
import org.apache.beam.sdk.io.gcp.spanner.ReadOperation;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.io.gcp.spanner.Transaction;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.View;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.TypeDescriptors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow template which copies a Spanner table to a Text sink. It exports a Spanner table using
* <a href="https://cloud.google.com/spanner/docs/reads#read_data_in_parallel">Batch API</a>, which
* creates multiple workers in parallel for better performance. The result is written to a CSV file
* in Google Cloud Storage. The table schema file is saved in json format along with the exported
* table.
*
* <p>Schema file sample: { "id":"INT64", "name":"STRING(MAX)" }
*
* <p>A sample run:
*
* <pre>
* mvn compile exec:java \
* -Dexec.mainClass=com.google.cloud.teleport.templates.SpannerToText \
* -Dexec.args="--runner=DataflowRunner \
* --spannerProjectId=projectId \
* --gcpTempLocation=gs://gsTmpLocation \
* --spannerInstanceId=instanceId \
* --spannerDatabaseId=databaseId \
* --spannerTable=table_name \
* --spannerSnapshotTime=snapshot_time \
* --textWritePrefix=gcsOutputPath"
* </pre>
*/
@Template(
name = "Spanner_to_GCS_Text",
category = TemplateCategory.BATCH,
displayName = "Cloud Spanner to Text Files on Cloud Storage",
description =
"A pipeline which reads in Cloud Spanner table and writes it to Cloud Storage as CSV text files.",
optionsClass = SpannerToTextOptions.class,
contactInformation = "https://cloud.google.com/support")
public class SpannerToText {
private static final Logger LOG = LoggerFactory.getLogger(SpannerToText.class);
/** Custom PipelineOptions. */
public interface SpannerToTextOptions
extends PipelineOptions, SpannerReadOptions, FilesystemWriteOptions {
@TemplateParameter.GcsWriteFolder(
order = 1,
optional = true,
description = "Cloud Storage temp directory for storing CSV files",
helpText = "The Cloud Storage path where the temporary CSV files can be stored.",
example = "gs://your-bucket/your-path")
ValueProvider<String> getCsvTempDirectory();
@SuppressWarnings("unused")
void setCsvTempDirectory(ValueProvider<String> value);
@TemplateParameter.Enum(
order = 2,
enumOptions = {"LOW", "MEDIUM", "HIGH"},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of: [HIGH,MEDIUM,LOW].")
ValueProvider<RpcPriority> getSpannerPriority();
void setSpannerPriority(ValueProvider<RpcPriority> value);
}
/**
   * Runs a pipeline which reads records from Spanner and writes them as CSV to a TextIO sink.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
LOG.info("Starting pipeline setup");
PipelineOptionsFactory.register(SpannerToTextOptions.class);
SpannerToTextOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(SpannerToTextOptions.class);
FileSystems.setDefaultPipelineOptions(options);
Pipeline pipeline = Pipeline.create(options);
SpannerConfig spannerConfig =
SpannerConfig.create()
.withHost(options.getSpannerHost())
.withProjectId(options.getSpannerProjectId())
.withInstanceId(options.getSpannerInstanceId())
.withDatabaseId(options.getSpannerDatabaseId())
.withRpcPriority(options.getSpannerPriority());
PTransform<PBegin, PCollection<ReadOperation>> spannerExport =
SpannerConverters.ExportTransformFactory.create(
options.getSpannerTable(),
spannerConfig,
options.getTextWritePrefix(),
options.getSpannerSnapshotTime());
/* CreateTransaction and CreateTransactionFn classes in LocalSpannerIO
* only take a timestamp object for exact staleness which works when
* parameters are provided during template compile time. They do not work with
* a Timestamp valueProvider which can take parameters at runtime. Hence a new
* ParDo class CreateTransactionFnWithTimestamp had to be created for this
* purpose.
*/
PCollectionView<Transaction> tx =
pipeline
.apply("Setup for Transaction", Create.of(1))
.apply(
"Create transaction",
ParDo.of(
new CreateTransactionFnWithTimestamp(
spannerConfig, options.getSpannerSnapshotTime())))
.apply("As PCollectionView", View.asSingleton());
PCollection<String> csv =
pipeline
.apply("Create export", spannerExport)
// We need to use LocalSpannerIO.readAll() instead of LocalSpannerIO.read()
// because ValueProvider parameters such as table name required for
// LocalSpannerIO.read() can be read only inside DoFn but LocalSpannerIO.read() is of
// type PTransform<PBegin, Struct>, which prevents prepending it with DoFn that reads
// these parameters at the pipeline execution time.
.apply(
"Read all records",
LocalSpannerIO.readAll().withTransaction(tx).withSpannerConfig(spannerConfig))
.apply(
"Struct To Csv",
MapElements.into(TypeDescriptors.strings())
.via(struct -> (new SpannerConverters.StructCsvPrinter()).print(struct)));
ValueProvider<ResourceId> tempDirectoryResource =
ValueProvider.NestedValueProvider.of(
eitherOrValueProvider(options.getCsvTempDirectory(), options.getTextWritePrefix()),
(SerializableFunction<String, ResourceId>) s -> FileSystems.matchNewResource(s, true));
csv.apply(
"Write to storage",
TextIO.write()
.to(options.getTextWritePrefix())
.withSuffix(".csv")
.withTempDirectory(tempDirectoryResource));
pipeline.run();
LOG.info("Completed pipeline setup");
}
}
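The write step above picks a temp location (csvTempDirectory if provided, otherwise textWritePrefix) and converts the chosen path string into a ResourceId for TextIO's withTempDirectory(). A standalone sketch of that conversion follows; the local path is a placeholder, and a gs:// path behaves the same way when the GCS filesystem registrar is on the classpath.
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResourceId;

/** Sketch of turning a chosen temp path into a ResourceId, as done for withTempDirectory(). */
public class TempDirectoryResolutionSketch {
  public static void main(String[] args) {
    // Placeholder path; in the pipeline this is csvTempDirectory or textWritePrefix.
    String chosenPath = "/tmp/spanner-export/";
    // true marks the resource as a directory, matching the pipeline's usage.
    ResourceId tempDir = FileSystems.matchNewResource(chosenPath, /* isDirectory= */ true);
    System.out.println(tempDir);
  }
}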
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import com.google.bigtable.v2.Mutation;
import com.google.bigtable.v2.Mutation.SetCell;
import com.google.cloud.teleport.bigtable.AvroToBigtable.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.common.base.MoreObjects;
import com.google.common.collect.ImmutableList;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.AvroIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Dataflow pipeline that imports data from Avro files in GCS to a Cloud Bigtable table. The Cloud
* Bigtable table must be created before running the pipeline and must have a compatible table
* schema. For example, if {@link BigtableCell} from the Avro files has a 'family' of "f1", the
* Bigtable table should have a column family of "f1".
*/
@Template(
name = "GCS_Avro_to_Cloud_Bigtable",
category = TemplateCategory.BATCH,
displayName = "Avro Files on Cloud Storage to Cloud Bigtable",
description =
"A pipeline which reads data from Avro files in Cloud Storage and writes it to Cloud Bigtable table.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public final class AvroToBigtable {
private static final Logger LOG = LoggerFactory.getLogger(AvroToBigtable.class);
  /** Maximum number of mutations allowed per row by Cloud Bigtable. */
private static final int MAX_MUTATIONS_PER_ROW = 100000;
private static final Boolean DEFAULT_SPLIT_LARGE_ROWS = false;
/** Options for the import pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID of the Google Cloud project of the Cloud Bigtable instance that you want to write data to")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 4,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to write")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsReadFile(
order = 5,
description = "Input Cloud Storage File(s)",
helpText = "The Cloud Storage location of the files you'd like to process.",
example = "gs://your-bucket/your-files/*.avro")
ValueProvider<String> getInputFilePattern();
@SuppressWarnings("unused")
void setInputFilePattern(ValueProvider<String> inputFilePattern);
@TemplateParameter.Boolean(
order = 6,
optional = true,
description = "If true, large rows will be split into multiple MutateRows requests",
helpText =
"The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic. ")
ValueProvider<Boolean> getSplitLargeRows();
void setSplitLargeRows(ValueProvider<Boolean> splitLargeRows);
}
/**
* Runs a pipeline to import Avro files in GCS to a Cloud Bigtable table.
*
* @param args arguments to the pipeline
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Write write =
BigtableIO.write()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
pipeline
.apply("Read from Avro", AvroIO.read(BigtableRow.class).from(options.getInputFilePattern()))
.apply(
"Transform to Bigtable",
ParDo.of(
AvroToBigtableFn.createWithSplitLargeRows(
options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
.apply("Write to Bigtable", write);
return pipeline.run();
}
/**
* Translates {@link BigtableRow} to {@link Mutation}s along with a row key. The mutations are
* {@link SetCell}s that set the value for specified cells with family name, column qualifier and
* timestamp.
*/
static class AvroToBigtableFn extends DoFn<BigtableRow, KV<ByteString, Iterable<Mutation>>> {
private final ValueProvider<Boolean> splitLargeRowsFlag;
private Boolean splitLargeRows;
private final int maxMutationsPerRow;
public static AvroToBigtableFn create() {
return new AvroToBigtableFn(StaticValueProvider.of(false), MAX_MUTATIONS_PER_ROW);
}
public static AvroToBigtableFn createWithSplitLargeRows(
ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
return new AvroToBigtableFn(splitLargeRowsFlag, maxMutationsPerRequest);
}
private AvroToBigtableFn(
ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
this.splitLargeRowsFlag = splitLargeRowsFlag;
this.maxMutationsPerRow = maxMutationsPerRequest;
}
@Setup
public void setup() {
if (splitLargeRowsFlag != null) {
splitLargeRows = splitLargeRowsFlag.get();
}
splitLargeRows = MoreObjects.firstNonNull(splitLargeRows, DEFAULT_SPLIT_LARGE_ROWS);
LOG.info("splitLargeRows set to: " + splitLargeRows);
}
@ProcessElement
public void processElement(
@Element BigtableRow row, OutputReceiver<KV<ByteString, Iterable<Mutation>>> out) {
ByteString key = toByteString(row.getKey());
// BulkMutation doesn't split rows. Currently, if a single row contains more than 100,000
// mutations, the service will fail the request.
ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
int cellsProcessed = 0;
for (BigtableCell cell : row.getCells()) {
SetCell setCell =
SetCell.newBuilder()
.setFamilyName(cell.getFamily().toString())
.setColumnQualifier(toByteString(cell.getQualifier()))
.setTimestampMicros(cell.getTimestamp())
.setValue(toByteString(cell.getValue()))
.build();
mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
cellsProcessed++;
if (this.splitLargeRows && cellsProcessed % maxMutationsPerRow == 0) {
// Send a MutateRow request when we have accumulated max mutations per row.
out.output(KV.of(key, mutations.build()));
mutations = ImmutableList.builder();
}
}
// Flush any remaining mutations.
      ImmutableList<Mutation> remainingMutations = mutations.build();
if (!remainingMutations.isEmpty()) {
out.output(KV.of(key, remainingMutations));
}
}
}
/** Copies the content in {@code byteBuffer} into a {@link ByteString}. */
protected static ByteString toByteString(ByteBuffer byteBuffer) {
return ByteString.copyFrom(byteBuffer.array());
}
}
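The split-large-rows handling in AvroToBigtableFn.processElement reduces to a simple chunking rule: when splitLargeRows is enabled, emit a batch every maxMutationsPerRow mutations and flush whatever remains at the end. A plain-Java sketch of that rule follows, with integers standing in for mutations.
import java.util.ArrayList;
import java.util.List;

/** Plain-Java sketch of the chunking rule used by AvroToBigtableFn. */
public class RowChunkingSketch {
  static List<List<Integer>> chunk(List<Integer> cells, int maxMutationsPerRow) {
    List<List<Integer>> batches = new ArrayList<>();
    List<Integer> current = new ArrayList<>();
    int processed = 0;
    for (Integer cell : cells) {
      current.add(cell);
      processed++;
      if (processed % maxMutationsPerRow == 0) {
        batches.add(current); // corresponds to out.output(KV.of(key, mutations.build()))
        current = new ArrayList<>();
      }
    }
    if (!current.isEmpty()) {
      batches.add(current); // flush the remaining mutations
    }
    return batches;
  }

  public static void main(String[] args) {
    List<Integer> cells = new ArrayList<>();
    for (int i = 0; i < 7; i++) {
      cells.add(i);
    }
    System.out.println(chunk(cells, 3)); // prints [[0, 1, 2], [3, 4, 5], [6]]
  }
}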
/*
* Copyright (C) 2018 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.spanner;
import com.google.cloud.spanner.Options.RpcPriority;
import com.google.cloud.spanner.SpannerOptions;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateCreationParameter;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.spanner.ImportPipeline.Options;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.transforms.SerializableFunction;
/** Avro to Cloud Spanner Import pipeline. */
@Template(
name = "GCS_Avro_to_Cloud_Spanner",
category = TemplateCategory.BATCH,
displayName = "Avro Files on Cloud Storage to Cloud Spanner",
description =
"A pipeline to import a Cloud Spanner database from a set of Avro files in Cloud Storage.",
optionsClass = Options.class,
contactInformation = "https://cloud.google.com/support")
public class ImportPipeline {
/** Options for {@link ImportPipeline}. */
public interface Options extends PipelineOptions {
@TemplateParameter.Text(
order = 1,
regexes = {"^[a-z0-9\\-]+$"},
description = "Cloud Spanner instance id",
helpText = "The instance id of the Cloud Spanner database that you want to import to.")
ValueProvider<String> getInstanceId();
void setInstanceId(ValueProvider<String> value);
@TemplateParameter.Text(
order = 2,
regexes = {"^[a-z_0-9\\-]+$"},
description = "Cloud Spanner database id",
helpText =
"The database id of the Cloud Spanner database that you want to import into (must already exist).")
ValueProvider<String> getDatabaseId();
void setDatabaseId(ValueProvider<String> value);
@TemplateParameter.GcsReadFolder(
order = 3,
description = "Cloud storage input directory",
helpText = "The Cloud Storage path where the Avro files should be imported from.")
ValueProvider<String> getInputDir();
void setInputDir(ValueProvider<String> value);
@TemplateParameter.Text(
order = 4,
optional = true,
description = "Cloud Spanner Endpoint to call",
helpText = "The Cloud Spanner endpoint to call in the template. Only used for testing.",
example = "https://batch-spanner.googleapis.com")
@Default.String("https://batch-spanner.googleapis.com")
ValueProvider<String> getSpannerHost();
void setSpannerHost(ValueProvider<String> value);
@TemplateParameter.Boolean(
order = 5,
optional = true,
description = "Wait for Indexes",
helpText =
"By default the import pipeline is not blocked on index creation, and it "
+ "may complete with indexes still being created in the background. In testing, it may "
+ "be useful to set this option to false so that the pipeline waits until indexes are "
+ "finished.")
@Default.Boolean(false)
ValueProvider<Boolean> getWaitForIndexes();
void setWaitForIndexes(ValueProvider<Boolean> value);
@TemplateParameter.Boolean(
order = 6,
optional = true,
description = "Wait for Foreign Keys",
helpText =
"By default the import pipeline is not blocked on foreign key creation, and it may complete"
+ " with foreign keys still being created in the background. In testing, it may be"
+ " useful to set this option to false so that the pipeline waits until foreign keys"
+ " are finished.")
@Default.Boolean(false)
ValueProvider<Boolean> getWaitForForeignKeys();
void setWaitForForeignKeys(ValueProvider<Boolean> value);
@TemplateParameter.Boolean(
order = 7,
optional = true,
description = "Wait for Foreign Keys",
helpText =
"By default the import pipeline is blocked on change stream creation. If false, it may"
+ " complete with change streams still being created in the background.")
@Default.Boolean(true)
ValueProvider<Boolean> getWaitForChangeStreams();
void setWaitForChangeStreams(ValueProvider<Boolean> value);
@TemplateParameter.Boolean(
order = 8,
optional = true,
description = "Create Indexes early",
helpText =
"Flag to turn off early index creation if there are many indexes. Indexes and Foreign keys are created after dataload. If there are more than "
+ "40 DDL statements to be executed after dataload, it is preferable to create the "
+ "indexes before datalod. This is the flag to turn the feature off.")
@Default.Boolean(true)
ValueProvider<Boolean> getEarlyIndexCreateFlag();
void setEarlyIndexCreateFlag(ValueProvider<Boolean> value);
@TemplateCreationParameter(value = "false")
@Description("If true, wait for job finish")
@Default.Boolean(true)
boolean getWaitUntilFinish();
@TemplateParameter.ProjectId(
order = 9,
optional = true,
description = "Cloud Spanner Project Id",
helpText = "The project id of the Cloud Spanner instance.")
ValueProvider<String> getSpannerProjectId();
void setSpannerProjectId(ValueProvider<String> value);
void setWaitUntilFinish(boolean value);
@TemplateParameter.Text(
order = 10,
optional = true,
regexes = {"[0-9]+"},
description = "DDL Creation timeout in minutes",
helpText = "DDL Creation timeout in minutes.")
@Default.Integer(30)
ValueProvider<Integer> getDDLCreationTimeoutInMinutes();
void setDDLCreationTimeoutInMinutes(ValueProvider<Integer> value);
@TemplateParameter.Enum(
order = 11,
enumOptions = {"LOW", "MEDIUM", "HIGH"},
optional = true,
description = "Priority for Spanner RPC invocations",
helpText =
"The request priority for Cloud Spanner calls. The value must be one of: [HIGH,MEDIUM,LOW].")
ValueProvider<RpcPriority> getSpannerPriority();
void setSpannerPriority(ValueProvider<RpcPriority> value);
}
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
Pipeline p = Pipeline.create(options);
SpannerConfig spannerConfig =
SpannerConfig.create()
// Temporary fix explicitly setting SpannerConfig.projectId to the default project
// if spannerProjectId is not provided as a parameter. Required as of Beam 2.38,
// which no longer accepts null label values on metrics, and SpannerIO#setup() has
// a bug resulting in the label value being set to the original parameter value,
// with no fallback to the default project.
// TODO: remove NestedValueProvider when this is fixed in Beam.
.withProjectId(
NestedValueProvider.of(
options.getSpannerProjectId(),
(SerializableFunction<String, String>)
input -> input != null ? input : SpannerOptions.getDefaultProjectId()))
.withHost(options.getSpannerHost())
.withInstanceId(options.getInstanceId())
.withDatabaseId(options.getDatabaseId())
.withRpcPriority(options.getSpannerPriority());
p.apply(
new ImportTransform(
spannerConfig,
options.getInputDir(),
options.getWaitForIndexes(),
options.getWaitForForeignKeys(),
options.getWaitForChangeStreams(),
options.getEarlyIndexCreateFlag(),
options.getDDLCreationTimeoutInMinutes()));
PipelineResult result = p.run();
if (options.getWaitUntilFinish()
&&
/* Only if template location is null, there is a dataflow job to wait for. Else it's
* template generation which doesn't start a dataflow job.
*/
options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
}
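As with the export template, a hedged invocation sketch: the flag names mirror the option getters above (instanceId, databaseId, inputDir, waitForIndexes), and every value is a placeholder rather than a tested configuration.
/** Hypothetical invocation of the Avro to Cloud Spanner import (placeholder values). */
public class ImportPipelineExampleRun {
  public static void main(String[] args) {
    com.google.cloud.teleport.spanner.ImportPipeline.main(
        new String[] {
          "--runner=DataflowRunner", // or DirectRunner for local testing
          "--project=my-project", // Dataflow job project (placeholder)
          "--instanceId=my-spanner-instance", // maps to getInstanceId()
          "--databaseId=my-database", // maps to getDatabaseId(); database must already exist
          "--inputDir=gs://my-bucket/spanner-export", // maps to getInputDir()
          "--waitForIndexes=true" // block until secondary index creation finishes
        });
  }
}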