Bigtable ke template Avro Cloud Storage

Template Bigtable to Cloud Storage Avro adalah pipeline yang membaca data dari tabel Bigtable dan menulisnya ke bucket Cloud Storage dalam format Avro. Anda dapat menggunakan template untuk memindahkan data dari Bigtable ke Cloud Storage.

Persyaratan pipeline

  • Tabel Bigtable harus ada.
  • Bucket Cloud Storage output harus ada sebelum menjalankan pipeline.

Parameter template

Parameter yang diperlukan

  • bigtableProjectId: ID project Google Cloud yang berisi instance Bigtable yang datanya ingin Anda baca.
  • bigtableInstanceId: ID instance Bigtable yang berisi tabel.
  • bigtableTableId: ID tabel Bigtable yang akan diekspor.
  • outputDirectory: Jalur Cloud Storage tempat data ditulis. Contoh, gs://mybucket/somefolder.
  • filenamePrefix: Awalan nama file Avro. Contoh, output-. Defaultnya adalah: part.

Parameter opsional

Menjalankan template

  1. Buka halaman Create job from template Dataflow.
  2. Buka Buat tugas dari template
  3. Di kolom Nama tugas, masukkan nama tugas yang unik.
  4. Opsional: Untuk Endpoint regional, pilih nilai dari menu drop-down. Region defaultnya adalah us-central1.

    Untuk mengetahui daftar region tempat Anda dapat menjalankan tugas Dataflow, lihat Lokasi Dataflow.

  5. Dari menu drop-down Dataflow template, pilih the Cloud Bigtable to Avro Files on Cloud Storage template .
  6. Di kolom parameter yang disediakan, masukkan nilai parameter Anda.
  7. Klik Run job.

Di shell atau terminal, jalankan template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_Bigtable_to_GCS_Avro \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
outputDirectory=OUTPUT_DIRECTORY,\
filenamePrefix=FILENAME_PREFIX

Ganti kode berikut:

  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • REGION_NAME: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • BIGTABLE_PROJECT_ID: ID Google Cloud project instance Bigtable tempat Anda ingin membaca data
  • INSTANCE_ID: ID instance Bigtable yang berisi tabel
  • TABLE_ID: ID tabel Bigtable yang akan diekspor
  • OUTPUT_DIRECTORY: jalur Cloud Storage tempat data ditulis, misalnya, gs://mybucket/somefolder
  • FILENAME_PREFIX: awalan nama file Avro, misalnya, output-

Untuk menjalankan template menggunakan REST API, kirim permintaan POST HTTP. Untuk mengetahui informasi selengkapnya tentang API dan cakupan otorisasinya, lihat projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_Bigtable_to_GCS_Avro
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "outputDirectory": "OUTPUT_DIRECTORY",
       "filenamePrefix": "FILENAME_PREFIX",
   },
   "environment": { "zone": "us-central1-f" }
}

Ganti kode berikut:

  • PROJECT_ID: ID project Google Cloud tempat Anda ingin menjalankan tugas Dataflow
  • JOB_NAME: nama tugas unik pilihan Anda
  • VERSION: versi template yang ingin Anda gunakan

    Anda dapat menggunakan nilai berikut:

  • LOCATION: region tempat Anda ingin men-deploy tugas Dataflow—misalnya, us-central1
  • BIGTABLE_PROJECT_ID: ID Google Cloud project instance Bigtable tempat Anda ingin membaca data
  • INSTANCE_ID: ID instance Bigtable yang berisi tabel
  • TABLE_ID: ID tabel Bigtable yang akan diekspor
  • OUTPUT_DIRECTORY: jalur Cloud Storage tempat data ditulis, misalnya, gs://mybucket/somefolder
  • FILENAME_PREFIX: awalan nama file Avro, misalnya, output-
