Plantilla de Parquet de Cloud Storage a Bigtable

La plantilla de Parquet de Cloud Storage a Bigtable es una canalización que lee datos de archivos Parquet en un bucket de Cloud Storage y escribe los datos en una tabla de Bigtable. Puedes usar la plantilla para copiar datos de Cloud Storage a Bigtable.

Requisitos de la canalización

  • La tabla de Bigtable debe existir y tener las mismas familias de columnas que se exportaron en los archivos Parquet.
  • Los archivos de Parquet de entrada deben existir en un bucket de Cloud Storage antes de ejecutar la canalización.
  • Bigtable espera un esquema específico de los archivos Parquet de entrada.

Parámetros de la plantilla

Parámetros obligatorios

  • bigtableProjectId: El ID del proyecto de Google Cloud asociado con la instancia de Bigtable.
  • bigtableInstanceId: Es el ID de la instancia de Cloud Bigtable que contiene la tabla.
  • bigtableTableId: El ID de la tabla de Bigtable que se importará.
  • inputFilePattern: La ruta de Cloud Storage con los archivos que contienen los datos. Por ejemplo, gs://your-bucket/your-files/*.parquet

Parámetros opcionales

  • splitLargeRows: La marca para habilitar la división de filas grandes en varias solicitudes MutateRows. Ten en cuenta que cuando una fila grande se divide entre varias llamadas a la API, las actualizaciones de la fila no son atómicas.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Parquet Files on Cloud Storage to Cloud Bigtable template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haga clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Parquet_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_PROJECT_ID: Es el ID del Google Cloud proyecto de la instancia de Bigtable de la que deseas leer los datos.
  • INSTANCE_ID: Es el ID de la instancia de Bigtable que contiene la tabla.
  • TABLE_ID: Es el ID de la tabla de Bigtable que se exportará.
  • INPUT_FILE_PATTERN: Es el patrón de ruta de acceso de Cloud Storage en la que se encuentran los datos, por ejemplo, gs://mybucket/somefolder/prefix*.

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_PROJECT_ID: Es el ID del Google Cloud proyecto de la instancia de Bigtable de la que deseas leer los datos.
  • INSTANCE_ID: Es el ID de la instancia de Bigtable que contiene la tabla.
  • TABLE_ID: Es el ID de la tabla de Bigtable que se exportará.
  • INPUT_FILE_PATTERN: Es el patrón de ruta de acceso de Cloud Storage en la que se encuentran los datos, por ejemplo, gs://mybucket/somefolder/prefix*.
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.AvroToBigtable.toByteString;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.teleport.bigtable.ParquetToBigtable.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link ParquetToBigtable} pipeline imports data from Parquet files in GCS to a Cloud Bigtable
 * table. The Cloud Bigtable table must be created before running the pipeline and must have a
 * compatible table schema. For example, if {@link BigtableCell} from the Parquet files has a
 * 'family' of "f1", the Bigtable table should have a column family of "f1".
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Parquet_to_Cloud_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Parquet_to_Cloud_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "Parquet Files on Cloud Storage to Cloud Bigtable",
    description =
        "The Cloud Storage Parquet to Bigtable template is a pipeline that reads data from Parquet files in a Cloud Storage bucket and writes the data to a Bigtable table. "
            + "You can use the template to copy data from Cloud Storage to Bigtable.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/parquet-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist and have the same column families as exported in the Parquet files.",
      "The input Parquet files must exist in a Cloud Storage bucket before running the pipeline.",
      "Bigtable expects a specific <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/src/main/resources/schema/avro/bigtable.avsc\">schema</a> from the input Parquet files."
    })
public class ParquetToBigtable {
  private static final Logger LOG = LoggerFactory.getLogger(ParquetToBigtable.class);

  /** Maximum number of mutations allowed per row by Cloud bigtable. */
  private static final int MAX_MUTATIONS_PER_ROW = 100000;

  private static final Boolean DEFAULT_SPLIT_LARGE_ROWS = false;

