Mantenha tudo organizado com as coleções
Salve e categorize o conteúdo com base nas suas preferências.
O modelo do Bigtable para Cloud Storage Parquet é um pipeline que lê dados de uma
tabela do Bigtable e os grava em um bucket do Cloud Storage no formato Parquet.
É possível usar o modelo para mover dados do Bigtable para o Cloud Storage.
Requisitos de pipeline
A tabela do Bigtable precisa existir.
O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.
Parâmetros do modelo
Parâmetros obrigatórios
bigtableProjectId: o ID do projeto do Google Cloud que contém a instância do Cloud Bigtable em que você quer ler dados.
bigtableInstanceId: o ID da instância do Cloud Bigtable que contém a tabela.
bigtableTableId: o ID da tabela do Cloud Bigtable a ser exportada.
outputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Precisa terminar com uma barra. A formatação DateTime é usada para analisar o caminho do diretório em busca de formatadores de data e hora. Por exemplo: gs://your-bucket/your-path.
filenamePrefix: o prefixo do nome do arquivo Parquet. Por exemplo, "table1-". O padrão é: part.
Parâmetros opcionais
numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. O valor padrão é decidido pelo Dataflow.
Executar o modelo
Console
Acesse a página Criar job usando um modelo do Dataflow.
o nome da versão, como 2023-09-12-00_RC00, para usar uma versão específica do
modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket:
gs://dataflow-templates-REGION_NAME/
REGION_NAME:
a região em que você quer
implantar o job do Dataflow, por exemplo, us-central1
BIGTABLE_PROJECT_ID: o ID do projeto do Google Cloud da instância do Bigtable da qual você quer ler os dados.
INSTANCE_ID: o ID da instância do Bigtable que contém a tabela.
TABLE_ID: o ID da tabela do Cloud Bigtable a ser exportada.
OUTPUT_DIRECTORY: o caminho do Cloud Storage em que os dados são gravados, por exemplo, gs://mybucket/somefolder.
FILENAME_PREFIX: prefixo do nome de arquivo Parquet, por exemplo, output-.
NUM_SHARDS: o número de arquivos Parquet a serem gerados, por exemplo, 1.
API
Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a
API e os respectivos escopos de autorização, consulte
projects.templates.launch.
o nome da versão, como 2023-09-12-00_RC00, para usar uma versão específica do
modelo, que pode ser aninhada na respectiva pasta mãe datada no bucket:
gs://dataflow-templates-REGION_NAME/
LOCATION:
a região em que você quer
implantar o job do Dataflow, por exemplo, us-central1
BIGTABLE_PROJECT_ID: o ID do projeto do Google Cloud da instância do Bigtable da qual você quer ler os dados.
INSTANCE_ID: o ID da instância do Bigtable que contém a tabela.
TABLE_ID: o ID da tabela do Cloud Bigtable a ser exportada.
OUTPUT_DIRECTORY: o caminho do Cloud Storage em que os dados são gravados, por exemplo, gs://mybucket/somefolder.
FILENAME_PREFIX: prefixo do nome de arquivo Parquet, por exemplo, output-.
NUM_SHARDS: o número de arquivos Parquet a serem gerados, por exemplo, 1.
Código-fonte do modelo
Java
/*
* Copyright (C) 2019 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.bigtable;
import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;
import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;
/**
* Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
* Currently, filtering on Cloud Bigtable table is not supported.
*
* <p>Check out <a
* href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Parquet.md">README</a>
* for instructions on how to use or modify this template.
*/
@Template(
name = "Cloud_Bigtable_to_GCS_Parquet",
category = TemplateCategory.BATCH,
displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
description =
"The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. "
+ "You can use the template to move data from Bigtable to Cloud Storage.",
optionsClass = Options.class,
documentation =
"https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-parquet",
contactInformation = "https://cloud.google.com/support",
requirements = {
"The Bigtable table must exist.",
"The output Cloud Storage bucket must exist before running the pipeline."
})
public class BigtableToParquet {
/** Options for the export pipeline. */
public interface Options extends PipelineOptions {
@TemplateParameter.ProjectId(
order = 1,
description = "Project ID",
helpText =
"The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.")
ValueProvider<String> getBigtableProjectId();
@SuppressWarnings("unused")
void setBigtableProjectId(ValueProvider<String> projectId);
@TemplateParameter.Text(
order = 2,
regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
description = "Instance ID",
helpText = "The ID of the Cloud Bigtable instance that contains the table.")
ValueProvider<String> getBigtableInstanceId();
@SuppressWarnings("unused")
void setBigtableInstanceId(ValueProvider<String> instanceId);
@TemplateParameter.Text(
order = 3,
regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
description = "Table ID",
helpText = "The ID of the Cloud Bigtable table to export.")
ValueProvider<String> getBigtableTableId();
@SuppressWarnings("unused")
void setBigtableTableId(ValueProvider<String> tableId);
@TemplateParameter.GcsWriteFolder(
order = 4,
description = "Output file directory in Cloud Storage",
helpText =
"The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: gs://your-bucket/your-path.")
ValueProvider<String> getOutputDirectory();
@SuppressWarnings("unused")
void setOutputDirectory(ValueProvider<String> outputDirectory);
@TemplateParameter.Text(
order = 5,
description = "Parquet file prefix",
helpText =
"The prefix of the Parquet file name. For example, \"table1-\". Defaults to: part.")
@Default.String("part")
ValueProvider<String> getFilenamePrefix();
@SuppressWarnings("unused")
void setFilenamePrefix(ValueProvider<String> filenamePrefix);
@TemplateParameter.Integer(
order = 6,
optional = true,
description = "Maximum output shards",
helpText =
"The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.")
@Default.Integer(0)
ValueProvider<Integer> getNumShards();
@SuppressWarnings("unused")
void setNumShards(ValueProvider<Integer> numShards);
}
/**
* Main entry point for pipeline execution.
*
* @param args Command line arguments to the pipeline.
*/
public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
PipelineResult result = run(options);
// Wait for pipeline to finish only if it is not constructing a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
result.waitUntilFinish();
}
}
/**
* Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
*
* @param options arguments to the pipeline
*/
public static PipelineResult run(Options options) {
Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
BigtableIO.Read read =
BigtableIO.read()
.withProjectId(options.getBigtableProjectId())
.withInstanceId(options.getBigtableInstanceId())
.withTableId(options.getBigtableTableId());
// Do not validate input fields if it is running as a template.
if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
read = read.withoutValidation();
}
/**
* Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
* GenericRecord(s) to GCS in parquet format.
*/
pipeline
.apply("Read from Bigtable", read)
.apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
.setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
.apply(
"Write to Parquet in GCS",
FileIO.<GenericRecord>write()
.via(ParquetIO.sink(BigtableRow.getClassSchema()))
.to(options.getOutputDirectory())
.withPrefix(options.getFilenamePrefix())
.withSuffix(".parquet")
.withNumShards(options.getNumShards()));
return pipeline.run();
}
/**
* Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
* GenericRecord}.
*/
static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
@Override
public GenericRecord apply(Row row) {
ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
List<BigtableCell> cells = new ArrayList<>();
for (Family family : row.getFamiliesList()) {
String familyName = family.getName();
for (Column column : family.getColumnsList()) {
ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
for (Cell cell : column.getCellsList()) {
long timestamp = cell.getTimestampMicros();
ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
}
}
}
return new GenericRecordBuilder(BigtableRow.getClassSchema())
.set("key", key)
.set("cells", cells)
.build();
}
}
}
[[["Fácil de entender","easyToUnderstand","thumb-up"],["Meu problema foi resolvido","solvedMyProblem","thumb-up"],["Outro","otherUp","thumb-up"]],[["Hard to understand","hardToUnderstand","thumb-down"],["Incorrect information or sample code","incorrectInformationOrSampleCode","thumb-down"],["Missing the information/samples I need","missingTheInformationSamplesINeed","thumb-down"],["Problema na tradução","translationIssue","thumb-down"],["Outro","otherDown","thumb-down"]],["Última atualização 2024-06-05 UTC."],[],[]]