Scrittura di un job MapReduce con il connettore BigQuery

Il connettore Hadoop BigQuery è installato per impostazione predefinita su tutti i nodi del cluster Dataproc 1.0-1.2 in /usr/lib/hadoop/lib/. È disponibile sia negli ambienti Spark che PySpark.

Versioni immagine Dataproc 1.5 e successive: il connettore BigQuery non è installato per impostazione predefinita nelle versioni immagine Dataproc 1.5 e successive. Per utilizzarlo con queste versioni:

  1. Installa il connettore BigQuery utilizzando questa azione di inizializzazione.

  2. Specifica il connettore BigQuery nel parametro jars quando invii un job:

    --jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar

  3. Includi le classi del connettore BigQuery nel file jar-with-dependencies dell'applicazione.

Per evitare conflitti: se la tua applicazione utilizza una versione del connettore diversa da quella implementata nel cluster Dataproc, devi:

  1. Crea un nuovo cluster con un'azione di inizializzazione che installa la versione del connettore utilizzata dalla tua applicazione oppure

  2. Includi e rialloca le classi e le dipendenze del connettore per la versione che utilizzi nel file jar dell'applicazione per evitare conflitti tra la versione del connettore e la versione del connettore di cui è stato eseguito il deployment nel tuo cluster Dataproc (consulta questo esempio di riallocazione delle dipendenze in Maven).

Classe GsonBigQueryInputFormat

GsonBigQueryInputFormat fornisce a Hadoop gli oggetti BigQuery in formato JsonObject tramite le seguenti operazioni principali:

  • Utilizzo di una query specificata dall'utente per selezionare oggetti BigQuery
  • Suddivisione dei risultati della query in modo uniforme tra i nodi Hadoop
  • Analizza le suddivisioni in oggetti Java da passare al mapper. La classe Hadoop Mapper riceve una rappresentazione JsonObject di ogni oggetto BigQuery selezionato.

La classe BigQueryInputFormat fornisce l'accesso ai record BigQuery tramite un'estensione della classe Hadoop InputFormat. Per utilizzare la classe BigQueryInputFormat:

  1. È necessario aggiungere righe al job Hadoop principale per impostare i parametri nella configurazione di Hadoop.

  2. La classe InputFormat debe essere impostata su GsonBigQueryInputFormat.

Le sezioni seguenti spiegano come soddisfare questi requisiti.

Parametri di input

QualifiedInputTableId
La tabella BigQuery da cui leggere, nel formato: optional-projectId:datasetId.tableId
Esempio: publicdata:samples.shakespeare
ID progetto
L'ID progetto BigQuery in cui vengono eseguite tutte le operazioni di input.
Esempio: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);

Note:

  • job si riferisce a org.apache.hadoop.mapreduce.Job, il job Hadoop da eseguire.
  • conf si riferisce a org.apache.hadoop.Configuration per il job Hadoop.

Mapper

La classe GsonBigQueryInputFormat legge da BigQuery e passa gli oggetti BigQuery uno alla volta come input alla funzione Hadoop Mapper. Gli input hanno la forma di una coppia contenente quanto segue:

  • LongWritable, il numero di record
  • JsonObject, il record BigQuery in formato JSON

Mapper accetta LongWritable e JsonObject pair come input.

Ecco uno snippet del file Mapper per un job WordCount di esempio.

  // private static final LongWritable ONE = new LongWritable(1);
  // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  /**
   * The mapper function for WordCount.
   */
  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for
      // in the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY,
          WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

Classe IndirectBigQueryOutputFormat

IndirectBigQueryOutputFormat fornisce a Hadoop la possibilità di scrivere valori JsonObject direttamente in una tabella BigQuery. Questa classe fornisce l'accesso ai record BigQuery tramite un'estensione della classe OutputFormat di Hadoop. Per utilizzarlo correttamente, è necessario impostare diversi parametri nella configurazione di Hadoop e la classe OutputFormat deve essere impostata su IndirectBigQueryOutputFormat. Di seguito è riportato un esempio dei parametri da impostare e delle righe di codice necessarie per utilizzare correttamente IndirectBigQueryOutputFormat.

Parametri di output