Java
/*
 * Copyright (C) 2018 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import com.google.bigtable.v2.Cell;
import com.google.bigtable.v2.Column;
import com.google.bigtable.v2.Family;
import com.google.bigtable.v2.Row;
import com.google.cloud.teleport.bigtable.BigtableToAvro.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.util.DualInputNestedValueProvider;
import com.google.cloud.teleport.util.DualInputNestedValueProvider.TranslatorInput;
import com.google.protobuf.ByteOutput;
import com.google.protobuf.ByteString;
import com.google.protobuf.UnsafeByteOperations;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.extensions.avro.io.AvroIO;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.transforms.SimpleFunction;

/**
 * Dataflow pipeline that exports data from a Cloud Bigtable table to Avro files in GCS. Currently,
 * filtering on Cloud Bigtable table is not supported.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_Bigtable_to_GCS_Avro.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_Bigtable_to_GCS_Avro",
    category = TemplateCategory.BATCH,
    displayName = "Cloud Bigtable to Avro Files in Cloud Storage",
    description =
        "The Bigtable to Cloud Storage Avro template is a pipeline that reads data from a Bigtable table and writes it to a Cloud Storage bucket in Avro format. "
            + "You can use the template to move data from Bigtable to Cloud Storage.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigtable-to-avro",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist.",
      "The output Cloud Storage bucket must exist before running the pipeline."
    })
public class BigtableToAvro {

  /** Options for the export pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Source",
        description = "Project ID",
        helpText =
            "The ID of the Google Cloud project that contains the Bigtable instance that you want to read data from.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Source",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Bigtable instance that contains the table.")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Source",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to export.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsWriteFolder(
        order = 4,
        groupName = "Target",
        description = "Output file directory in Cloud Storage",
        helpText = "The Cloud Storage path where data is written.",
        example = "gs://mybucket/somefolder")
    ValueProvider<String> getOutputDirectory();

    @SuppressWarnings("unused")
    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 5,
        groupName = "Target",
        description = "Avro file prefix",
        helpText = "The prefix of the Avro filename. For example, `output-`.")
    @Default.String("part")
    ValueProvider<String> getFilenamePrefix();

    @SuppressWarnings("unused")
    void setFilenamePrefix(ValueProvider<String> filenamePrefix);

    @TemplateParameter.Text(
        order = 6,
        groupName = "Source",
        optional = true,
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Application profile ID",
        helpText =
            "The ID of the Bigtable application profile to use for the export. If you don't specify an app profile, Bigtable uses the instance's default app profile: https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile.")
    @Default.String("default")
    ValueProvider<String> getBigtableAppProfileId();

    @SuppressWarnings("unused")
    void setBigtableAppProfileId(ValueProvider<String> appProfileId);
  }

  /**
   * Runs a pipeline to export data from a Cloud Bigtable table to Avro files in GCS.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);

    // Wait for pipeline to finish only if it is not constructing a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() == null) {
      result.waitUntilFinish();
    }
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Read read =
        BigtableIO.read()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withAppProfileId(options.getBigtableAppProfileId())
            .withTableId(options.getBigtableTableId());

    // Do not validate input fields if it is running as a template.
    if (options.as(DataflowPipelineOptions.class).getTemplateLocation() != null) {
      read = read.withoutValidation();
    }

    ValueProvider<String> filePathPrefix =
        DualInputNestedValueProvider.of(
            options.getOutputDirectory(),
            options.getFilenamePrefix(),
            new SerializableFunction<TranslatorInput<String, String>, String>() {
              @Override
              public String apply(TranslatorInput<String, String> input) {
                return FileSystems.matchNewResource(input.getX(), true)
                    .resolve(input.getY(), StandardResolveOptions.RESOLVE_FILE)
                    .toString();
              }
            });

    pipeline
        .apply("Read from Bigtable", read)
        .apply("Transform to Avro", MapElements.via(new BigtableToAvroFn()))
        .apply(
            "Write to Avro in GCS",
            AvroIO.write(BigtableRow.class).to(filePathPrefix).withSuffix(".avro"));

    return pipeline.run();
  }

  /** Translates Bigtable {@link Row} to Avro {@link BigtableRow}. */
  static class BigtableToAvroFn extends SimpleFunction<Row, BigtableRow> {
    @Override
    public BigtableRow apply(Row row) {
      ByteBuffer key = ByteBuffer.wrap(toByteArray(row.getKey()));
      List<BigtableCell> cells = new ArrayList<>();
      for (Family family : row.getFamiliesList()) {
        String familyName = family.getName();
        for (Column column : family.getColumnsList()) {
          ByteBuffer qualifier = ByteBuffer.wrap(toByteArray(column.getQualifier()));
          for (Cell cell : column.getCellsList()) {
            long timestamp = cell.getTimestampMicros();
            ByteBuffer value = ByteBuffer.wrap(toByteArray(cell.getValue()));
            cells.add(new BigtableCell(familyName, qualifier, timestamp, value));
          }
        }
      }
      return new BigtableRow(key, cells);
    }
  }

  /**
   * Extracts the byte array from the given {@link ByteString} without copy.
   *
   * @param byteString A {@link ByteString} from which to extract the array.
   * @return an array of byte.
   */
  protected static byte[] toByteArray(final ByteString byteString) {
    try {
      ZeroCopyByteOutput byteOutput = new ZeroCopyByteOutput();
      UnsafeByteOperations.unsafeWriteTo(byteString, byteOutput);
      return byteOutput.bytes;
    } catch (IOException e) {
      return byteString.toByteArray();
    }
  }

  private static final class ZeroCopyByteOutput extends ByteOutput {
    private byte[] bytes;

    @Override
    public void writeLazy(byte[] value, int offset, int length) {
      if (offset != 0 || length != value.length) {
        throw new UnsupportedOperationException();
      }
      bytes = value;
    }

    @Override
    public void write(byte value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(byte[] value, int offset, int length) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void write(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }

    @Override
    public void writeLazy(ByteBuffer value) {
      throw new UnsupportedOperationException();
    }
  }
}

Langkah berikutnya