BigQuery コネクタを使用した MapReduce ジョブの書き込み

Hadoop BigQuery コネクタは、デフォルトで /usr/lib/hadoop/lib/ にあるすべての Dataproc 1.0~1.2 クラスタノードにインストールされます。Spark と PySpark の両方の環境で利用できます。

Dataproc のイメージ バージョン 1.5 以降: Dataproc のイメージ バージョン 1.5 以降では、BigQuery コネクタはデフォルトでインストールされません。これらのバージョンで使用するには、次の手順に従います。

  1. 初期化アクションを使用して BigQuery コネクタをインストールします。

  2. ジョブを送信するときに、次のようにして jars パラメータに BigQuery コネクタを指定します。

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar

  3. アプリケーションの jar-with-dependencies に BigQuery コネクタのクラスを含めます。

競合を回避する場合: Dataproc クラスタでデプロイされたコネクタのバージョンと異なるコネクタのバージョンをアプリケーションで使用する場合には、次のいずれかを行う必要があります。

  1. アプリケーションで使用されるコネクタ バージョンをインストールするための初期化アクションを使用して新しいクラスタを作成する。

  2. 使用しているコネクタのバージョンと Dataproc クラスタにデプロイされているコネクタのバージョンが競合しないように、アプリケーションの jar に使用しているバージョンのコネクタクラスとコネクタの依存関係をインクルードして再配置する(Maven における依存関係再配置の例をご覧ください)。

GsonBigQueryInputFormat クラス

GsonBigQueryInputFormat は、次の主な操作により、Hadoop に BigQuery オブジェクトを JsonObject 形式で提供します。

  • ユーザー指定のクエリを使用して、BigQuery オブジェクトを選択する
  • クエリ結果を Hadoop ノード間で均等に分割する
  • Mapper に渡す Java オブジェクトへの分割をパースする。Hadoop Mapper クラスは、選択した各 Hadoop オブジェクトの JsonObject 表現を受け取ります。

BigQueryInputFormat クラスにより、Hadoop の InputFormat クラスの拡張機能を使用して BigQuery レコードにアクセスできるようになります。BigQueryInputFormat クラスを使用するには、次のことを行う必要があります。

  1. Hadoop 構成でパラメータを設定するには、メインの Hadoop ジョブに数行追加します。

  2. InputFormat クラスを GsonBigQueryInputFormat に設定します。

以降のセクションで、これらの要件を満たす方法を説明します。

入力パラメータ

QualifiedInputTableId
読み取り元の BigQuery テーブル。形式は次のとおりです。optional-projectId:datasetId.tableId
例: publicdata:samples.shakespeare
projectId
BigQuery の projectId。入力オペレーションのすべてがこの ID で発生します。
例: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

注:

  • joborg.apache.hadoop.mapreduce.Job を指しており、実行する Hadoop ジョブにあたります。
  • conf は Hadoop ジョブの org.apache.hadoop.Configuration を指しています。

マッパー

GsonBigQueryInputFormat クラスは BigQuery から読み取り、Hadoop Mapper 関数への入力として BigQuery オブジェクトを 1 つずつ渡します。入力は、次の要素を含むペアの形式です。

  • LongWritable: レコード番号
  • JsonObject: JSON 形式の BigQuery レコード

Mapper は、LongWritableJsonObject pair を入力として受け取ります。

サンプルの WordCount ジョブの Mapper からのスニペットを次に示します。

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

IndirectBigQueryOutputFormat クラス

IndirectBigQueryOutputFormat により、Hadoop で BigQuery テーブルに JsonObject 値を直接書き込めるようになります。このクラスを使用すると、Hadoop の OutputFormat クラスの拡張機能により BigQuery レコードにアクセスできるようになります。これを正しく使用するには、Hadoop 構成でいくつかのパラメータを設定し、OutputFormat クラスを IndirectBigQueryOutputFormat に設定する必要があります。IndirectBigQueryOutputFormat を正しく使用するために必要な、パラメータの設定値とプログラムの例を下に示します。

出力パラメータ

projectId
BigQuery projectId。すべての出力オペレーションがこの ID で発生します。
例: 「my-first-cloud-project」
QualifiedOutputTableId
最終的なジョブ結果を書き込む BigQuery データセット。形式は optional-projectId:datasetId.tableId です。データセット ID はプロジェクトにすでに存在している必要があります。outputDatasetId_hadoop_temporary データセットは、一時的な結果を得るために BigQuery で作成されます。これが、既存のデータセットと競合していないことを確認してください。
:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema
出力 BigQuery テーブルのスキーマを定義するスキーマ
GcsOutputPath
一時的な Cloud Storage データ(gs://bucket/dir/)を格納する出力パス
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

レデューサ

IndirectBigQueryOutputFormat クラスは、BigQuery に書き込みを行います。これは入力としてキーと JsonObject 値を受け取りますが、BigQuery には JsonObject 値だけが書き込まれます(キーは無視されます)。JsonObject は、JSON 形式の BigQuery レコードに含まれている必要があります。また、レデューサは任意の種類のキー(サンプルの WordCount ジョブでは NullWritable を使用)と JsonObject 値のペアを出力する必要があります。サンプルの WordCount ジョブのレデューサを次に示します。

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

クリーンアップ

ジョブの完了後に、Cloud Storage のエクスポート パスをクリーンアップします。

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

単語数は、Google Cloud コンソールの BigQuery 出力テーブルで確認できます。

サンプル WordCount ジョブの全コード

下のコードは、BigQuery のオブジェクトからワードカウントを集計する、単純な WordCount ジョブの一例です。

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

Java のバージョン

BigQuery コネクタには Java 8 が必要です。

Apache Maven 依存関係についての情報

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

詳細な情報は、BigQuery コネクタのリリースノートおよび Javadoc リファレンスをご覧ください。