Cloud Storage Parquet to Bigtable 템플릿

Cloud Storage Parquet to Bigtable 템플릿은 Cloud Storage 버킷의 Parquet 파일에서 데이터를 읽고 Bigtable 테이블에 데이터를 쓰는 파이프라인입니다. 템플릿을 사용하여 Cloud Storage에서 Bigtable로 데이터를 복사할 수 있습니다.

파이프라인 요구사항

  • Bigtable 테이블이 있어야 하며 Parquet 파일에서 내보낸 것과 같은 column family가 있어야 합니다.
  • 파이프라인을 실행하기 전에 Cloud Storage 버킷에 입력 Parquet 파일이 있어야 합니다.
  • Bigtable에는 입력 Parquet 파일의 특정 스키마가 있어야 합니다.

템플릿 매개변수

필수 매개변수

  • bigtableProjectId: Bigtable 인스턴스와 연결된 Google Cloud 프로젝트 ID입니다.
  • bigtableInstanceId: 테이블이 포함된 Cloud Bigtable 인스턴스의 ID입니다.
  • bigtableTableId: 가져올 Bigtable 테이블의 ID입니다.
  • inputFilePattern: 데이터가 포함된 파일이 있는 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-files/*.parquet입니다.

선택적 매개변수

  • splitLargeRows: 큰 행을 여러 MutateRows 요청으로 분할하기 위한 플래그입니다. 큰 행이 여러 API 호출 간에 분할되는 경우 행에 대한 업데이트는 원자적이지 않습니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the Parquet Files on Cloud Storage to Cloud Bigtable template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/GCS_Parquet_to_Cloud_Bigtable \
    --region REGION_NAME \
    --parameters \
bigtableProjectId=BIGTABLE_PROJECT_ID,\
bigtableInstanceId=INSTANCE_ID,\
bigtableTableId=TABLE_ID,\
inputFilePattern=INPUT_FILE_PATTERN

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_PROJECT_ID: 데이터를 읽으려는 Bigtable 인스턴스의 Google Cloud 프로젝트 ID입니다.
  • INSTANCE_ID: 테이블을 포함하는 Bigtable 인스턴스의 ID입니다.
  • TABLE_ID: 내보낼 Bigtable 테이블의 ID입니다.
  • INPUT_FILE_PATTERN: 데이터가 있는 Cloud Storage 경로 패턴입니다(예: gs://mybucket/somefolder/prefix*).

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/GCS_Parquet_to_Cloud_Bigtable
{
   "jobName": "JOB_NAME",
   "parameters": {
       "bigtableProjectId": "BIGTABLE_PROJECT_ID",
       "bigtableInstanceId": "INSTANCE_ID",
       "bigtableTableId": "TABLE_ID",
       "inputFilePattern": "INPUT_FILE_PATTERN",
   },
   "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • BIGTABLE_PROJECT_ID: 데이터를 읽으려는 Bigtable 인스턴스의 Google Cloud 프로젝트 ID입니다.
  • INSTANCE_ID: 테이블을 포함하는 Bigtable 인스턴스의 ID입니다.
  • TABLE_ID: 내보낼 Bigtable 테이블의 ID입니다.
  • INPUT_FILE_PATTERN: 데이터가 있는 Cloud Storage 경로 패턴입니다(예: gs://mybucket/somefolder/prefix*).
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.bigtable;

import static com.google.cloud.teleport.bigtable.AvroToBigtable.toByteString;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.teleport.bigtable.ParquetToBigtable.Options;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.protobuf.ByteString;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.runners.direct.DirectRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * The {@link ParquetToBigtable} pipeline imports data from Parquet files in GCS to a Cloud Bigtable
 * table. The Cloud Bigtable table must be created before running the pipeline and must have a
 * compatible table schema. For example, if {@link BigtableCell} from the Parquet files has a
 * 'family' of "f1", the Bigtable table should have a column family of "f1".
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_GCS_Parquet_to_Cloud_Bigtable.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "GCS_Parquet_to_Cloud_Bigtable",
    category = TemplateCategory.BATCH,
    displayName = "Parquet Files on Cloud Storage to Cloud Bigtable",
    description =
        "The Cloud Storage Parquet to Bigtable template is a pipeline that reads data from Parquet files in a Cloud Storage bucket and writes the data to a Bigtable table. "
            + "You can use the template to copy data from Cloud Storage to Bigtable.",
    optionsClass = Options.class,
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/parquet-to-bigtable",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The Bigtable table must exist and have the same column families as exported in the Parquet files.",
      "The input Parquet files must exist in a Cloud Storage bucket before running the pipeline.",
      "Bigtable expects a specific <a href=\"https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/src/main/resources/schema/avro/bigtable.avsc\">schema</a> from the input Parquet files."
    })
public class ParquetToBigtable {
  private static final Logger LOG = LoggerFactory.getLogger(ParquetToBigtable.class);

  /** Maximum number of mutations allowed per row by Cloud bigtable. */
  private static final int MAX_MUTATIONS_PER_ROW = 100000;

  private static final Boolean DEFAULT_SPLIT_LARGE_ROWS = false;

  /** Options for the import pipeline. */
  public interface Options extends PipelineOptions {
    @TemplateParameter.ProjectId(
        order = 1,
        groupName = "Target",
        description = "Project ID",
        helpText = "The Google Cloud project ID associated with the Bigtable instance.")
    ValueProvider<String> getBigtableProjectId();

    @SuppressWarnings("unused")
    void setBigtableProjectId(ValueProvider<String> projectId);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
        description = "Instance ID",
        helpText = "The ID of the Cloud Bigtable instance that contains the table")
    ValueProvider<String> getBigtableInstanceId();

    @SuppressWarnings("unused")
    void setBigtableInstanceId(ValueProvider<String> instanceId);

    @TemplateParameter.Text(
        order = 3,
        groupName = "Target",
        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
        description = "Table ID",
        helpText = "The ID of the Bigtable table to import.")
    ValueProvider<String> getBigtableTableId();

    @SuppressWarnings("unused")
    void setBigtableTableId(ValueProvider<String> tableId);

    @TemplateParameter.GcsReadFile(
        order = 4,
        groupName = "Source",
        description = "Input Cloud Storage File(s)",
        helpText = "The Cloud Storage path with the files that contain the data.",
        example = "gs://your-bucket/your-files/*.parquet")
    ValueProvider<String> getInputFilePattern();

    @SuppressWarnings("unused")
    void setInputFilePattern(ValueProvider<String> inputFilePattern);

    @TemplateParameter.Boolean(
        order = 5,
        groupName = "Target",
        optional = true,
        description = "If true, large rows will be split into multiple MutateRows requests",
        helpText =
            "The flag for enabling splitting of large rows into multiple MutateRows requests. Note that when a large row is split between multiple API calls, the updates to the row are not atomic.")
    ValueProvider<Boolean> getSplitLargeRows();

    void setSplitLargeRows(ValueProvider<Boolean> splitLargeRows);
  }

  /**
   * Runs a pipeline to import Parquet files in GCS to a Cloud Bigtable table.
   *
   * @param args arguments to the pipeline
   */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);

    PipelineResult result = run(options);
  }

  public static PipelineResult run(Options options) {
    Pipeline pipeline = Pipeline.create(PipelineUtils.tweakPipelineOptions(options));

    BigtableIO.Write write =
        BigtableIO.write()
            .withProjectId(options.getBigtableProjectId())
            .withInstanceId(options.getBigtableInstanceId())
            .withTableId(options.getBigtableTableId());

    /**
     * Steps: 1) Read records from Parquet File. 2) Convert a GenericRecord to a
     * KV<ByteString,Iterable<Mutation>>. 3) Write KV to Bigtable's table.
     */
    pipeline
        .apply(
            "Read from Parquet",
            ParquetIO.read(BigtableRow.getClassSchema()).from(options.getInputFilePattern()))
        .apply(
            "Transform to Bigtable",
            ParDo.of(
                ParquetToBigtableFn.createWithSplitLargeRows(
                    options.getSplitLargeRows(), MAX_MUTATIONS_PER_ROW)))
        .apply("Write to Bigtable", write);

    return pipeline.run();
  }

  static class ParquetToBigtableFn extends DoFn<GenericRecord, KV<ByteString, Iterable<Mutation>>> {

    private final ValueProvider<Boolean> splitLargeRowsFlag;
    private Boolean splitLargeRows;
    private final int maxMutationsPerRow;

    public static ParquetToBigtableFn create() {
      return new ParquetToBigtableFn(StaticValueProvider.of(false), MAX_MUTATIONS_PER_ROW);
    }

    public static ParquetToBigtableFn createWithSplitLargeRows(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      return new ParquetToBigtableFn(splitLargeRowsFlag, maxMutationsPerRequest);
    }

    @Setup
    public void setup() {
      if (splitLargeRowsFlag != null) {
        splitLargeRows = splitLargeRowsFlag.get();
      }
      splitLargeRows = MoreObjects.firstNonNull(splitLargeRows, DEFAULT_SPLIT_LARGE_ROWS);
      LOG.info("splitLargeRows set to: " + splitLargeRows);
    }

    private ParquetToBigtableFn(
        ValueProvider<Boolean> splitLargeRowsFlag, int maxMutationsPerRequest) {
      this.splitLargeRowsFlag = splitLargeRowsFlag;
      this.maxMutationsPerRow = maxMutationsPerRequest;
    }

    @ProcessElement
    public void processElement(ProcessContext ctx) {
      Class runner = ctx.getPipelineOptions().getRunner();
      ByteString key = toByteString((ByteBuffer) ctx.element().get(0));

      // BulkMutation doesn't split rows. Currently, if a single row contains more than 100,000
      // mutations, the service will fail the request.
      ImmutableList.Builder<Mutation> mutations = ImmutableList.builder();
      List<Object> cells = (List) ctx.element().get(1);
      int cellsProcessed = 0;
      for (Object element : cells) {
        Mutation.SetCell setCell = null;
        if (runner.isAssignableFrom(DirectRunner.class)) {
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(((GenericData.Record) element).get(0).toString())
                  .setColumnQualifier(
                      toByteString((ByteBuffer) ((GenericData.Record) element).get(1)))
                  .setTimestampMicros((Long) ((GenericData.Record) element).get(2))
                  .setValue(toByteString((ByteBuffer) ((GenericData.Record) element).get(3)))
                  .build();
        } else {
          BigtableCell bigtableCell = (BigtableCell) element;
          setCell =
              Mutation.SetCell.newBuilder()
                  .setFamilyName(bigtableCell.getFamily().toString())
                  .setColumnQualifier(toByteString(bigtableCell.getQualifier()))
                  .setTimestampMicros(bigtableCell.getTimestamp())
                  .setValue(toByteString(bigtableCell.getValue()))
                  .build();
        }
        mutations.add(Mutation.newBuilder().setSetCell(setCell).build());
        cellsProcessed++;

        if (this.splitLargeRows && cellsProcessed % maxMutationsPerRow == 0) {
          // Send a MutateRow request when we have accumulated max mutations per row.
          ctx.output(KV.of(key, mutations.build()));
          mutations = ImmutableList.builder();
        }
      }

      // Flush any remaining mutations.
      ImmutableList remainingMutations = mutations.build();
      if (!remainingMutations.isEmpty()) {
        ctx.output(KV.of(key, remainingMutations));
      }
    }
  }
}

다음 단계