Cloud Storage Parquet to Bigtable template

The Cloud Storage Parquet to Bigtable template is a pipeline that reads data from Parquet files in a Cloud Storage bucket and writes the data to a Bigtable table. You can use the template to copy data from Cloud Storage to Bigtable.

Pipeline requirements

  • The Bigtable table must exist and have the same column families as those exported in the Parquet files.
  • The input Parquet files must exist in a Cloud Storage bucket before running the pipeline.
  • Bigtable expects a specific schema from the input Parquet files; a condensed sketch follows this list.
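
The layout below is a condensed sketch of the Avro schema (bigtable.avsc) that the template expects the Parquet records to follow. The field names and types are inferred from that schema and from the pipeline code later on this page, so treat the linked schema file as the authoritative definition:

{
  "type": "record",
  "name": "BigtableRow",
  "namespace": "com.google.cloud.teleport.bigtable",
  "fields": [
    {"name": "key", "type": "bytes"},
    {"name": "cells", "type": {
      "type": "array",
      "items": {
        "type": "record",
        "name": "BigtableCell",
        "fields": [
          {"name": "family", "type": "string"},
          {"name": "qualifier", "type": "bytes"},
          {"name": "timestamp", "type": "long"},
          {"name": "value", "type": "bytes"}
        ]
      }
    }}
  ]
}

Each record carries one row key plus the list of cells to write, with each cell's timestamp expressed in microseconds.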

Template parameters

Required parameters

  • bigtableProjectId: The Google Cloud project ID associated with the Bigtable instance.
  • bigtableInstanceId: The ID of the Cloud Bigtable instance that contains the table.
  • bigtableTableId: The ID of the Bigtable table to import.
  • inputFilePattern: The Cloud Storage path with the files that contain the data. For example: gs://your-bucket/your-files/*.parquet.

Optional parameters

  • splitLargeRows: The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic.

Run the template

  1. Go to the Dataflow Create job from template page.
  2. In the Job name field, enter a unique job name.
  3. Optional: For Regional endpoint, select a value from the drop-down menu. The default region is us-central1.

    For a list of regions where you can run a Dataflow job, see Dataflow locations.

  4. From the Dataflow template drop-down menu, select the Parquet Files on Cloud Storage to Cloud Bigtable template.
  5. In the provided parameter fields, enter your parameter values.
  6. Click Run job.

In your shell or terminal, run the template:

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Parquet_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

Replace the following:

  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/latest/)
      • a specific version name, to use the version of the template nested in the respective dated parent folder in the bucket (gs://dataflow-templates-REGION_NAME/)

  • REGION_NAME: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to write data to
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to import
  • INPUT_FILE_PATTERN: the Cloud Storage path pattern where the data is located, for example gs://mybucket/somefolder/prefix*
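
For illustration, a fully substituted command might look like the following; the job name, project, instance, table, and bucket names here are hypothetical placeholders:

gcloud dataflow jobs run parquet-to-bigtable-import \
    --gcs-location gs://dataflow-templates-us-central1/latest/GCS_Parquet_to_Cloud_Bigtable \
    --region us-central1 \
    --parameters \
bigtableProjectId=my-project,\
bigtableInstanceId=my-instance,\
bigtableTableId=my-table,\
inputFilePattern=gs://my-bucket/parquet-export/*.parquet

To enable splitting of oversized rows, append the optional parameter to the --parameters list, for example splitLargeRows=true.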

To run the template using the REST API, send an HTTP POST request. For more information on the API and its authorization scopes, see projects.templates.launch.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

Replace the following:

  • PROJECT_ID: the Google Cloud project ID where you want to run the Dataflow job
  • JOB_NAME: a unique job name of your choice
  • VERSION: the version of the template that you want to use

    You can use the following values:

      • latest to use the latest version of the template, which is available in the non-dated parent folder in the bucket (gs://dataflow-templates-LOCATION/latest/)
      • a specific version name, to use the version of the template nested in the respective dated parent folder in the bucket (gs://dataflow-templates-LOCATION/)

  • LOCATION: the region where you want to deploy your Dataflow job, for example us-central1
  • BIGTABLE_PROJECT_ID: the ID of the Google Cloud project of the Bigtable instance that you want to write data to
  • INSTANCE_ID: the ID of the Bigtable instance that contains the table
  • TABLE_ID: the ID of the Bigtable table to import
  • INPUT_FILE_PATTERN: the Cloud Storage path pattern where the data is located, for example gs://mybucket/somefolder/prefix*
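
As a rough sketch, the same request can be sent from the command line with curl. This assumes the gcloud CLI is installed and authenticated, and the project, job, instance, table, and bucket values shown are hypothetical placeholders:

curl -X POST \
  -H "Authorization: Bearer $(gcloud auth print-access-token)" \
  -H "Content-Type: application/json" \
  -d '{
        "jobName": "parquet-to-bigtable-import",
        "parameters": {
          "bigtableProjectId": "my-project",
          "bigtableInstanceId": "my-instance",
          "bigtableTableId": "my-table",
          "inputFilePattern": "gs://my-bucket/parquet-export/*.parquet"
        },
        "environment": { "zone": "us-central1-f" }
      }' \
  "https://dataflow.googleapis.com/v1b3/projects/my-project/locations/us-central1/templates:launch?gcsPath=gs://dataflow-templates-us-central1/latest/GCS_Parquet_to_Cloud_Bigtable"
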
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.AvroToBigtable.toByteString;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.teleport.bigtable.ParquetToBigtable.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link ParquetToBigtable} pipeline imports data from Parquet files in GCS to a Cloud Bigtable
 * table. The Cloud Bigtable table must be created before running the pipeline and must have a
 * compatible table schema. For example, if {@link BigtableCell} from the Parquet files has a
 * 'family' of "f1", the Bigtable table should have a column family of "f1".
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Parquet_to_Cloud_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Parquet_to_Cloud_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "Parquet Files on Cloud Storage to Cloud Bigtable",
    description =
        "The Cloud Storage Parquet to Bigtable template is a pipeline that reads data from Parquet files in a Cloud Storage bucket and writes the data to a Bigtable table. "
            + "You can use the template to copy data from Cloud Storage to Bigtable.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/parquet-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist and have the same column families as exported in the Parquet files.",
      "The input Parquet files must exist in a Cloud Storage bucket before running the pipeline.",
      "Bigtable expects a specific <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/src/main/resources/schema/avro/bigtable.avsc\">schema</a> from the input Parquet files."
    })
public class ParquetToBigtable {
  private static final Logger LOG = LoggerFactory.getLogger(ParquetToBigtable.class);

  /** Maximum number of mutations allowed per row by Cloud Bigtable. */
  private static final int MAX_MUTATIONS_PER_ROW = 100000;

  private static final Boolean DEFAULT_SPLIT_LARGE_ROWS = false;

  /** Options for the import pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Target",
        description = "Project ID",
        helpText = "The Google Cloud project ID associated with the Bigtable instance.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Target",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to import.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsReadFile(
        order = 4,
        groupName = "Source",
        description = "Input Cloud Storage File(s)",
        helpText = "The Cloud Storage path with the files that contain the data.",
        example = "gs://your-bucket/your-files/*.parquet")
    ValueProvider<String> getInputFilePattern();

    @SuppressWarnings("unused")
    void setInputFilePattern(ValueProvider<String> inputFilePattern);

    @TemplateParameter.Boolean(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "If true, large rows will be split into multiple MutateRows requests",
        helpText =
            "The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic.")
    ValueProvider<Boolean> getSplitLargeRows();

    void setSplitLargeRows(ValueProvider<Boolean> splitLargeRows);
  }

  /**
   * Runs a pipeline to import Parquet files in GCS to a Cloud Bigtable table.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Write write =
        BigtableIO.write()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId());

    /**
     * Steps: 1) Read records from Parquet File. 2) Convert a GenericRecord to a
     * KV<ByteString,Iterable<Mutation>>. 3) Write KV to Bigtable's table.
     */
    pipeline
        .apply(
            "Read from Parquet",
            ParquetIO.read(BigtableRow.getClassSchema()).from(options.getInputFilePattern()))
        .apply(
            "Transform to Bigtable",
            ParDo.of(
                ParquetToBigtableFn.createWithSplitLargeRows(
                    options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
        .apply("Write to Bigtable", write);

    return pipeline.run();
  }

  static class ParquetToBigtableFn extends DoFn<GenericRecord, KV<ByteString, Iterable<Mutation>>> {

    private final ValueProvider<Boolean> splitLargeRowsFlag;
    private Boolean splitLargeRows;
    private final int maxMutationsPerRow;

    public static ParquetToBigtableFn create() {
      return new ParquetToBigtableFn(StaticValueProvider.of(false), MAX_MUTATIONS_PER_ROW);
    }

    public static ParquetToBigtableFn createWithSplitLargeRows(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      return new ParquetToBigtableFn(splitLargeRowsFlag, maxMutationsPerRequest);
    }

    @Setup
    public void setup() {
      if (splitLargeRowsFlag != null) {
        splitLargeRows = splitLargeRowsFlag.get();
      }
      splitLargeRows = MoreObjects.firstNonNull(splitLargeRows, DEFAULT_SPLIT_LARGE_ROWS);
      LOG.info("splitLargeRows set to: " + splitLargeRows);
    }

    private ParquetToBigtableFn(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      this.splitLargeRowsFlag = splitLargeRowsFlag;
      this.maxMutationsPerRow = maxMutationsPerRequest;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      Class runner = ctx.getPipelineOptions().getRunner();
      ByteString key = toByteString((ByteBuffer) ctx.element().get(0));

      // BulkMutation doesn't split rows. Currently, if a single row contains more than 100,000
      // mutations, the service will fail the request.
      ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
      List<Object> cells = (List) ctx.element().get(1);
      int cellsProcessed = 0;
      for (Object element : cells) {
        Mutation.SetCell setCell = null;
        if (runner.isAssignableFrom(DirectRunner.class)) {
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(((GenericData.Record) element).get(0).toString())
                  .setColumnQualifier(
                      toByteString((ByteBuffer) ((GenericData.Record) element).get(1)))
                  .setTimestampMicros((Long) ((GenericData.Record) element).get(2))
                  .setValue(toByteString((ByteBuffer) ((GenericData.Record) element).get(3)))
                  .build();
        } else {
          BigtableCell bigtableCell = (BigtableCell) element;
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(bigtableCell.getFamily().toString())
                  .setColumnQualifier(toByteString(bigtableCell.getQualifier()))
                  .setTimestampMicros(bigtableCell.getTimestamp())
                  .setValue(toByteString(bigtableCell.getValue()))
                  .build();
        }
        mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
        cellsProcessed++;

        if (this.splitLargeRows && cellsProcessed % maxMutationsPerRow == 0) {
          // Send a MutateRow request when we have accumulated max mutations per row.
          ctx.output(KV.of(key, mutations.build()));
          mutations = ImmutableList.builder();
        }
      }

      // Flush any remaining mutations.
      ImmutableList remainingMutations = mutations.build();
      if (!remainingMutations.isEmpty()) {
        ctx.output(KV.of(key, remainingMutations));
      }
    }
  }
}

What's next