Bigtable to JSON template

The Bigtable to JSON template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format.

Pipeline requirements

  • The Bigtable table must exist.
  • The output Cloud Storage bucket must exist before you run the pipeline.

Template parameters

Required parameters

  • bigtableProjectId: The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.
  • bigtableInstanceId: The ID of the Bigtable instance that contains the table.
  • bigtableTableId: The ID of the Bigtable table to read from.
  • outputDirectory: The Cloud Storage path where the output JSON files are stored. For example, gs://your-bucket/your-path/.
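
The template source declares validation patterns for the instance ID and the table ID. As a rough pre-flight check, the sketch below applies the same two patterns with whole-string matching before a job is submitted. The class and method names are hypothetical, and whether the template service applies the patterns in exactly this way is an assumption.

```java
import java.util.regex.Pattern;

/** Hypothetical pre-flight check; not part of the template itself. */
final class BigtableIdCheck {
  // Patterns copied from the regexes declared on the template's Options interface.
  private static final Pattern INSTANCE_ID = Pattern.compile("[a-z][a-z0-9\\-]+[a-z0-9]");
  private static final Pattern TABLE_ID = Pattern.compile("[_a-zA-Z0-9][-_.a-zA-Z0-9]*");

  /** Returns true when the whole string matches the instance ID pattern. */
  public static boolean isValidInstanceId(String id) {
    return id != null && INSTANCE_ID.matcher(id).matches();
  }

  /** Returns true when the whole string matches the table ID pattern. */
  public static boolean isValidTableId(String id) {
    return id != null && TABLE_ID.matcher(id).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValidInstanceId("my-instance")); // true
    System.out.println(isValidInstanceId("My_Instance")); // false: uppercase and underscore
    System.out.println(isValidTableId("table_1.v2"));     // true
  }
}
```

A check like this only catches malformed IDs early; it does not confirm that the instance or table exists.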

Optional parameters

  • filenamePrefix: The prefix of the JSON file name. For example, table1-. If no value is provided, defaults to part.
  • userOption: Possible values are FLATTEN or NONE. FLATTEN flattens the row to a single level. NONE stores the whole row as a JSON string. Defaults to NONE.
  • columnsAliases: A comma-separated list of columns that are required for the Vertex AI Vector Search index. The columns id and embedding are required for Vertex AI Vector Search. You can use the notation fromfamily:fromcolumn;to. For example, if the source columns are rowkey and cf:my_embedding, map them to the required names by specifying cf:my_embedding;embedding and rowkey;id. Only use this option when the value for userOption is FLATTEN.
  • bigtableAppProfileId: The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.
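
To make the columnsAliases notation concrete, here is a minimal sketch in plain Java that parses an alias string and applies it to one flattened row. It follows the rules visible in the template source: entries that do not split into exactly two parts on `;` are ignored, and once at least one alias is defined, only aliased columns are kept. The class name `ColumnAliasSketch` and the sample values are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the columnsAliases rules; not the template's own class. */
final class ColumnAliasSketch {
  /** Parses "from;to,from;to" into a map. Malformed entries are skipped. */
  public static Map<String, String> parseAliases(String columnsAliases) {
    Map<String, String> aliases = new LinkedHashMap<>();
    if (columnsAliases == null || columnsAliases.trim().isEmpty()) {
      return aliases;
    }
    for (String entry : columnsAliases.split(",")) {
      String[] parts = entry.split(";");
      if (parts.length == 2) {
        aliases.put(parts[0], parts[1]);
      }
    }
    return aliases;
  }

  /** Renames aliased columns and drops the others when at least one alias exists. */
  public static Map<String, String> applyAliases(
      Map<String, String> row, Map<String, String> aliases) {
    Map<String, String> out = new LinkedHashMap<>();
    for (Map.Entry<String, String> cell : row.entrySet()) {
      if (!aliases.isEmpty() && !aliases.containsKey(cell.getKey())) {
        continue; // column has no alias, so it is left out of the output
      }
      out.put(aliases.getOrDefault(cell.getKey(), cell.getKey()), cell.getValue());
    }
    return out;
  }

  public static void main(String[] args) {
    Map<String, String> row = new LinkedHashMap<>();
    row.put("rowkey", "doc-1");
    row.put("cf:my_embedding", "[0.1, 0.2]");
    row.put("cf:title", "hello");
    Map<String, String> aliases = parseAliases("cf:my_embedding;embedding,rowkey;id");
    System.out.println(applyAliases(row, aliases)); // {id=doc-1, embedding=[0.1, 0.2]}
  }
}
```

Because the entries are split on `,` and `;` without trimming, a space after a comma becomes part of the column name, so it is safest to write the list without spaces.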

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Bigtable to JSON template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location=gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Json \
    --project=PROJECT_ID \
    --region=REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=BIGTABLE_INSTANCE_ID,\
bigtableTableId=BIGTABLE_TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest
  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project that contains the Bigtable instance
  • BIGTABLE_INSTANCE_ID: the ID of the Bigtable instance
  • BIGTABLE_TABLE_ID: the ID of the Bigtable table
  • OUTPUT_DIRECTORY: the Cloud Storage path where the output JSON files are stored, for example gs://your-bucket/your-path/
  • FILENAME_PREFIX: the prefix of the JSON file name

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Json
{
   "jobName": "JOB_NAME",
   "parameters": {
     "bigtableProjectId": "BIGTABLE_PROJECT_ID",
     "bigtableInstanceId": "BIGTABLE_INSTANCE_ID",
     "bigtableTableId": "BIGTABLE_TABLE_ID",
     "outputDirectory": "OUTPUT_DIRECTORY",
     "filenamePrefix": "FILENAME_PREFIX"
   },
   "environment": { "maxWorkers": "10" }
}

Replace the following:

  • PROJECT_ID: the ID of the Google Cloud project where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use, for example latest
  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project that contains the Bigtable instance
  • BIGTABLE_INSTANCE_ID: the ID of the Bigtable instance
  • BIGTABLE_TABLE_ID: the ID of the Bigtable table
  • OUTPUT_DIRECTORY: the Cloud Storage path where the output JSON files are stored, for example gs://your-bucket/your-path/
  • FILENAME_PREFIX: the prefix of the JSON file name
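
For callers who prefer Java, the launch request could be assembled with the JDK's built-in `java.net.http` client (Java 11 or later). This is only a sketch: the class name, the `ACCESS_TOKEN` environment variable, and the sample values are assumptions, and the hand-built JSON does no escaping, so it suits simple values only. The `outputDirectory` entry is included because the template lists it as a required parameter. One way to obtain a token for testing is `gcloud auth print-access-token`.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

/** Hypothetical launcher for the Cloud_Bigtable_to_GCS_Json template. */
final class LaunchBigtableToJson {
  /** Builds the templates:launch URL for a project, region, and template version. */
  public static String launchUrl(String projectId, String location, String version) {
    return "https://dataflow.googleapis.com/v1b3/projects/" + projectId
        + "/locations/" + location
        + "/templates:launch?gcsPath=gs://dataflow-templates-" + location
        + "/" + version + "/Cloud_Bigtable_to_GCS_Json";
  }

  /** Builds the request body. Values are inserted verbatim, without JSON escaping. */
  public static String requestBody(
      String jobName, String bigtableProjectId, String instanceId, String tableId,
      String outputDirectory, String filenamePrefix) {
    return "{\"jobName\":\"" + jobName + "\","
        + "\"parameters\":{"
        + "\"bigtableProjectId\":\"" + bigtableProjectId + "\","
        + "\"bigtableInstanceId\":\"" + instanceId + "\","
        + "\"bigtableTableId\":\"" + tableId + "\","
        + "\"outputDirectory\":\"" + outputDirectory + "\","
        + "\"filenamePrefix\":\"" + filenamePrefix + "\"},"
        + "\"environment\":{\"maxWorkers\":\"10\"}}";
  }

  public static void main(String[] args) throws Exception {
    // Assumption: ACCESS_TOKEN holds a valid OAuth 2.0 access token.
    String token = System.getenv("ACCESS_TOKEN");
    if (token == null) {
      System.err.println("Set ACCESS_TOKEN before running this sketch.");
      return;
    }
    // All argument values below are placeholders.
    HttpRequest request =
        HttpRequest.newBuilder(URI.create(launchUrl("my-project", "us-central1", "latest")))
            .header("Authorization", "Bearer " + token)
            .header("Content-Type", "application/json")
            .POST(HttpRequest.BodyPublishers.ofString(
                requestBody("bigtable-export", "my-project", "my-instance", "my-table",
                    "gs://your-bucket/your-path/", "part")))
            .build();
    HttpResponse<String> response =
        HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
    System.out.println(response.statusCode() + " " + response.body());
  }
}
```

In production code, a JSON library and a Google authentication library would be a safer choice than string concatenation and an environment variable.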
Java
/*
 * Copyright (C) 2023 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToJson.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.metadata.TemplateParameter.TemplateEnumOption;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.gson.stream.JsonWriter;
import java.io.IOException;
import java.io.StringWriter;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.Validation.Required;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to JSON files in GCS. Currently,
 * filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a href=
 * "https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_JSON.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Json",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to JSON",
    description =
        "The Bigtable to JSON template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in JSON format",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-json",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToJson {
  private static final Logger LOG = LoggerFactory.getLogger(BigtableToJson.class);

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID for the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to read from.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Cloud Storage directory for storing JSON files",
        helpText = "The Cloud Storage path where the output JSON files are stored.",
        example = "gs://your-bucket/your-path/")
    @Required
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "JSON file prefix",
        helpText =
            "The prefix of the JSON file name. For example, `table1-`. If no value is provided, defaults to `part`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Enum(
        order = 6,
        groupName = "Target",
        optional = true,
        enumOptions = {@TemplateEnumOption("FLATTEN"), @TemplateEnumOption("NONE")},
        description = "User option",
        helpText =
            "Possible values are `FLATTEN` or `NONE`. `FLATTEN` flattens the row to a single level. `NONE` stores the whole row as a JSON string. Defaults to `NONE`.")
    @Default.String("NONE")
    String getUserOption();

    @SuppressWarnings("unused")
    void setUserOption(String userOption);

    @TemplateParameter.Text(
        order = 7,
        groupName = "Target",
        optional = true,
        parentName = "userOption",
        parentTriggerValues = {"FLATTEN"},
        description = "Columns aliases",
        helpText =
            "A comma-separated list of columns that are required for the Vertex AI Vector Search index. The"
                + " columns `id` and `embedding` are required for Vertex AI Vector Search. You can use the notation"
                + " `fromfamily:fromcolumn;to`. For example, if the columns are `rowkey` and `cf:my_embedding`, where"
                + " `rowkey` has a different name than the embedding column, specify `cf:my_embedding;embedding` and"
                + " `rowkey;id`. Only use this option when the value for `userOption` is `FLATTEN`.")
    ValueProvider<String> getColumnsAliases();

    @SuppressWarnings("unused")
    void setColumnsAliases(ValueProvider<String> value);

    @TemplateParameter.Text(
        order = 8,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to JSON files in GCS.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
    LOG.info("Completed pipeline setup");
  }

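  /**
   * Builds and runs the export pipeline: reads rows from Bigtable, converts each row to a JSON
   * string, and writes the strings to text files under the output directory.
   *
   * @param options the pipeline options
   * @return the result of the pipeline run
   */
  public static PipelineResult run(Options options) {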
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    String userOption = options.getUserOption();
    pipeline
        .apply("Read from Bigtable", read)
        .apply(
            "Transform to JSON",
            MapElements.via(
                new BigtableToJsonFn(userOption.equals("FLATTEN"), options.getColumnsAliases())))
        .apply("Write to storage", TextIO.write().to(filePathPrefix).withSuffix(".json"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to JSON. */
  static class BigtableToJsonFn extends SimpleFunction<Row, String> {
    private boolean flatten;
    private ValueProvider<String> columnsAliases;

    public BigtableToJsonFn(boolean flatten, ValueProvider<String> columnsAliases) {
      this.flatten = flatten;
      this.columnsAliases = columnsAliases;
    }

    @Override
    public String apply(Row row) {
      StringWriter stringWriter = new StringWriter();
      JsonWriter jsonWriter = new JsonWriter(stringWriter);
      try {
        if (flatten) {
          serializeFlattented(row, jsonWriter);
        } else {
          serializeUnFlattented(row, jsonWriter);
        }
      } catch (IOException e) {
        throw new RuntimeException(e);
      }
      return stringWriter.toString();
    }

    private void serializeUnFlattented(Row row, JsonWriter jsonWriter) throws IOException {
      jsonWriter.beginObject();
      jsonWriter.name(row.getKey().toStringUtf8());
      jsonWriter.beginObject();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        jsonWriter.name(familyName);
        jsonWriter.beginObject();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            jsonWriter
                .name(column.getQualifier().toStringUtf8())
                .value(cell.getValue().toStringUtf8());
          }
        }
        jsonWriter.endObject();
      }
      jsonWriter.endObject();
      jsonWriter.endObject();
    }

    private void serializeFlattented(Row row, JsonWriter jsonWriter) throws IOException {
      jsonWriter.beginObject();
      Map<String, String> columnsWithAliases = extractColumnsAliases();

      maybeAddToJson(jsonWriter, columnsWithAliases, "rowkey", row.getKey().toStringUtf8());
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          for (Cell cell : column.getCellsList()) {
            maybeAddToJson(
                jsonWriter,
                columnsWithAliases,
                familyName + ":" + column.getQualifier().toStringUtf8(),
                cell.getValue().toStringUtf8());
          }
        }
      }
      jsonWriter.endObject();
    }

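    /**
     * Writes one name/value pair, using the alias for {@code key} when one is defined. When at
     * least one alias is configured, keys without an alias are skipped.
     */
    private void maybeAddToJson(
        JsonWriter jsonWriter, Map<String, String> columnsWithAliases, String key, String value)
        throws IOException {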
      if (!columnsWithAliases.isEmpty() && !columnsWithAliases.containsKey(key)) {
        return;
      }
      jsonWriter.name(columnsWithAliases.getOrDefault(key, key)).value(value);
    }

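    /**
     * Parses the comma-separated {@code from;to} pairs in {@code columnsAliases} into a map.
     * Entries that do not split into exactly two parts are ignored.
     */
    private Map<String, String> extractColumnsAliases() {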
      Map<String, String> columnsWithAliases = new HashMap<>();
      if (StringUtils.isBlank(columnsAliases.get())) {
        return columnsWithAliases;
      }
      String[] columnsList = columnsAliases.get().split(",");

      for (String columnsWithAlias : columnsList) {
        String[] columnWithAlias = columnsWithAlias.split(";");
        if (columnWithAlias.length == 2) {
          columnsWithAliases.put(columnWithAlias[0], columnWithAlias[1]);
        }
      }
      return columnsWithAliases;
    }
  }
}
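
To visualize what the two serializers in the listing emit, the following sketch reproduces the output shapes for NONE and FLATTEN with plain JDK classes instead of Gson and the Bigtable protos. It is an approximation under stated assumptions: the class `RowShapeSketch`, the map-based row representation, and the sample data are hypothetical, and strings are quoted without escaping.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical stand-in that mimics the JSON shapes of the two user options. */
final class RowShapeSketch {
  // Wraps a value in double quotes. No escaping is done; sample data only.
  private static String quote(String s) {
    return "\"" + s + "\"";
  }

  /** NONE: {"<rowkey>":{"<family>":{"<qualifier>":"<value>"}}} */
  public static String nested(String rowKey, Map<String, Map<String, String>> families) {
    StringJoiner fams = new StringJoiner(",", "{", "}");
    for (Map.Entry<String, Map<String, String>> family : families.entrySet()) {
      StringJoiner cols = new StringJoiner(",", "{", "}");
      for (Map.Entry<String, String> col : family.getValue().entrySet()) {
        cols.add(quote(col.getKey()) + ":" + quote(col.getValue()));
      }
      fams.add(quote(family.getKey()) + ":" + cols);
    }
    return "{" + quote(rowKey) + ":" + fams + "}";
  }

  /** FLATTEN: {"rowkey":"<rowkey>","<family>:<qualifier>":"<value>"} */
  public static String flattened(String rowKey, Map<String, Map<String, String>> families) {
    StringJoiner out = new StringJoiner(",", "{", "}");
    out.add(quote("rowkey") + ":" + quote(rowKey));
    for (Map.Entry<String, Map<String, String>> family : families.entrySet()) {
      for (Map.Entry<String, String> col : family.getValue().entrySet()) {
        out.add(quote(family.getKey() + ":" + col.getKey()) + ":" + quote(col.getValue()));
      }
    }
    return out.toString();
  }

  public static void main(String[] args) {
    Map<String, String> cf = new LinkedHashMap<>();
    cf.put("name", "Ada");
    cf.put("city", "Paris");
    Map<String, Map<String, String>> families = new LinkedHashMap<>();
    families.put("cf", cf);
    System.out.println(nested("user#1", families));
    // {"user#1":{"cf":{"name":"Ada","city":"Paris"}}}
    System.out.println(flattened("user#1", families));
    // {"rowkey":"user#1","cf:name":"Ada","cf:city":"Paris"}
  }
}
```

One difference from this simplified model: the listing writes a name/value pair for every cell, so a column that holds several cell versions produces the same key more than once in the output object.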

What's next