BigQuery to Cloud Storage TFRecords 템플릿

BigQuery to Cloud Storage TFRecords 템플릿은 BigQuery 쿼리에서 데이터를 읽어 TFRecord 형식으로 Cloud Storage 버킷에 데이터를 쓰는 파이프라인입니다. 학습, 테스트, 검증 분할 비율을 지정할 수 있습니다. 기본적으로 학습 세트의 경우 1 또는 100%, 테스트 및 검증 세트의 경우 0 또는 0%입니다. 데이터 세트 분할을 설정할 때 학습, 테스트, 검증의 합계가 1 또는 100%가 되어야 합니다(예: 0.6+0.2+0.2). Dataflow는 각 출력 데이터 세트에 대한 최적 분할 수를 자동으로 결정합니다.

파이프라인 요구사항

  • BigQuery 데이터 세트와 테이블이 있어야 합니다.
  • 파이프라인을 실행하기 전에 출력 Cloud Storage 버킷이 있어야 합니다. 학습, 테스트, 검증 하위 디렉터리는 사전 존재할 필요가 없으며 자동 생성됩니다.

템플릿 매개변수

필수 매개변수

  • readQuery: 소스에서 데이터를 추출하는 BigQuery SQL 쿼리입니다. 예를 들면 select * from dataset1.sample_table입니다.
  • outputDirectory: 학습, 테스트, 검증 TFRecord 파일을 쓸 때 사용할 최상위 Cloud Storage 경로 프리픽스입니다. 학습, 테스트, 검증 결과 TFRecord 파일의 하위 디렉터리는 outputDirectory에서 자동으로 생성됩니다. 예를 들면 gs://mybucket/output입니다.

선택적 매개변수

  • readIdColumn: 행의 고유 식별자를 저장하는 BigQuery 열의 이름입니다.
  • invalidOutputPath: 대상 항목으로 변환할 수 없는 BigQuery 행을 쓸 Cloud Storage 경로입니다. 예를 들면 gs://your-bucket/your-path입니다.
  • outputSuffix: 작성된 학습, 테스트, 검증 TFRecord 파일의 파일 서픽스입니다. 기본값은 .tfrecord입니다.
  • trainingPercentage: 학습 TFRecord 파일에 할당된 쿼리 데이터의 비율입니다. 기본값은 1 또는 100%입니다.
  • testingPercentage: 테스트 TFRecord 파일에 할당된 쿼리 데이터의 비율입니다. 기본값은 0 또는 0%입니다.
  • validationPercentage: 검증 TFRecord 파일에 할당된 쿼리 데이터의 비율입니다. 기본값은 0 또는 0%입니다.

템플릿 실행

콘솔gcloudAPI
  1. Dataflow 템플릿에서 작업 만들기 페이지로 이동합니다.
  2. 템플릿에서 작업 만들기로 이동
  3. 작업 이름 필드에 고유한 작업 이름을 입력합니다.
  4. (선택사항): 리전 엔드포인트의 드롭다운 메뉴에서 값을 선택합니다. 기본 리전은 us-central1입니다.

    Dataflow 작업을 실행할 수 있는 리전 목록은 Dataflow 위치를 참조하세요.

  5. Dataflow 템플릿 드롭다운 메뉴에서 the BigQuery to TFRecords template을 선택합니다.
  6. 제공된 매개변수 필드에 매개변수 값을 입력합니다.
  7. 작업 실행을 클릭합니다.

셸 또는 터미널에서 템플릿을 실행합니다.

gcloud dataflow jobs run JOB_NAME \
    --gcs-location gs://dataflow-templates-REGION_NAME/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records \
    --region REGION_NAME \
    --parameters \
readQuery=READ_QUERY,\
outputDirectory=OUTPUT_DIRECTORY,\
trainingPercentage=TRAINING_PERCENTAGE,\
testingPercentage=TESTING_PERCENTAGE,\
validationPercentage=VALIDATION_PERCENTAGE,\
outputSuffix=OUTPUT_FILENAME_SUFFIX

