The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format.
You can use the template to move data from Bigtable to Cloud Storage.
Pipeline requirements
The Bigtable table must exist.
The output Cloud Storage bucket must exist before you run the pipeline.
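For example, the output bucket can be created ahead of time with the gcloud CLI. This is a sketch, not part of the template itself; BUCKET_NAME and the location are placeholders to substitute with your own values.

```shell
# Create the output Cloud Storage bucket before launching the pipeline.
# BUCKET_NAME and --location are placeholders; use your own bucket name and region.
gcloud storage buckets create gs://BUCKET_NAME --location=us-central1
```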
Template parameters
Required parameters
bigtableProjectId: The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.
bigtableInstanceId: The ID of the Cloud Bigtable instance that contains the table.
bigtableTableId: The ID of the Cloud Bigtable table to export.
outputDirectory: The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: gs://your-bucket/your-path.
Optional parameters
filenamePrefix: The prefix of the Parquet file name. For example: "table1-". Defaults to: part.
numShards: The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.
When you run the template, replace the following values:
BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to read data from
INSTANCE_ID: the ID of the Bigtable instance that contains the table
TABLE_ID: the ID of the Bigtable table to export
OUTPUT_DIRECTORY: the Cloud Storage path where data is written (for example, gs://mybucket/somefolder)
FILENAME_PREFIX: the prefix of the Parquet filename (for example, output-)
NUM_SHARDS: the number of Parquet files to output (for example, 1)
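As a sketch of how these placeholders are typically supplied, a gcloud invocation for this template might look like the following. JOB_NAME, VERSION, and REGION_NAME are additional placeholders not defined above, and the exact --gcs-location path should be checked against the template's own documentation.

```shell
# Launch the Cloud_Bigtable_to_GCS_Parquet template (placeholder values throughout).
gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Parquet \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX,\
numShards=NUM_SHARDS
```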
Template source code
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.PipelineUtils;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
 * Currently, filtering on a Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Parquet.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Parquet",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-parquet",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToParquet {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Cloud Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: gs://your-bucket/your-path.")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Parquet file prefix",
        helpText =
            "The prefix of the Parquet file name. For example, \"table1-\". Defaults to: part.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Integer(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.")
    @Default.Integer(0)
    ValueProvider<Integer> getNumShards();

    @SuppressWarnings("unused")
    void setNumShards(ValueProvider<Integer> numShards);
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
   *
   * @param options arguments to the pipeline
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    // Steps:
    //   1) Read records from Bigtable.
    //   2) Convert a Bigtable Row to a GenericRecord.
    //   3) Write GenericRecord(s) to GCS in Parquet format.
    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
        .setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
        .apply(
            "Write to Parquet in GCS",
            FileIO.<GenericRecord>write()
                .via(ParquetIO.sink(BigtableRow.getClassSchema()))
                .to(options.getOutputDirectory())
                .withPrefix(options.getFilenamePrefix())
                .withSuffix(".parquet")
                .withNumShards(options.getNumShards()));

    return pipeline.run();
  }

  /**
   * Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
   * GenericRecord}.
   */
  static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
    @Override
    public GenericRecord apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new GenericRecordBuilder(BigtableRow.getClassSchema())
          .set("key", key)
          .set("cells", cells)
          .build();
    }
  }
}