Menulis tugas MapReduce dengan konektor BigQuery

Konektor BigQuery Hadoop diinstal secara default di semua node cluster Dataproc 1.0-1.2 di /usr/lib/hadoop/lib/. Ini tersedia di lingkungan Spark dan PySpark.

Versi image Dataproc 1.5+: Konektor BigQuery tidak diinstal secara default di versi image Dataproc 1.5 dan yang lebih tinggi. Untuk menggunakannya dengan versi ini:

  1. Instal konektor BigQuery menggunakan tindakan inisialisasi ini.

  2. Tentukan konektor BigQuery dalam parameter jars saat mengirim tugas:

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar

  3. Sertakan class konektor BigQuery dalam jar dengan dependensi aplikasi.

Untuk Menghindari Konflik: Jika aplikasi Anda menggunakan versi konektor yang berbeda dengan versi konektor yang di-deploy di cluster Dataproc, Anda harus:

  1. Buat cluster baru dengan tindakan inisialisasi yang menginstal versi konektor yang digunakan oleh aplikasi Anda, atau

  2. Sertakan dan pindahkan class konektor dan dependensi konektor untuk versi yang Anda gunakan ke dalam jar aplikasi untuk menghindari konflik antara versi konektor dan versi konektor yang di-deploy di cluster Dataproc (lihat contoh pemindahan dependensi di Maven ini).

Class GsonBigQueryInputFormat

GsonBigQueryInputFormat menyediakan objek BigQuery dalam format JsonObject ke Hadoop melalui operasi utama berikut:

  • Menggunakan kueri yang ditentukan pengguna untuk memilih objek BigQuery
  • Membagi hasil kueri secara merata di antara node Hadoop
  • Mengurai bagian menjadi objek Java untuk diteruskan ke Mapper. Class Hadoop Mapper menerima representasi JsonObject dari setiap objek BigQuery yang dipilih.

Class BigQueryInputFormat memberikan akses ke kumpulan data BigQuery melalui ekstensi class InputFormat Hadoop. Untuk menggunakan class BigQueryInputFormat:

  1. Baris harus ditambahkan ke tugas Hadoop utama untuk menetapkan parameter dalam konfigurasi Hadoop.

  2. Class InputFormat harus ditetapkan ke GsonBigQueryInputFormat.

Bagian di bawah ini menunjukkan cara memenuhi persyaratan ini.

Parameter Input

QualifiedInputTableId
Tabel BigQuery yang akan dibaca, dalam bentuk: optional-projectId:datasetId.tableId
Contoh: publicdata:samples.shakespeare
projectId
projectId BigQuery tempat semua operasi input terjadi.
Contoh: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

Catatan:

  • job mengacu pada org.apache.hadoop.mapreduce.Job, tugas Hadoop yang akan berjalan.
  • conf mengacu pada org.apache.hadoop.Configuration untuk tugas Hadoop.

Pemetaan

Class GsonBigQueryInputFormat membaca dari BigQuery dan meneruskan objek BigQuery satu per satu sebagai input ke fungsi Mapper Hadoop. Input berbentuk pasangan yang terdiri dari hal berikut:

  • LongWritable, nomor data
  • JsonObject, data BigQuery berformat Json

Mapper menerima LongWritable dan JsonObject pair sebagai input.

Berikut adalah cuplikan dari Mapper untuk tugas contoh WordCount.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

Class IndirectBigQueryOutputFormat

IndirectBigQueryOutputFormat memberi Hadoop kemampuan untuk menulis nilai JsonObject langsung ke tabel BigQuery. Class ini memberikan akses ke kumpulan data BigQuery melalui ekstensi class OutputFormat Hadoop. Untuk menggunakannya dengan benar, beberapa parameter harus ditetapkan dalam konfigurasi Hadoop, dan class OutputFormat harus ditetapkan ke IndirectBigQueryOutputFormat. Berikut adalah contoh parameter yang akan ditetapkan dan baris kode yang diperlukan untuk menggunakan IndirectBigQueryOutputFormat dengan benar.

Parameter Output

projectId
projectId BigQuery tempat semua operasi output terjadi.
Contoh: "my-first-cloud-project"
QualifiedOutputTableId
Set data BigQuery tempat menulis hasil tugas akhir, dalam bentuk optional-projectId:datasetId.tableId. datasetId seharusnya sudah ada di project Anda. Set data outputDatasetId_hadoop_temporary akan dibuat di BigQuery untuk hasil sementara. Pastikan hal ini tidak bertentangan dengan set data yang ada.
Contoh:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema
Skema yang menentukan skema untuk tabel BigQuery output
GcsOutputPath
Jalur output untuk menyimpan data Cloud Storage sementara (gs://bucket/dir/)
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

Pengurang

Class IndirectBigQueryOutputFormat menulis ke BigQuery. Fungsi ini menggunakan kunci dan nilai JsonObject sebagai input dan hanya menulis nilai JsonObject ke BigQuery (kunci diabaikan). JsonObject harus berisi kumpulan data BigQuery berformat JSON. Reducer harus menghasilkan kunci dari jenis apa pun (NullWritable digunakan dalam tugas contoh WordCount) dan pasangan nilai JsonObject. Reducer untuk contoh tugas WordCount ditampilkan di bawah ini.

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

Pembersihan

Setelah tugas selesai, bersihkan jalur ekspor Cloud Storage.

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Anda dapat melihat jumlah kata dalam tabel output BigQuery di konsol Google Cloud.

Kode Lengkap untuk contoh tugas WordCount

Kode di bawah adalah contoh tugas WordCount sederhana yang menggabungkan jumlah kata dari objek di BigQuery.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

Versi Java

Konektor BigQuery memerlukan Java 8.

Informasi Dependensi Apache Maven

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

Untuk mengetahui informasi selengkapnya, lihat catatan rilis dan referensi Javadoc konektor BigQuery.