다음을 바꿉니다.

  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • REGION_NAME: Dataflow 작업을 배포할 리전(예: us-central1)
  • READ_QUERY: 실행할 BigQuery 쿼리
  • OUTPUT_DIRECTORY: 출력 데이터 세트의 Cloud Storage 경로 프리픽스
  • TRAINING_PERCENTAGE: 학습 데이터 세트의 십진수 비율 분할
  • TESTING_PERCENTAGE: 테스트 데이터 세트의 십진수 비율 분할
  • VALIDATION_PERCENTAGE: 검증 데이터 세트의 십진수 비율 분할
  • OUTPUT_FILENAME_SUFFIX: 선호하는 출력 TensorFlow 레코드 파일 서픽스

REST API를 사용하여 템플릿을 실행하려면 HTTP POST 요청을 전송합니다. API 및 승인 범위에 대한 자세한 내용은 projects.templates.launch를 참조하세요.

POST https://dataflow.googleapis.com/v1b3/projects/PROJECT_ID/locations/LOCATION/templates:launch?gcsPath=gs://dataflow-templates-LOCATION/VERSION/Cloud_BigQuery_to_GCS_TensorFlow_Records
{
   "jobName": "JOB_NAME",
   "parameters": {
       "readQuery":"READ_QUERY",
       "outputDirectory":"OUTPUT_DIRECTORY",
       "trainingPercentage":"TRAINING_PERCENTAGE",
       "testingPercentage":"TESTING_PERCENTAGE",
       "validationPercentage":"VALIDATION_PERCENTAGE",
       "outputSuffix":"OUTPUT_FILENAME_SUFFIX"
   },
   "environment": { "zone": "us-central1-f" }
}

다음을 바꿉니다.

  • PROJECT_ID: Dataflow 작업을 실행하려는 Google Cloud 프로젝트 ID
  • JOB_NAME: 선택한 고유한 작업 이름
  • VERSION: 사용할 템플릿 버전

    다음 값을 사용할 수 있습니다.

  • LOCATION: Dataflow 작업을 배포할 리전(예: us-central1)
  • READ_QUERY: 실행할 BigQuery 쿼리
  • OUTPUT_DIRECTORY: 출력 데이터 세트의 Cloud Storage 경로 프리픽스
  • TRAINING_PERCENTAGE: 학습 데이터 세트의 십진수 비율 분할
  • TESTING_PERCENTAGE: 테스트 데이터 세트의 십진수 비율 분할
  • VALIDATION_PERCENTAGE: 검증 데이터 세트의 십진수 비율 분할
  • OUTPUT_FILENAME_SUFFIX: 선호하는 출력 TensorFlow 레코드 파일 서픽스
Java
/*
 * Copyright (C) 2019 Google LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not
 * use this file except in compliance with the License. You may obtain a copy of
 * the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
 * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
 * License for the specific language governing permissions and limitations under
 * the License.
 */
package com.google.cloud.teleport.templates;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.cloud.teleport.metadata.Template;
import com.google.cloud.teleport.metadata.TemplateCategory;
import com.google.cloud.teleport.metadata.TemplateParameter;
import com.google.cloud.teleport.templates.BigQueryToTFRecord.Options;
import com.google.cloud.teleport.templates.common.BigQueryConverters.BigQueryReadOptions;
import com.google.protobuf.ByteString;
import java.util.Iterator;
import java.util.Random;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.io.FileIO;
import org.apache.beam.sdk.io.TFRecordIO;
import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
import org.apache.beam.sdk.io.gcp.bigquery.SchemaAndRecord;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Partition;
import org.apache.beam.sdk.transforms.Reshuffle;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.tensorflow.example.Example;
import org.tensorflow.example.Feature;
import org.tensorflow.example.Features;

/**
 * Dataflow template which reads BigQuery data and writes it to GCS as a set of TFRecords. The
 * source is a SQL query.
 *
 * <p>Check out <a
 * href="https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v1/README_Cloud_BigQuery_to_GCS_TensorFlow_Records.md">README</a>
 * for instructions on how to use or modify this template.
 */
