Modelo do Bigtable para Cloud Storage Parquet

O modelo do Bigtable para Cloud Storage Parquet é um pipeline que lê dados de uma tabela do Bigtable e os grava em um bucket do Cloud Storage no formato Parquet. É possível usar o modelo para mover dados do Bigtable para o Cloud Storage.

Requisitos de pipeline

  • A tabela do Bigtable precisa existir.
  • O bucket de saída do Cloud Storage precisa existir antes da execução do pipeline.

Parâmetros do modelo

Parâmetros obrigatórios

  • bigtableProjectId: o ID do projeto do Google Cloud que contém a instância do Cloud Bigtable em que você quer ler os dados.
  • bigtableInstanceId: o ID da instância do Cloud Bigtable que contém a tabela.
  • bigtableTableId: o ID da tabela do Cloud Bigtable a ser exportada.
  • outputDirectory: o caminho e o prefixo do nome do arquivo para gravar arquivos de saída. Precisa terminar com uma barra. A formatação DateTime é usada para analisar o caminho do diretório em busca de formatadores de data e hora. Por exemplo, gs://your-bucket/your-path.
  • filenamePrefix: o prefixo do nome do arquivo Parquet. Por exemplo, table1-. O padrão é part.

Parâmetros opcionais

  • numShards: o número máximo de fragmentos de saída produzidos durante a gravação. Um número maior de fragmentos significa maior capacidade de gravação no Cloud Storage, mas um custo de agregação de dados potencialmente maior entre os fragmentos ao processar os arquivos de saída do Cloud Storage. O valor padrão é decidido pelo Dataflow.
  • bigtableAppProfileId: o ID do perfil do aplicativo do Bigtable a ser usado na exportação. Se você não especificar um perfil de aplicativo, o Bigtable usará o perfil de aplicativo padrão da instância: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.

Executar o modelo

  1. Acesse a página Criar job usando um modelo do Dataflow.
  2. Acesse Criar job usando um modelo
  3. No campo Nome do job, insira um nome exclusivo.
  4. Opcional: em Endpoint regional, selecione um valor no menu suspenso. A região padrão é us-central1.

    Para ver uma lista de regiões em que é possível executar um job do Dataflow, consulte Locais do Dataflow.

  5. No menu suspenso Modelo do Dataflow, selecione the Cloud Bigtable to Parquet Files on Cloud Storage template.
  6. Nos campos de parâmetro fornecidos, insira os valores de parâmetro.
  7. Cliquem em Executar job.

No shell ou no terminal, execute o modelo:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Parquet \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX,\
numShards=NUM_SHARDS

Substitua:

  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • REGION_NAME: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_PROJECT_ID: o ID do projeto Google Cloud da instância do Bigtable da qual você quer ler os dados.
  • INSTANCE_ID: o ID da instância do Bigtable que contém a tabela.
  • TABLE_ID: o ID da tabela do Cloud Bigtable a ser exportada.
  • OUTPUT_DIRECTORY: o caminho do Cloud Storage em que os dados são gravados, por exemplo, gs://mybucket/somefolder.
  • FILENAME_PREFIX: prefixo do nome de arquivo Parquet, por exemplo, output-.
  • NUM_SHARDS: o número de arquivos Parquet a serem gerados, por exemplo, 1.

Para executar o modelo usando a API REST, envie uma solicitação HTTP POST. Para mais informações sobre a API e os respectivos escopos de autorização, consulte projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Parquet
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
       "numShards": "NUM_SHARDS"
   },
   "environment": { "zone": "us-central1-f" }
}

Substitua:

  • PROJECT_ID: o ID do projeto do Google Cloud em que você quer executar o job do Dataflow
  • JOB_NAME: um nome de job de sua escolha
  • VERSION: a versão do modelo que você quer usar

    Use estes valores:

  • LOCATION: a região em que você quer implantar o job do Dataflow, por exemplo, us-central1
  • BIGTABLE_PROJECT_ID: o ID do projeto Google Cloud da instância do Bigtable da qual você quer ler os dados.
  • INSTANCE_ID: o ID da instância do Bigtable que contém a tabela.
  • TABLE_ID: o ID da tabela do Cloud Bigtable a ser exportada.
  • OUTPUT_DIRECTORY: o caminho do Cloud Storage em que os dados são gravados, por exemplo, gs://mybucket/somefolder.
  • FILENAME_PREFIX: prefixo do nome de arquivo Parquet, por exemplo, output-.
  • NUM_SHARDS: o número de arquivos Parquet a serem gerados, por exemplo, 1.
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.BigtableToAvro.toByteArray;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToParquet.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.generic.GenericRecordBuilder;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.beam.sdk.values.PCollection;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Parquet files in GCS.
 * Currently, filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Parquet.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Parquet",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Parquet Files on Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Parquet template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Parquet format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-parquet",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToParquet {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {

    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Cloud Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Cloud Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText =
            "The path and filename prefix for writing output files. Must end with a slash. DateTime formatting is used to parse the directory path for date and time formatters. For example: `gs://your-bucket/your-path`.")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Parquet file prefix",
        helpText =
            "The prefix of the Parquet file name. For example, `table1-`. Defaults to: `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Integer(
        order = 6,
        groupName = "Target",
        optional = true,
        description = "Maximum output shards",
        helpText =
            "The maximum number of output shards produced when writing. A higher number of shards means higher throughput for writing to Cloud Storage, but potentially higher data aggregation cost across shards when processing output Cloud Storage files. The default value is decided by Dataflow.")
    @Default.Integer(0)
    ValueProvider<Integer> getNumShards();

    @SuppressWarnings("unused")
    void setNumShards(ValueProvider<Integer> numShards);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Main entry point for pipeline execution.
   *
   * @param args Command line arguments to the pipeline.
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Parquet file(s) in GCS.
   *
   * @param options arguments to the pipeline
   */
  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));
    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    /**
     * Steps: 1) Read records from Bigtable. 2) Convert a Bigtable Row to a GenericRecord. 3) Write
     * GenericRecord(s) to GCS in parquet format.
     */
    FileIO.Write<Void, GenericRecord> write =
        FileIO.<GenericRecord>write()
            .via(ParquetIO.sink(BigtableRow.getClassSchema()))
            .to(options.getOutputDirectory())
            .withPrefix(options.getFilenamePrefix())
            .withSuffix(".parquet");
    ValueProvider<Integer> numShardsOpt = options.getNumShards();
    if (numShardsOpt.isAccessible()) {
      Integer numShards = numShardsOpt.get();
      if (numShards != null && numShards > 0) {
        write = write.withNumShards(options.getNumShards());
      }
    }
    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Parquet", MapElements.via(new BigtableToParquetFn()))
        .setCoder(AvroCoder.of(GenericRecord.class, BigtableRow.getClassSchema()))
        .apply("Write to Parquet in GCS", write);

    return pipeline.run();
  }

  /**
   * Translates a {@link PCollection} of Bigtable {@link Row} to a {@link PCollection} of {@link
   * GenericRecord}.
   */
  static class BigtableToParquetFn extends SimpleFunction<Row, GenericRecord> {
    @Override
    public GenericRecord apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new GenericRecordBuilder(BigtableRow.getClassSchema())
          .set("key", key)
          .set("cells", cells)
          .build();
    }
  }
}

A seguir