  /** Options for the import pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Target",
        description = "Project ID",
        helpText = "The Google Cloud project ID associated with the Bigtable instance.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Target",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to import.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsReadFile(
        order = 4,
        groupName = "Source",
        description = "Input Cloud Storage File(s)",
        helpText = "The Cloud Storage path with the files that contain the data.",
        example = "gs://your-bucket/your-files/*.parquet")
    ValueProvider<String> getInputFilePattern();

    @SuppressWarnings("unused")
    void setInputFilePattern(ValueProvider<String> inputFilePattern);

    @TemplateParameter.Boolean(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "If true, large rows will be split into multiple MutateRows requests",
        helpText =
            "The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic.")
    ValueProvider<Boolean> getSplitLargeRows();

    void setSplitLargeRows(ValueProvider<Boolean> splitLargeRows);
  }

  /**
   * Runs a pipeline to import Parquet files in GCS to a Cloud Bigtable table.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Write write =
        BigtableIO.write()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId());

    /**
     * Steps: 1) Read records from Parquet File. 2) Convert a GenericRecord to a
     * KV<ByteString,Iterable<Mutation>>. 3) Write KV to Bigtable's table.
     */
    pipeline
        .apply(
            "Read from Parquet",
            ParquetIO.read(BigtableRow.getClassSchema()).from(options.getInputFilePattern()))
        .apply(
            "Transform to Bigtable",
            ParDo.of(
                ParquetToBigtableFn.createWithSplitLargeRows(
                    options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
        .apply("Write to Bigtable", write);

    return pipeline.run();
  }

  static class ParquetToBigtableFn extends DoFn<GenericRecord, KV<ByteString, Iterable<Mutation>>> {

    private final ValueProvider<Boolean> splitLargeRowsFlag;
    private Boolean splitLargeRows;
    private final int maxMutationsPerRow;

    public static ParquetToBigtableFn create() {
      return new ParquetToBigtableFn(StaticValueProvider.of(false), MAX_MUTATIONS_PER_ROW);
    }

    public static ParquetToBigtableFn createWithSplitLargeRows(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      return new ParquetToBigtableFn(splitLargeRowsFlag, maxMutationsPerRequest);
    }

    @Setup
    public void setup() {
      if (splitLargeRowsFlag != null) {
        splitLargeRows = splitLargeRowsFlag.get();
      }
      splitLargeRows = MoreObjects.firstNonNull(splitLargeRows, DEFAULT_SPLIT_LARGE_ROWS);
      LOG.info("splitLargeRows set to: " + splitLargeRows);
    }

    private ParquetToBigtableFn(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      this.splitLargeRowsFlag = splitLargeRowsFlag;
      this.maxMutationsPerRow = maxMutationsPerRequest;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      Class runner = ctx.getPipelineOptions().getRunner();
      ByteString key = toByteString((ByteBuffer) ctx.element().get(0));

      // BulkMutation doesn't split rows. Currently, if a single row contains more than 100,000
      // mutations, the service will fail the request.
      ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
      List<Object> cells = (List) ctx.element().get(1);
      int cellsProcessed = 0;
      for (Object element : cells) {
        Mutation.SetCell setCell = null;
        if (runner.isAssignableFrom(DirectRunner.class)) {
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(((GenericData.Record) element).get(0).toString())
                  .setColumnQualifier(
                      toByteString((ByteBuffer) ((GenericData.Record) element).get(1)))
                  .setTimestampMicros((Long) ((GenericData.Record) element).get(2))
                  .setValue(toByteString((ByteBuffer) ((GenericData.Record) element).get(3)))
                  .build();
        } else {
          BigtableCell bigtableCell = (BigtableCell) element;
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(bigtableCell.getFamily().toString())
                  .setColumnQualifier(toByteString(bigtableCell.getQualifier()))
                  .setTimestampMicros(bigtableCell.getTimestamp())
                  .setValue(toByteString(bigtableCell.getValue()))
                  .build();
        }
        mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
        cellsProcessed++;

        if (this.splitLargeRows && cellsProcessed % maxMutationsPerRow == 0) {
          // Send a MutateRow request when we have accumulated max mutations per row.
          ctx.output(KV.of(key, mutations.build()));
          mutations = ImmutableList.builder();
        }
      }

      // Flush any remaining mutations.
      ImmutableList remainingMutations = mutations.build();
      if (!remainingMutations.isEmpty()) {
        ctx.output(KV.of(key, remainingMutations));
      }
    }
  }
}

¿Qué sigue?