@Template(
    name = "Cloud_BigQuery_to_GCS_TensorFlow_Records",
    category = TemplateCategory.BATCH,
    displayName = "BigQuery to TensorFlow Records",
    description =
        "The BigQuery to Cloud Storage TFRecords template is a pipeline that reads data from a BigQuery query and writes it to a Cloud Storage bucket in TFRecord format. "
            + "You can specify the training, testing, and validation percentage splits. "
            + "By default, the split is 1 or 100% for the training set and 0 or 0% for testing and validation sets. "
            + "When setting the dataset split, the sum of training, testing, and validation needs to add up to 1 or 100% (for example, 0.6+0.2+0.2). "
            + "Dataflow automatically determines the optimal number of shards for each output dataset.",
    optionsClass = Options.class,
    optionsOrder = {BigQueryReadOptions.class, Options.class},
    documentation =
        "https://cloud.google.com/dataflow/docs/guides/templates/provided/bigquery-to-tfrecords",
    contactInformation = "https://cloud.google.com/support",
    requirements = {
      "The BigQuery dataset and table must exist.",
      "The output Cloud Storage bucket must exist before pipeline execution. Training, testing, and validation subdirectories don't need to preexist and are autogenerated."
    })
public class BigQueryToTFRecord {

  /**
   * The {@link BigQueryToTFRecord#buildFeatureFromIterator(Class, Object, Feature.Builder)} method
   * handles {@link GenericData.Array} that are passed into the {@link
   * BigQueryToTFRecord#buildFeature} method creating a TensorFlow feature from the record.
   */
  private static final String TRAIN = "train/";

  private static final String TEST = "test/";
  private static final String VAL = "val/";

  private static void buildFeatureFromIterator(
      Class<?> fieldType, Object field, Feature.Builder feature) {
    ByteString byteString;
    GenericData.Array f = (GenericData.Array) field;
    if (fieldType == Long.class) {
      Iterator<Long> longIterator = f.iterator();
      while (longIterator.hasNext()) {
        Long longValue = longIterator.next();
        feature.getInt64ListBuilder().addValue(longValue);
      }
    } else if (fieldType == double.class) {
      Iterator<Double> doubleIterator = f.iterator();
      while (doubleIterator.hasNext()) {
        double doubleValue = doubleIterator.next();
        feature.getFloatListBuilder().addValue((float) doubleValue);
      }
    } else if (fieldType == String.class) {
      Iterator<Utf8> stringIterator = f.iterator();
      while (stringIterator.hasNext()) {
        String stringValue = stringIterator.next().toString();
        byteString = ByteString.copyFromUtf8(stringValue);
        feature.getBytesListBuilder().addValue(byteString);
      }
    } else if (fieldType == boolean.class) {
      Iterator<Boolean> booleanIterator = f.iterator();
      while (booleanIterator.hasNext()) {
        Boolean boolValue = booleanIterator.next();
        int boolAsInt = boolValue ? 1 : 0;
        feature.getInt64ListBuilder().addValue(boolAsInt);
      }
    }
  }

  /**
   * The {@link BigQueryToTFRecord#buildFeature} method takes in an individual field and type
   * corresponding to a column value from a SchemaAndRecord Object returned from a BigQueryIO.read()
   * step. The method builds a TensorFlow Feature based on the type of the object- ie: STRING, TIME,
   * INTEGER etc..
   */
  private static Feature buildFeature(Object field, String type) {
    Feature.Builder feature = Feature.newBuilder();
    ByteString byteString;

    switch (type) {
      case "STRING":
      case "TIME":
      case "DATE":
        if (field instanceof GenericData.Array) {
          buildFeatureFromIterator(String.class, field, feature);
        } else {
          byteString = ByteString.copyFromUtf8(field.toString());
          feature.getBytesListBuilder().addValue(byteString);
        }
        break;
      case "BYTES":
        byteString = ByteString.copyFrom((byte[]) field);
        feature.getBytesListBuilder().addValue(byteString);
        break;
      case "INTEGER":
      case "INT64":
      case "TIMESTAMP":
        if (field instanceof GenericData.Array) {
          buildFeatureFromIterator(Long.class, field, feature);
        } else {
          feature.getInt64ListBuilder().addValue((long) field);
        }
        break;
      case "FLOAT":
      case "FLOAT64":
        if (field instanceof GenericData.Array) {
          buildFeatureFromIterator(double.class, field, feature);
        } else {
          feature.getFloatListBuilder().addValue((float) (double) field);
        }
        break;
      case "BOOLEAN":
      case "BOOL":
        if (field instanceof GenericData.Array) {
          buildFeatureFromIterator(boolean.class, field, feature);
        } else {
          int boolAsInt = (boolean) field ? 1 : 0;
          feature.getInt64ListBuilder().addValue(boolAsInt);
        }
        break;
      default:
        throw new RuntimeException("Unsupported type: " + type);
    }
    return feature.build();
  }