ID progetto
L'ID progetto BigQuery in cui vengono eseguite tutte le operazioni di output.
Esempio: "my-first-cloud-project"
QualifiedOutputTableId
Il set di dati BigQuery in cui scrivere i risultati finali del job, nel formato optional-projectId:datasetId.tableId. L'attributo datasetId dovrebbe essere già presente nel progetto. Verrà creato il set di dati outputDatasetId_hadoop_temporary in BigQuery per i risultati temporanei. Assicurati che non sia in conflitto con un set di dati esistente.
Esempi:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
outputTableFieldSchema
Uno schema che definisce lo schema della tabella BigQuery di output
GcsOutputPath
Il percorso di output per archiviare i dati temporanei di Cloud Storage (gs://bucket/dir/)
    // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
);

Riduttore

La classe IndirectBigQueryOutputFormat scrive in BigQuery. Prende come input una chiave e un valore JsonObject e scrive solo il valore JsonObject in BigQuery (la chiave viene ignorata). JsonObject deve contenere un record BigQuery in formato JSON. Il riduttore deve restituire una chiave di qualsiasi tipo (NullWritable viene utilizzata nel nostro job WordCount di esempio) e una coppia di valori JsonObject. Di seguito è mostrato il riduttore per il job di WordCount di esempio.

  /**
   * Reducer function for WordCount.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

Esegui la pulizia

Al termine del job, ripulisci i percorsi di esportazione di Cloud Storage.

job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

Puoi visualizzare i conteggi delle parole nella tabella di output di BigQuery nella console Google Cloud.

Codice completo per un job WordCount di esempio

Il codice riportato di seguito è un esempio di un semplice job WordCount che aggrega i conteggi delle parole dagli oggetti in BigQuery.

package com.google.cloud.hadoop.io.bigquery.samples;

import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

/**
 * Sample program to run the Hadoop Wordcount example over tables in BigQuery.
 */
public class WordCount {

 // The configuration key used to specify the BigQuery field name
  // ("column name").
  public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
      "mapred.bq.samples.wordcount.word.key";

  // Default value for the configuration entry specified by
  // WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
  // publicdata:samples.shakespeare or 'repository_name'
  // in publicdata:samples.github_timeline.
  public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

  // Guava might not be available, so define a null / empty helper:
  private static boolean isStringNullOrEmpty(String toTest) {
    return toTest == null || "".equals(toTest);
  }

  /**
   * The mapper function for WordCount. For input, it consumes a LongWritable
   * and JsonObject as the key and value. These correspond to a row identifier
   * and Json representation of the row's values/columns.
   * For output, it produces Text and a LongWritable as the key and value.
   * These correspond to the word and a count for the number of times it has
   * occurred.
   */

  public static class Map
      extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
    private static final LongWritable ONE = new LongWritable(1);
    private Text word = new Text();
    private String wordKey;

    @Override
    public void setup(Context context)
        throws IOException, InterruptedException {
      // Find the runtime-configured key for the field name we're looking for in
      // the map task.
      Configuration conf = context.getConfiguration();
      wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
    }

