Organiza tus páginas con colecciones
Guarda y categoriza el contenido según tus preferencias.
La plantilla de Bigtable a Cloud Storage en Parquet es una canalización que lee datos de una tabla de Bigtable y los escribe en un bucket de Cloud Storage en formato Parquet.
Puedes usar la plantilla para transferir datos de Bigtable a Cloud Storage.
Requisitos de la canalización
La tabla de Bigtable debe existir.
El bucket de Cloud Storage de salida debe existir antes de ejecutar la canalización.
Parámetros de la plantilla
Parámetros obligatorios
bigtableProjectId: ID del proyecto de Google Cloud de la instancia de Cloud Bigtable de la que quieres leer datos.
bigtableInstanceId: ID de la instancia de Cloud Bigtable que contiene la tabla.
bigtableTableId: El ID de la tabla de Cloud Bigtable para exportar.
outputDirectory: La ruta de acceso y el prefijo del nombre de archivo para escribir los archivos de salida. Debe terminar con una barra. El formato de fecha y hora se usa a fin de analizar la ruta del directorio para los formateadores de fecha y hora. Por ejemplo: gs://your-bucket/your-folder.
filenamePrefix: El prefijo del nombre del archivo Parquet. Por ejemplo, “table1-”. La configuración predeterminada es: part.
Parámetros opcionales
numShards: La cantidad máxima de fragmentos de salida que se produce con la escritura. Una mayor cantidad de fragmentos implica una mayor capacidad de procesamiento para la escritura en Cloud Storage, pero, también, un mayor costo de agregación de datos entre fragmentos cuando se procesan archivos de salida de Cloud Storage. Dataflow decide el valor predeterminado.
Ejecuta la plantilla
Console
Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
BIGTABLE_PROJECT_ID: Es el ID del proyecto de Google Cloud de la instancia de Bigtable del que deseas leer los datos.
INSTANCE_ID: Es el ID de la instancia de Bigtable que contiene la tabla.
TABLE_ID: Es el ID de la tabla de Bigtable que se exportará.
OUTPUT_DIRECTORY: La ruta de Cloud Storage en la que se escriben los datos, por ejemplo, gs://mybucket/somefolder
FILENAME_PREFIX: Es el prefijo del nombre del archivo Parquet, por ejemplo, output-
NUM_SHARDS: Es la cantidad de archivos de Parquet que se mostrarán, por ejemplo, 1
API
Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.
el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
BIGTABLE_PROJECT_ID: Es el ID del proyecto de Google Cloud de la instancia de Bigtable del que deseas leer los datos.
INSTANCE_ID: Es el ID de la instancia de Bigtable que contiene la tabla.
TABLE_ID: Es el ID de la tabla de Bigtable que se exportará.
OUTPUT_DIRECTORY: La ruta de Cloud Storage en la que se escriben los datos, por ejemplo, gs://mybucket/somefolder
FILENAME_PREFIX: Es el prefijo del nombre del archivo Parquet, por ejemplo, output-
NUM_SHARDS: Es la cantidad de archivos de Parquet que se mostrarán, por ejemplo, 1
Código fuente de la plantilla
Java
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
* Currently, filtering on Cloud Bigtable table is not supported.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Parquet.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Bigtable_to_GCS_Parquet",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
description =
"The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. "
+ "You can use the template to move data from Bigtable to Cloud Storage.",
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-parquet",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Bigtable table must exist.",
"The output Cloud Storage bucket must exist before running the pipeline."
})
public class BigtableToParquet {
/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
groupName = "Source",
description = "Project ID",
helpText =
"The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
groupName = "Source",
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table.")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 3,
groupName = "Source",
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to export.")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsWriteFolder(
order = 4,
groupName = "Target",
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: gs://your-bucket/your-path.")
ValueProvider<String> getOutputDirectory();
@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 5,
groupName = "Target",
description = "Parquet file prefix",
helpText =
"The prefix of the Parquet file name. For example, \"table1-\". Defaults to: part.")
@Default.String("part")
ValueProvider<String> getFilenamePrefix();
@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
@TemplateParameter.Integer(
order = 6,
groupName = "Target",
optional = true,
description = "Maximum output shards",
helpText =
"The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.")
@Default.Integer(0)
ValueProvider<Integer> getNumShards();
@SuppressWarnings("unused")
void setNumShards(ValueProvider<Integer> numShards);
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
/**
* Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
*
* @param options arguments to the pipeline
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}
/**
* Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
* GenericRecord(s) to GCS in parquet format.
*/
pipeline
.apply("Read from Bigtable", read)
.apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
.setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
.apply(
"Write to Parquet in GCS",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(BigtableRow.getClassSchema()))
.to(options.getOutputDirectory())
.withPrefix(options.getFilenamePrefix())
.withSuffix(".parquet")
.withNumShards(options.getNumShards()));
return pipeline.run();
}
/**
* Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
* GenericRecord}.
*/
static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
@Override
public GenericRecord apply(Row row) {
ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
List<BigtableCell> cells = new ArrayList<>();
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
for (Column column : family.getColumnsList()) {
ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
for (Cell cell : column.getCellsList()) {
long timestamp = cell.getTimestampMicros();
ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
}
}
}
return new GenericRecordBuilder(BigtableRow.getClassSchema())
.set("key", key)
.set("cells", cells)
.build();
}
}
}
[[["Fácil de comprender","easyToUnderstand","thumb-up"],["Resolvió mi problema","solvedMyProblem","thumb-up"],["Otro","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problema de traducción","translationIssue","thumb-down"],["Otro","otherDown","thumb-down"]],["Última actualización: 2024-07-19 (UTC)"],[],[]]