  /**
   * The {@link BigQueryToTFRecord#record2Example(SchemaAndRecord)} method uses takes in a
   * SchemaAndRecord Object returned from a BigQueryIO.read() step and builds a TensorFlow Example
   * from the record.
   */
  @VisibleForTesting
  protected static byte[] record2Example(SchemaAndRecord schemaAndRecord) {
    Example.Builder example = Example.newBuilder();
    Features.Builder features = example.getFeaturesBuilder();
    GenericRecord record = schemaAndRecord.getRecord();
    for (TableFieldSchema field : schemaAndRecord.getTableSchema().getFields()) {
      Object fieldValue = record.get(field.getName());
      if (fieldValue != null) {
        Feature feature = buildFeature(fieldValue, field.getType());
        features.putFeature(field.getName(), feature);
      }
    }
    return example.build().toByteArray();
  }

  /**
   * The {@link BigQueryToTFRecord#concatURI} method uses takes in a Cloud Storage URI and a
   * subdirectory name and safely concatenates them. The resulting String is used as a sink for
   * TFRecords.
   */
  private static String concatURI(String dir, String folder) {
    if (dir.endsWith("/")) {
      return dir + folder;
    } else {
      return dir + "/" + folder;
    }
  }

  /**
   * The {@link BigQueryToTFRecord#applyTrainTestValSplit} method transforms the PCollection by
   * randomly partitioning it into PCollections for each dataset.
   */
  static PCollectionList<byte[]> applyTrainTestValSplit(
      PCollection<byte[]> input,
      ValueProvider<Float> trainingPercentage,
      ValueProvider<Float> testingPercentage,
      ValueProvider<Float> validationPercentage,
      Random rand) {
    return input.apply(
        Partition.of(
            3,
            (Partition.PartitionFn<byte[]>)
                (number, numPartitions) -> {
                  Float train = trainingPercentage.get();
                  Float test = testingPercentage.get();
                  Float validation = validationPercentage.get();
                  Double d = rand.nextDouble();
                  if (train + test + validation != 1) {
                    throw new RuntimeException(
                        String.format(
                            "Train %.2f, Test %.2f, Validation"
                                + " %.2f percentages must add up to 100 percent",
                            train, test, validation));
                  }
                  if (d < train) {
                    return 0;
                  } else if (d >= train && d < train + test) {
                    return 1;
                  } else {
                    return 2;
                  }
                }));
  }

  /** Run the pipeline. */
  public static void main(String[] args) {
    Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
    run(options);
  }

