The BigQuery connector for Hadoop is installed by default on all Dataproc 1.0-1.2 cluster nodes under /usr/lib/hadoop/lib/.
It is available in both Spark and PySpark environments.
Dataproc image versions 1.5+: The BigQuery connector is not installed by default in Dataproc image versions 1.5 and higher. To use it with these versions:
Install the BigQuery connector using this initialization action.
Specify the BigQuery connector in the jars parameter when submitting a job (a submission sketch follows this list):
--jars=gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar
Include the BigQuery connector classes in the jar together with your application's dependencies.
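For example, a submission using the jars parameter might look like the sketch below. This is a minimal, hedged example rather than a command taken from this guide: the cluster name, region, application jar path, and temporary bucket are illustrative placeholders; the main class, table IDs, and the --jars value come from the sample on this page.
# Hypothetical values: replace the cluster, region, application jar, and paths with your own.
gcloud dataproc jobs submit hadoop \
    --cluster=example-cluster \
    --region=us-central1 \
    --class=com.google.cloud.hadoop.io.bigquery.samples.WordCount \
    --jars=gs://my-bucket/wordcount.jar,gs://hadoop-lib/bigquery/bigquery-connector-hadoop3-latest.jar \
    -- my-first-cloud-project publicdata:samples.shakespeare word \
       my-first-cloud-project:test_output_dataset.wordcount_output gs://my-bucket/tmp/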
To avoid conflicts: If your application uses a connector version different from the connector version deployed on your Dataproc cluster, you must either:
Create a new cluster with an initialization action that installs the connector version used by your application, or
Include and relocate the connector classes and connector dependencies for the version you use into your application jar to avoid conflicts between your connector version and the connector version deployed on the Dataproc cluster (see this example of dependency relocation in Maven; a minimal sketch follows below).
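Below is a minimal sketch of what such a Maven relocation could look like using the maven-shade-plugin. The plugin version and the relocated package prefix are illustrative assumptions; the com.google.cloud.hadoop.io.bigquery package name matches the imports used in the complete example later on this page. Place the plugin under <build><plugins> in your pom.xml.
<!-- Hypothetical sketch: shade and relocate the connector packages bundled in
     your application jar so they cannot clash with the cluster's copy. -->
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <version>3.4.1</version>
  <executions>
    <execution>
      <phase>package</phase>
      <goals>
        <goal>shade</goal>
      </goals>
      <configuration>
        <relocations>
          <relocation>
            <!-- Assumed connector package; adjust to the classes you actually bundle. -->
            <pattern>com.google.cloud.hadoop.io.bigquery</pattern>
            <shadedPattern>repackaged.com.google.cloud.hadoop.io.bigquery</shadedPattern>
          </relocation>
        </relocations>
      </configuration>
    </execution>
  </executions>
</plugin>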
The GsonBigQueryInputFormat class
GsonBigQueryInputFormat provides Hadoop with BigQuery objects in JsonObject format through the following main operations:
- Using a user-specified query to select BigQuery objects
- Splitting the query results evenly among the Hadoop nodes
- Parsing the splits into Java objects to pass to the Mapper.
The Hadoop Mapper class receives a JsonObject representation of each selected BigQuery object.
The BigQueryInputFormat class provides access to BigQuery records through an extension of the Hadoop InputFormat class. To use the BigQueryInputFormat class:
Lines must be added to the main Hadoop job to set parameters in the Hadoop configuration.
The InputFormat class must be set to GsonBigQueryInputFormat.
The sections below show how to meet these requirements.
Input parameters
- QualifiedInputTableId
- The BigQuery table to read from, in the form:
optional-projectId:datasetId.tableId
Example: publicdata:samples.shakespeare
- projectId
- The BigQuery projectId under which all of the input operations occur.
Example: my-first-cloud-project
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input parameters.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Set InputFormat.
job.setInputFormatClass(GsonBigQueryInputFormat.class);
Note: job refers to org.apache.hadoop.mapreduce.Job, the Hadoop job to be run. conf refers to the org.apache.hadoop.conf.Configuration for the Hadoop job.
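For context, a minimal sketch of how that job and conf can be obtained is shown below; it mirrors the complete example later on this page, using the non-deprecated Job.getInstance in place of the new Job(...) constructor used there.
// Create the Hadoop job, then take its Configuration so that parameters
// set on conf are visible to the job.
Job job = Job.getInstance(new Configuration(), "wordcount");
Configuration conf = job.getConfiguration();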
Mapper
The GsonBigQueryInputFormat class reads from BigQuery and passes BigQuery objects one at a time as input to the Hadoop Mapper function. The inputs take the form of a pair consisting of the following:
LongWritable, the record number
JsonObject, the JSON-formatted BigQuery record
The Mapper accepts the LongWritable and JsonObject pair as input. Here is a snippet from the Mapper for the sample WordCount job.
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
    "mapred.bq.samples.wordcount.word.key";

// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";

/**
 * The mapper function for WordCount.
 */
public static class Map
    extends Mapper<LongWritable, JsonObject, Text, LongWritable> {
  private static final LongWritable ONE = new LongWritable(1);
  private Text word = new Text();
  private String wordKey;

  @Override
  public void setup(Context context)
      throws IOException, InterruptedException {
    // Find the runtime-configured key for the field name we're looking for
    // in the map task.
    Configuration conf = context.getConfiguration();
    wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
  }

  @Override
  public void map(LongWritable key, JsonObject value, Context context)
      throws IOException, InterruptedException {
    JsonElement countElement = value.get(wordKey);
    if (countElement != null) {
      String wordInRecord = countElement.getAsString();
      word.set(wordInRecord);
      // Write out the key, value pair (write out a value of 1, which will be
      // added to the total count for this word in the Reducer).
      context.write(word, ONE);
    }
  }
}
The IndirectBigQueryOutputFormat class
IndirectBigQueryOutputFormat provides Hadoop with the ability to write JsonObject values directly into a BigQuery table. This class provides access to BigQuery records through an extension of the Hadoop OutputFormat class. To use it correctly, several parameters must be set in the Hadoop configuration, and the OutputFormat class must be set to IndirectBigQueryOutputFormat. Below is an example of the parameters to set and the lines of code needed to correctly use IndirectBigQueryOutputFormat.
Output parameters
- projectId
- The BigQuery projectId under which all of the output operations occur.
Example: "my-first-cloud-project"
- QualifiedOutputTableId
- The BigQuery table to write the final job results to, in the form optional-projectId:datasetId.tableId.
The datasetId should already be present in your project.
A dataset named outputDatasetId_hadoop_temporary will be created in BigQuery for temporary results. Make sure that this does not conflict with an existing dataset.
Examples:
test_output_dataset.wordcount_output
my-first-cloud-project:test_output_dataset.wordcount_output
- outputTableFieldSchema
- A schema that defines the schema of the output BigQuery table
- GcsOutputPath
- The output path to store temporary Cloud Storage data (gs://bucket/dir/)
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);

// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();

// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);

// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);

// Configure output.
BigQueryOutputConfiguration.configure(
    conf,
    outputQualifiedTableId,
    outputSchema,
    outputGcsPath,
    BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
    TextOutputFormat.class);

// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
    conf,
    "projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
Reducer
The IndirectBigQueryOutputFormat class performs the write to BigQuery. It takes a key and a JsonObject value as input and writes only the JsonObject value to BigQuery (the key is ignored). The JsonObject should contain a JSON-formatted BigQuery record. The Reducer should produce a pair consisting of a key of any type (NullWritable is used in the sample WordCount job) and a JsonObject value. The Reducer for the sample WordCount job is shown below.
/**
 * Reducer function for WordCount.
 */
public static class Reduce
    extends Reducer<Text, LongWritable, JsonObject, NullWritable> {

  @Override
  public void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    // Add up the values to get a total number of occurrences of our word.
    long count = 0;
    for (LongWritable val : values) {
      count = count + val.get();
    }

    JsonObject jsonObject = new JsonObject();
    jsonObject.addProperty("Word", key.toString());
    jsonObject.addProperty("Count", count);
    // Key does not matter.
    context.write(jsonObject, NullWritable.get());
  }
}
Cleanup
After the job completes, clean up the Cloud Storage export paths.
job.waitForCompletion(true);
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
You can view the word counts in the BigQuery output table in the Google Cloud console.
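As a quick command-line check, you could also query the output table with the bq tool. This is a hedged sketch: it assumes the example output table test_output_dataset.wordcount_output and the Word/Count schema defined earlier on this page.
# Hypothetical verification query against the example output table.
bq query --use_legacy_sql=false \
    'SELECT Word, `Count` FROM test_output_dataset.wordcount_output ORDER BY `Count` DESC LIMIT 10'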
Complete code for the sample WordCount job
The code below is an example of a simple WordCount job that aggregates word counts from objects in BigQuery.
package com.google.cloud.hadoop.io.bigquery.samples;
import com.google.api.services.bigquery.model.TableFieldSchema;
import com.google.api.services.bigquery.model.TableSchema;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.hadoop.io.bigquery.BigQueryFileFormat;
import com.google.cloud.hadoop.io.bigquery.GsonBigQueryInputFormat;
import com.google.cloud.hadoop.io.bigquery.output.BigQueryOutputConfiguration;
import com.google.cloud.hadoop.io.bigquery.output.IndirectBigQueryOutputFormat;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
/**
* Sample program to run the Hadoop Wordcount example over tables in BigQuery.
*/
public class WordCount {
// The configuration key used to specify the BigQuery field name
// ("column name").
public static final String WORDCOUNT_WORD_FIELDNAME_KEY =
"mapred.bq.samples.wordcount.word.key";
// Default value for the configuration entry specified by
// WORDCOUNT_WORD_FIELDNAME_KEY. Examples: 'word' in
// publicdata:samples.shakespeare or 'repository_name'
// in publicdata:samples.github_timeline.
public static final String WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT = "word";
// Guava might not be available, so define a null / empty helper:
private static boolean isStringNullOrEmpty(String toTest) {
return toTest == null || "".equals(toTest);
}
/**
* The mapper function for WordCount. For input, it consumes a LongWritable
* and JsonObject as the key and value. These correspond to a row identifier
* and Json representation of the row's values/columns.
* For output, it produces Text and a LongWritable as the key and value.
* These correspond to the word and a count for the number of times it has
* occurred.
*/
public static class Map
extends Mapper <LongWritable, JsonObject, Text, LongWritable> {
private static final LongWritable ONE = new LongWritable(1);
private Text word = new Text();
private String wordKey;
@Override
public void setup(Context context)
throws IOException, InterruptedException {
// Find the runtime-configured key for the field name we're looking for in
// the map task.
Configuration conf = context.getConfiguration();
wordKey = conf.get(WORDCOUNT_WORD_FIELDNAME_KEY, WORDCOUNT_WORD_FIELDNAME_VALUE_DEFAULT);
}
@Override
public void map(LongWritable key, JsonObject value, Context context)
throws IOException, InterruptedException {
JsonElement countElement = value.get(wordKey);
if (countElement != null) {
String wordInRecord = countElement.getAsString();
word.set(wordInRecord);
// Write out the key, value pair (write out a value of 1, which will be
// added to the total count for this word in the Reducer).
context.write(word, ONE);
}
}
}
/**
* Reducer function for WordCount. For input, it consumes the Text and
* LongWritable that the mapper produced. For output, it produces a JsonObject
* and NullWritable. The JsonObject represents the data that will be
* loaded into BigQuery.
*/
public static class Reduce
extends Reducer<Text, LongWritable, JsonObject, NullWritable> {
@Override
public void reduce(Text key, Iterable<LongWritable> values, Context context)
throws IOException, InterruptedException {
// Add up the values to get a total number of occurrences of our word.
long count = 0;
for (LongWritable val : values) {
count = count + val.get();
}
JsonObject jsonObject = new JsonObject();
jsonObject.addProperty("Word", key.toString());
jsonObject.addProperty("Count", count);
// Key does not matter.
context.write(jsonObject, NullWritable.get());
}
}
/**
* Configures and runs the main Hadoop job. Takes a String[] of 5 parameters:
* [ProjectId] [QualifiedInputTableId] [InputTableFieldName]
* [QualifiedOutputTableId] [GcsOutputPath]
*
* ProjectId - Project under which to issue the BigQuery
* operations. Also serves as the default project for table IDs that don't
* specify a project for the table.
*
* QualifiedInputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* InputTableFieldName - Name of the field to count in the
* input table, e.g., 'word' in publicdata:samples.shakespeare or
* 'repository_name' in publicdata:samples.github_timeline.
*
* QualifiedOutputTableId - Input table ID of the form
* (Optional ProjectId):[DatasetId].[TableId]
*
* GcsOutputPath - The output path to store temporary
* Cloud Storage data, e.g., gs://bucket/dir/
*
* @param args a String[] containing ProjectId, QualifiedInputTableId,
* InputTableFieldName, QualifiedOutputTableId, and GcsOutputPath.
* @throws IOException on IO Error.
* @throws InterruptedException on Interrupt.
* @throws ClassNotFoundException if not all classes are present.
*/
public static void main(String[] args)
throws IOException, InterruptedException, ClassNotFoundException {
// GenericOptionsParser is a utility to parse command line arguments
// generic to the Hadoop framework. This example doesn't cover the specifics,
// but recognizes several standard command line arguments, enabling
// applications to easily specify a NameNode, a ResourceManager, additional
// configuration resources, etc.
GenericOptionsParser parser = new GenericOptionsParser(args);
args = parser.getRemainingArgs();
// Make sure we have the right parameters.
if (args.length != 5) {
System.out.println(
"Usage: hadoop jar bigquery_wordcount.jar [ProjectId] [QualifiedInputTableId] "
+ "[InputTableFieldName] [QualifiedOutputTableId] [GcsOutputPath]\n"
+ " ProjectId - Project under which to issue the BigQuery operations. Also serves "
+ "as the default project for table IDs that don't explicitly specify a project for "
+ "the table.\n"
+ " QualifiedInputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " InputTableFieldName - Name of the field to count in the input table, e.g., "
+ "'word' in publicdata:samples.shakespeare or 'repository_name' in "
+ "publicdata:samples.github_timeline.\n"
+ " QualifiedOutputTableId - Input table ID of the form "
+ "(Optional ProjectId):[DatasetId].[TableId]\n"
+ " GcsOutputPath - The output path to store temporary Cloud Storage data, e.g., "
+ "gs://bucket/dir/");
System.exit(1);
}
// Get the individual parameters from the command line.
String projectId = args[0];
String inputQualifiedTableId = args[1];
String inputTableFieldId = args[2];
String outputQualifiedTableId = args[3];
String outputGcsPath = args[4];
// Define the schema we will be using for the output BigQuery table.
List<TableFieldSchema> outputTableFieldSchema = new ArrayList<TableFieldSchema>();
outputTableFieldSchema.add(new TableFieldSchema().setName("Word").setType("STRING"));
outputTableFieldSchema.add(new TableFieldSchema().setName("Count").setType("INTEGER"));
TableSchema outputSchema = new TableSchema().setFields(outputTableFieldSchema);
// Create the job and get its configuration.
Job job = new Job(parser.getConfiguration(), "wordcount");
Configuration conf = job.getConfiguration();
// Set the job-level projectId.
conf.set(BigQueryConfiguration.PROJECT_ID_KEY, projectId);
// Configure input.
BigQueryConfiguration.configureBigQueryInput(conf, inputQualifiedTableId);
// Configure output.
BigQueryOutputConfiguration.configure(
conf,
outputQualifiedTableId,
outputSchema,
outputGcsPath,
BigQueryFileFormat.NEWLINE_DELIMITED_JSON,
TextOutputFormat.class);
// (Optional) Configure the KMS key used to encrypt the output table.
BigQueryOutputConfiguration.setKmsKeyName(
conf,
"projects/myproject/locations/us-west1/keyRings/r1/cryptoKeys/k1");
conf.set(WORDCOUNT_WORD_FIELDNAME_KEY, inputTableFieldId);
// This helps Hadoop identify the Jar which contains the mapper and reducer
// by specifying a class in that Jar. This is required if the jar is being
// passed on the command line to Hadoop.
job.setJarByClass(WordCount.class);
// Tell the job what data the mapper will output.
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(Map.class);
job.setReducerClass(Reduce.class);
job.setInputFormatClass(GsonBigQueryInputFormat.class);
// Instead of using BigQueryOutputFormat, we use the newer
// IndirectBigQueryOutputFormat, which works by first buffering all the data
// into a Cloud Storage temporary file, and then on commitJob, copies all data from
// Cloud Storage into BigQuery in one operation. Its use is recommended for large jobs
// since it only requires one BigQuery "load" job per Hadoop/Spark job, as
// compared to BigQueryOutputFormat, which performs one BigQuery job for each
// Hadoop/Spark task.
job.setOutputFormatClass(IndirectBigQueryOutputFormat.class);
job.waitForCompletion(true);
// After the job completes, clean up the Cloud Storage export paths.
GsonBigQueryInputFormat.cleanupJob(job.getConfiguration(), job.getJobID());
// You can view word counts in the BigQuery output table at
// https://console.cloud.google.com/.
}
}
Java version
The BigQuery connector requires Java 8.
Apache Maven dependency information
<dependency>
  <groupId>com.google.cloud.bigdataoss</groupId>
  <artifactId>bigquery-connector</artifactId>
  <version>insert "hadoopX-X.X.X" connector version number here</version>
</dependency>
For more information, see the BigQuery connector release notes and Javadoc reference.