BigQuery 커넥터로 맵리듀스 작업 작성

Hadoop BigQuery 커넥터는 기본적으로 모든 Dataproc 1.0~1.2 클러스터 노드에서 /usr/lib/hadoop/lib/ 아래에 설치됩니다. 이는 Spark 환경과 PySpark 환경 모두에서 사용할 수 있습니다.

Dataproc 이미지 버전 1.5 이상: BigQuery 커넥터는 Dataproc 이미지 버전 1.5 이상에 기본적으로 설치되지 않습니다. 이러한 버전을 사용하려면 다음 단계를 따르세요.

  1. 초기화 작업을 사용하여 BigQuery 커넥터를 설치합니다.

  2. 작업을 제출할 때 jars 매개변수에 BigQuery 커넥터 지정

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
    

  3. 애플리케이션의 jar-with-dependencies에 BigQuery 커넥터 클래스 포함

충돌을 피하려면: 애플리케이션이 Dataproc 클러스터에 배포된 커넥터 버전과 다른 커넥터 버전을 사용하는 경우 다음 중 하나를 수행해야 합니다.

  1. 애플리케이션에서 사용하는 커넥터 버전을 설치하는 초기화 작업으로 새 클러스터를 만듭니다.

  2. 커넥터 버전과 Dataproc 클러스터에 배포된 커넥터 버전 간의 충돌을 방지하기 위해 애플리케이션의 jar에 사용 중인 버전의 커넥터 클래스 및 커넥터 종속 항목을 포함시키고 재배치합니다(Maven에서 종속 항목 재배치의 예 참조).

GsonBigQueryInputFormat 클래스

GsonBigQueryInputFormat은 다음 기본 작업을 통해 Hadoop에 JsonObject 형식의 BigQuery 객체를 제공합니다.

  • 사용자 지정 쿼리를 사용하여 BigQuery 객체 선택
  • 쿼리 결과를 Hadoop 노드에 균등하게 분할
  • Mapper에게 전달할 자바 객체로 분할 항목 파싱. Hadoop Mapper 클래스는 선택된 각 BigQuery 객체의 JsonObject 표현을 수신합니다.

BigQueryInputFormat 클래스는 Hadoop InputFormat 클래스를 확장하여 BigQuery 레코드에 대한 액세스를 제공합니다. BigQueryInputFormat 클래스를 사용하려면 다음 안내를 따르세요.

  1. Hadoop 구성에서 매개 변수를 설정하려면 기본 Hadoop 작업에 행을 추가해야 합니다.

  2. InputFormat 클래스는 GsonBigQueryInputFormat로 설정해야 합니다.

아래 섹션에서는 이러한 요구사항을 충족하는 방법을 보여줍니다.

입력 매개변수

QualifiedInputTableId
읽어올 BigQuery 테이블 형식: optional-projectId:datasetId.tableId
예: publicdata:samples.shakespeare
projectId
모든 입력 작업이 진행되는 BigQuery projectId입니다.
예: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

참고:

  • job은 실행할 Hadoop 작업인 org.apache.hadoop.mapreduce.Job를 나타냅니다.
  • conf은 Hadoop 작업인 org.apache.hadoop.Configuration를 나타냅니다.

Mapper

GsonBigQueryInputFormat 클래스는 BigQuery에서 읽고 Hadoop Mapper 함수에 BigQuery 객체를 한 번에 하나씩 입력으로 전달합니다. 입력은 다음으로 구성된 쌍의 형식입니다.

  • LongWritable: 레코드 번호
  • JsonObject: Json 형식의 BigQuery 레코드

MapperLongWritableJsonObject pair를 입력으로 허용합니다.

다음은 샘플 WordCount 작업의 Mapper를 보여주는 스니펫입니다.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

IndirectBigQueryOutputFormat 클래스

IndirectBigQueryOutputFormatJsonObject 값을 BigQuery 테이블에 직접 쓰는 기능을 Hadoop에 제공합니다. 이 클래스는 Hadoop OutputFormat 클래스를 확장하여 BigQuery 레코드에 대한 액세스 권한을 제공합니다. 이를 올바르게 사용하려면 Hadoop 구성에서 여러 매개 변수를 설정해야 하며, OutputFormat 클래스를 IndirectBigQueryOutputFormat으로 설정해야 합니다. 다음은 설정할 매개 변수와 IndirectBigQueryOutputFormat를 올바르게 사용하는 데 필요한 코드 행의 예시입니다.

출력 매개변수

projectId
모든 출력 작업이 진행되는 BigQuery projectId입니다.
: 'my-first-cloud-project'
QualifiedOutputTableId
optional-projectId:datasetId.tableId 형식으로 최종 작업 결과를 기록할 BigQuery 데이터 세트입니다. datasetId는 이미 프로젝트에 있어야 합니다. outputDatasetId_hadoop_temporary 데이터 세트는 임시 결과를 제공하기 위해 BigQuery에서 만들어집니다. 이 데이터세트는 기존 데이터세트와 충돌하지 않아야 합니다.
:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema
BigQuery 출력 테이블의 스키마를 정의하는 스키마입니다.
GcsOutputPath
임시 Cloud Storage 데이터(gs://bucket/dir/)를 저장하기 위한 출력 경로
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

감소기

IndirectBigQueryOutputFormat 클래스는 BigQuery에 작성합니다. 이 클래스는 키와 JsonObject 값을 입력으로 사용하고 JsonObject 값만 BigQuery에 작성합니다(키는 무시됨). JsonObject에는 Json 형식의 BigQuery 레코드가 포함되어 있어야 합니다. 감소기는 모든 유형의 키(NullWritable샘플 WordCount 작업에 사용됨) 및 JsonObject 값 쌍을 출력해야 합니다. 다음은 샘플 WordCount 작업의 감소기입니다.

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

삭제

작업이 완료되면 Cloud Storage 내보내기 경로를 삭제합니다.

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Google Cloud 콘솔의 BigQuery 출력 테이블에서 단어 수를 볼 수 있습니다.

샘플 WordCount 작업에 대한 전체 코드

아래 코드는 BigQuery의 객체에서 단어 수를 집계하는 간단한 WordCount 작업의 예입니다.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

자바 버전

BigQuery 커넥터는 자바 8이 필요합니다.

Apache Maven 종속 항목 정보

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

자세한 내용은 BigQuery 커넥터 출시 노트Javadoc 참조를 확인하세요.