Plantilla de Bigtable a JSON

La plantilla de Bigtable a JSON es una canalización que lee datos de una tabla de Bigtable para luego escribirlos en un bucket de Cloud Storage en formato JSON.

Requisitos de la canalización

  • La tabla de Bigtable debe existir.
  • El resultado del bucket de Cloud Storage de salida debe existir antes de ejecutar la canalización.

Parámetros de la plantilla

Parámetros obligatorios

  • bigtableProjectId: El ID del proyecto de Google Cloud que contiene la instancia de Bigtable de la que deseas leer datos.
  • bigtableInstanceId: Es el ID de la instancia de Bigtable que contiene la tabla.
  • bigtableTableId: El ID de la tabla de Bigtable desde la que se leerá.
  • outputDirectory: La ruta de acceso de Cloud Storage en la que se almacenan los archivos JSON de salida. Por ejemplo, gs://your-bucket/your-path/

Parámetros opcionales

  • filenamePrefix: El prefijo del nombre del archivo JSON. Por ejemplo, table1- Si no se proporciona ningún valor, el valor predeterminado es part.
  • userOption: Los valores posibles son FLATTEN o NONE. FLATTEN acopla la fila al nivel único. NONE almacena toda la fila como una cadena JSON. La configuración predeterminada es NONE.
  • columnsAliases: Una lista de columnas separadas por comas que se requieren para el índice de búsqueda de vectores de Vertex AI. Las columnas id y embedding son obligatorias para la búsqueda de vectores de Vertex AI. Puedes usar la notación fromfamily:fromcolumn;to. Por ejemplo, si las columnas son rowkey y cf:my_embedding, especifica cf:my_embedding;embedding y rowkey;id donde rowkey tiene un nombre diferente al de la columna de incorporación. Solo usa esta opción cuando el valor de userOption sea FLATTEN.
  • bigtableAppProfileId: Es el ID del perfil de la aplicación de Bigtable que se usará para la exportación. Si no especificas un perfil de aplicación, Bigtable usa el perfil de aplicación predeterminado de la instancia https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.

Ejecuta la plantilla

  1. Ve a la página Crear un trabajo a partir de una plantilla de Dataflow.
  2. Ir a Crear un trabajo a partir de una plantilla
  3. En el campo Nombre del trabajo, ingresa un nombre de trabajo único.
  4. Opcional: Para Extremo regional, selecciona un valor del menú desplegable. La región predeterminada es us-central1.

    Para obtener una lista de regiones en las que puedes ejecutar un trabajo de Dataflow, consulta Ubicaciones de Dataflow.

  5. En el menú desplegable Plantilla de Dataflow, selecciona the Bigtable to JSON template.
  6. En los campos de parámetros proporcionados, ingresa los valores de tus parámetros.
  7. Haz clic en Ejecutar trabajo.

En tu shell o terminal, ejecuta la plantilla:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Json \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
       bigtableProjectId=BIGTABLE_PROJECT_ID,\
       bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
       bigtableTableId=BIGTABLE_TABLE_ID,\
       filenamePrefix=FILENAME_PREFIX,\

Reemplaza lo siguiente:

  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • REGION_NAME: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_PROJECT_ID: el ID del proyecto
  • BIGTABLE_INSTANCE_ID: El ID de la instancia
  • BIGTABLE_TABLE_ID: El ID de la tabla
  • FILENAME_PREFIX: El prefijo del archivo JSON