    @Override
    public void map(LongWritable key, JsonObject value, Context context)
        throws IOException, InterruptedException {
      JsonElement countElement = value.get(wordKey);
      if (countElement != null) {
        String wordInRecord = countElement.getAsString();
        word.set(wordInRecord);
        // Write out the key, value pair (write out a value of 1, which will be
        // added to the total count for this word in the Reducer).
        context.write(word, ONE);
      }
    }
  }

  /**
   * Reducer function for WordCount. For input, it consumes the Text and
   * LongWritable that the mapper produced. For output, it produces a JsonObject
   * and NullWritable. The JsonObject represents the data that will be
   * loaded into BigQuery.
   */
  public static class Reduce
      extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

    @Override
    public void reduce(Text key, Iterable<LongWritable> values, Context context)
        throws IOException, InterruptedException {
      // Add up the values to get a total number of occurrences of our word.
      long count = 0;
      for (LongWritable val : values) {
        count = count + val.get();
      }

      JsonObject jsonObject = new JsonObject();
      jsonObject.addProperty("Word", key.toString());
      jsonObject.addProperty("Count", count);
      // Key does not matter.
      context.write(jsonObject, NullWritable.get());
    }
  }

  /**
   * Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
   * [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
   * [QualifiedOutputTableId] [GcsOutputPath]
   *
   * ProjectId - Project under which to issue the BigQuery
   * operations. Also serves as the default project for table IDs that don't
   * specify a project for the table.
   *
   * QualifiedInputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * InputTableFieldName - Name of the field to count in the
   * input table, e.g., 'word' in publicdata:samples.shakespeare or
   * 'repository_name' in publicdata:samples.github_timeline.
   *
   * QualifiedOutputTableId - Input table ID of the form
   * (Optional ProjectId):[DatasetId].[TableId]
   *
   * GcsOutputPath - The output path to store temporary
   * Cloud Storage data, e.g., gs://bucket/dir/
   *
   * @param args a String[] containing ProjectId, QualifiedInputTableId,
   *     InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
   * @throws IOException on IO Error.
   * @throws InterruptedException on Interrupt.
   * @throws ClassNotFoundException if not all classes are present.
   */
  public static void main(String[] args)
      throws IOException, InterruptedException, ClassNotFoundException {

    // GenericOptionsParser is a utility to parse command line arguments
    // generic to the Hadoop framework. This example doesn't cover the specifics,
    // but recognizes several standard command line arguments, enabling
    // applications to easily specify a NameNode, a ResourceManager, additional
    // configuration resources, etc.
    GenericOptionsParser parser = new GenericOptionsParser(args);
    args = parser.getRemainingArgs();

    // Make sure we have the right parameters.
    if (args.length != 5) {
      System.out.println(
          "Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
              + "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
              + "    ProjectId - Project under which to issue the BigQuery operations. Also serves "
              + "as the default project for table IDs that don't explicitly specify a project for "
              + "the table.\n"
              + "    QualifiedInputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    InputTableFieldName - Name of the field to count in the input table, e.g., "
              + "'word' in publicdata:samples.shakespeare or 'repository_name' in "
              + "publicdata:samples.github_timeline.\n"
              + "    QualifiedOutputTableId - Input table ID of the form "
              + "(Optional ProjectId):[DatasetId].[TableId]\n"
              + "    GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
              + "gs://bucket/dir/");
      System.exit(1);
    }

    // Get the individual parameters from the command line.
    String projectId = args[0];
    String inputQualifiedTableId = args[1];
    String inputTableFieldId = args[2];
    String outputQualifiedTableId = args[3];
    String outputGcsPath = args[4];

   // Define the schema we will be using for the output BigQuery table.
    List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
    outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
    outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
    TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

    // Create the job and get its configuration.
    Job job = new Job(parser.getConfiguration(), "wordcount");
    Configuration conf = job.getConfiguration();

    // Set the job-level projectId.
    conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

    // Configure input.
    BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

    // Configure output.
    BigQueryOutputConfiguration.configure(
        conf,
        outputQualifiedTableId,
        outputSchema,
        outputGcsPath,
        BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
        TextOutputFormat.class);

    // (Optional) Configure the KMS key used to encrypt the output table.
    BigQueryOutputConfiguration.setKmsKeyName(
        conf,
        "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");

    conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);

    // This helps Hadoop identify the Jar which contains the mapper and reducer
    // by specifying a class in that Jar. This is required if the jar is being
    // passed on the command line to Hadoop.
    job.setJarByClass(WordCount.class);

    // Tell the job what data the mapper will output.
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(LongWritable.class);
    job.setMapperClass(Map.class);
    job.setReducerClass(Reduce.class);
    job.setInputFormatClass(GsonBigQueryInputFormat.class);

    // Instead of using BigQueryOutputFormat, we use the newer
    // IndirectBigQueryOutputFormat, which works by first buffering all the data
    // into a Cloud Storage temporary file, and then on commitJob, copies all data from
    // Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
    // since it only requires one BigQuery "load" job per Hadoop/Spark job, as
    // compared to BigQueryOutputFormat, which performs one BigQuery job for each
    // Hadoop/Spark task.
    job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);

    job.waitForCompletion(true);

    // After the job completes, clean up the Cloud Storage export paths.
    GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());

    // You can view word counts in the BigQuery output table at
    // https://console.cloud.google.com/.
  }
}

Versione Java

Il connettore BigQuery richiede Java 8.

Informazioni sulla dipendenza Apache Maven

<dependency>
    <groupId>com.google.cloud.bigdataoss</groupId>
    <artifactId>bigquery-connector</artifactId>
    <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>

Per informazioni dettagliate, consulta le note di rilascio e la documentazione di riferimento Javadoc del connettore BigQuery.