  /**
   * Runs the pipeline to completion with the specified options. This method does not wait until the
   * pipeline is finished before returning. Invoke {@code result.waitUntilFinish()} on the result
   * object to block until the pipeline is finished running if blocking programmatic execution is
   * required.
   *
   * @param options The execution options.
   * @return The pipeline result.
   */
  public static PipelineResult run(Options options) {
    Random rand = new Random(100); // set random seed
    Pipeline pipeline = Pipeline.create(options);

    PCollection<byte[]> bigQueryToExamples =
        pipeline
            .apply(
                "RecordToExample",
                BigQueryIO.read(BigQueryToTFRecord::record2Example)
                    .fromQuery(options.getReadQuery())
                    .withCoder(ByteArrayCoder.of())
                    .withTemplateCompatibility()
                    .withoutValidation()
                    .usingStandardSql()
                    .withMethod(BigQueryIO.TypedRead.Method.DIRECT_READ)
                // Enable BigQuery Storage API
                )
            .apply("ReshuffleResults", Reshuffle.viaRandomKey());

    PCollectionList<byte[]> partitionedExamples =
        applyTrainTestValSplit(
            bigQueryToExamples,
            options.getTrainingPercentage(),
            options.getTestingPercentage(),
            options.getValidationPercentage(),
            rand);

    partitionedExamples
        .get(0)
        .apply(
            "WriteTFTrainingRecord",
            FileIO.<byte[]>write()
                .via(TFRecordIO.sink())
                .to(
                    ValueProvider.NestedValueProvider.of(
                        options.getOutputDirectory(), dir -> concatURI(dir, TRAIN)))
                .withNumShards(0)
                .withSuffix(options.getOutputSuffix()));

    partitionedExamples
        .get(1)
        .apply(
            "WriteTFTestingRecord",
            FileIO.<byte[]>write()
                .via(TFRecordIO.sink())
                .to(
                    ValueProvider.NestedValueProvider.of(
                        options.getOutputDirectory(), dir -> concatURI(dir, TEST)))
                .withNumShards(0)
                .withSuffix(options.getOutputSuffix()));

    partitionedExamples
        .get(2)
        .apply(
            "WriteTFValidationRecord",
            FileIO.<byte[]>write()
                .via(TFRecordIO.sink())
                .to(
                    ValueProvider.NestedValueProvider.of(
                        options.getOutputDirectory(), dir -> concatURI(dir, VAL)))
                .withNumShards(0)
                .withSuffix(options.getOutputSuffix()));

    return pipeline.run();
  }

  /** Define command line arguments. */
  public interface Options extends BigQueryReadOptions {

    @TemplateParameter.GcsWriteFolder(
        order = 1,
        groupName = "Target",
        description = "Output Cloud Storage directory.",
        helpText =
            "The top-level Cloud Storage path prefix to use when writing the training, testing, and validation TFRecord files. Subdirectories for resulting training, testing, and validation TFRecord files are automatically generated from `outputDirectory`.",
        example = "gs://mybucket/output")
    ValueProvider<String> getOutputDirectory();

    void setOutputDirectory(ValueProvider<String> outputDirectory);

    @TemplateParameter.Text(
        order = 2,
        groupName = "Target",
        optional = true,
        regexes = {"^[A-Za-z_0-9.]*"},
        description = "The output suffix for TFRecord files",
        helpText =
            "The file suffix for the training, testing, and validation TFRecord files that are written. The default value is `.tfrecord`.")
    @Default.String(".tfrecord")
    ValueProvider<String> getOutputSuffix();

    void setOutputSuffix(ValueProvider<String> outputSuffix);

    @TemplateParameter.Float(
        order = 3,
        optional = true,
        description = "Percentage of data to be in the training set ",
        helpText =
            "The percentage of query data allocated to training TFRecord files. The default value is `1`, or `100%`.")
    @Default.Float(1)
    ValueProvider<Float> getTrainingPercentage();

    void setTrainingPercentage(ValueProvider<Float> trainingPercentage);

    @TemplateParameter.Float(
        order = 4,
        optional = true,
        description = "Percentage of data to be in the testing set ",
        helpText =
            "The percentage of query data allocated to testing TFRecord files. The default value is `0`, or `0%`.")
    @Default.Float(0)
    ValueProvider<Float> getTestingPercentage();

    void setTestingPercentage(ValueProvider<Float> testingPercentage);

    @TemplateParameter.Float(
        order = 5,
        optional = true,
        description = "Percentage of data to be in the validation set ",
        helpText =
            "The percentage of query data allocated to validation TFRecord files. The default value is `0`, or `0%`.")
    @Default.Float(0)
    ValueProvider<Float> getValidationPercentage();

    void setValidationPercentage(ValueProvider<Float> validationPercentage);
  }
}

다음 단계