Para ejecutar la plantilla con la API de REST, envía una solicitud POST HTTP. Para obtener más información de la API y sus permisos de autorización, consulta projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Json
{
   "jobName": "JOB_NAME",
   "parameters": {
     "bigtableProjectId": "BIGTABLE_PROJECT_ID",
     "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
     "bigtableTableId": "BIGTABLE_TABLE_ID",
     "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "maxWorkers": "10" }
}

Reemplaza lo siguiente:

  • PROJECT_ID: El ID del proyecto de Google Cloud en el que deseas ejecutar el trabajo de Dataflow.
  • JOB_NAME: Es el nombre del trabajo que elijas
  • VERSION: Es la versión de la plantilla que deseas usar.

    Puedes usar los siguientes valores:

    • latest para usar la última versión de la plantilla, que está disponible en la carpeta superior non-dated en el bucket gs://dataflow-templates-REGION_NAME/latest/
    • el nombre de la versión, como 2023-09-12-00_RC00, para usar una versión específica de la plantilla, que se puede encontrar anidada en la carpeta superior con fecha correspondiente en el bucket gs://dataflow-templates-REGION_NAME/
  • LOCATION: La región en la que deseas implementar tu trabajo de Dataflow, por ejemplo, us-central1
  • BIGTABLE_PROJECT_ID: el ID del proyecto
  • BIGTABLE_INSTANCE_ID: El ID de la instancia
  • BIGTABLE_TABLE_ID: El ID de la tabla
  • FILENAME_PREFIX: El prefijo del archivo JSON
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToJson.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS. Currently,
 * filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a href=
 * "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_JSON.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Json",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to JSON",
    description =
        "The Bigtable to JSON template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-json",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToJson {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableToJson.class);

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to read from.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Cloud Storage directory for storing JSON files",
        helpText = "The Cloud Storage path where the output JSON files are stored.",
        example = "gs://your-bucket/your-path/")
    @Required
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "JSON file prefix",
        helpText =
            "The prefix of the JSON file name. For example, `table1-`. If no value is provided, defaults to `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Enum(
        order = 6,
        groupName = "Target",
        optional = true,
        enumOptions = {@TemplateEnumOption("FLATTEN"), @TemplateEnumOption("NONE")},
        description = "User option",
        helpText =
            "Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to the single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`.")
    @Default.String("NONE")
    String getUserOption();

    @SuppressWarnings("unused")
    void setUserOption(String userOption);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Target",
        optional = true,
        parentName = "userOption",
        parentTriggerValues = {"FLATTEN"},
        description = "Columns aliases",
        helpText =
            "A comma-separated list of columns that are required for the Vertex AI Vector Search index. The"
                + " columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation"
                + " `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where"
                + " `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and,"
                + " `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`.")
    ValueProvider<String> getColumnsAliases();

    @SuppressWarnings("unused")
    void setColumnsAliases(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS in JSON format.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
    LOG.info("Completed pipeline setup");
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    String userOption = options.getUserOption();
    pipeline
        .apply("Read from Bigtable", read)
        .apply(
            "Transform to JSON",
            MapElements.via(
                new BigtableToJsonFn(userOption.equals("FLATTEN"), options.getColumnsAliases())))
        .apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to JSON. */
  static class BigtableToJsonFn extends SimpleFunction<Row, String> {
    private boolean flatten;
    private ValueProvider<String> columnsAliases;

    public BigtableToJsonFn(boolean flatten, ValueProvider<String> columnsAliases) {
      this.flatten = flatten;
      this.columnsAliases = columnsAliases;
    }

    @Override
    public String apply(Row row) {
      StringWriter stringWriter = new StringWriter();
      JsonWriter jsonWriter = new JsonWriter(stringWriter);
      try {
        if (flatten) {
          serializeFlattented(row, jsonWriter);
        } else {
          serializeUnFlattented(row, jsonWriter);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return stringWriter.toString();
    }

    private void serializeUnFlattented(Row row, JsonWriter jsonWriter) throws IOException {
      jsonWriter.beginObject();
      jsonWriter.name(row.getKey().toStringUtf8());
      jsonWriter.beginObject();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        jsonWriter.name(familyName);
        jsonWriter.beginObject();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            jsonWriter
                .name(column.getQualifier().toStringUtf8())
                .value(cell.getValue().toStringUtf8());
          }
        }
        jsonWriter.endObject();
      }
      jsonWriter.endObject();
      jsonWriter.endObject();
    }

    private void serializeFlattented(Row row, JsonWriter jsonWriter) throws IOException {
      jsonWriter.beginObject();
      Map<String, String> columnsWithAliases = extractColumnsAliases();

      maybeAddToJson(jsonWriter, columnsWithAliases, "rowkey", row.getKey().toStringUtf8());
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            maybeAddToJson(
                jsonWriter,
                columnsWithAliases,
                familyName + ":" + column.getQualifier().toStringUtf8(),
                cell.getValue().toStringUtf8());
          }
        }
      }
      jsonWriter.endObject();
    }

    private void maybeAddToJson(
        JsonWriter jsonWriter, Map<String, String> columnsWithAliases, String key, String value)
        throws IOException {
      if (!columnsWithAliases.isEmpty() && !columnsWithAliases.containsKey(key)) {
        return;
      }
      jsonWriter.name(columnsWithAliases.getOrDefault(key, key)).value(value);
    }

    private Map<String, String> extractColumnsAliases() {
      Map<String, String> columnsWithAliases = new HashMap<>();
      if (StringUtils.isBlank(columnsAliases.get())) {
        return columnsWithAliases;
      }
      String[] columnsList = columnsAliases.get().split(",");

      for (String columnsWithAlias : columnsList) {
        String[] columnWithAlias = columnsWithAlias.split(";");
        if (columnWithAlias.length == 2) {
          columnsWithAliases.put(columnWithAlias[0], columnWithAlias[1]);
        }
      }
      return columnsWithAliases;
    }
  }
}

¿Qué sigue?