Google Cloud Platform

Apache Hadoop, Hive, and Pig on Google Compute Engine

Bring your Hadoop-based infrastructure to Google Compute Engine

Introduction

In 2004, Google published a paper on MapReduce, a programming model that lets us rapidly process massive amounts of data on commodity hardware. In 2005, the Apache Hadoop project began. Today, it is the best-known open source implementation of the MapReduce framework.

The growth in popularity of the MapReduce model has led to a revolution in Big Data[1] processing. Additional tools and languages have been developed to make it easier for data analysts, statisticians, and others to perform Big Data analysis.

In adopting Hadoop for data processing, many organizations start with the traditional model of keeping compute hardware on-site. They build out their own compute clusters, with sizable capital and operations costs. Many find the continual need to grow and maintain their compute clusters too expensive and a drain on resources.

Cloud computing now provides a compelling alternative. Using Google Compute Engine, you can move many of your existing services, including your Hadoop infrastructure, to the Google Cloud Platform.

Scope

There is an explosion in tools and products across the industry and on the Google Cloud Platform for storing and processing Big Data. Managed cloud-based software products and specialized components offer ease of use along with reduced usage and maintenance costs. But there will always be organizations interested in deploying self-managed components on compute nodes in the cloud.

This paper describes how you can take advantage of Google Compute Engine, with support from Google Cloud Storage, and run a self-managed Hadoop cluster with Apache Hive and Apache Pig as part of a Big Data processing solution.

While this paper is primarily intended for software engineers and architects with expertise in Hadoop, it is also valuable for anyone in the organization involved with making a Big Data project a success. For background on the Hadoop technologies, please see Appendix A: What Are MapReduce, Hive, and Pig?

Scenario

Imagine that your company receives periodic data uploads from your new product in the field, initially aggregating to a few hundred megabytes. The data might be from hardware sensors built into the product, providing critical information to engineering and manufacturing to ensure its long-term reliability.

The data analyst logs in daily to the Hadoop compute cluster, examines the quality and applicability of data processing results, updates Hive or Pig scripts, and starts new analysis jobs.

Your data analyst and information technology teams report that during development and the managed roll-out of the product, the original investment in your private compute cluster proved insufficient. When the sales team pushed for the beta program to expand from 20 to 40 potential customers, the team had to scramble to squeeze every last byte and CPU cycle from the “production” cluster, including minimizing, rescheduling, or removing development jobs.

Your team is now trying to forecast computing needs for the public launch. One flexible solution is to use Google Compute Engine clusters. You can add compute resources to the production cluster as customers and products are added and sensor data grows into terabytes. Development teams can bring up Compute Engine clusters during the day, using less expensive instance types than production, and bring them down at night to save R&D budget. The release team can, at last, create a staging cluster to test new software deployments before they are pushed to production; these staging clusters can be torn down shortly after roll-out, once production stability is ensured.

Solution overview

Our solution entails launching a cluster of Google Compute Engine instances, installing Hadoop, Hive, and Pig software, and configuring each instance appropriately. Google Cloud Storage supports the software installation and the input and output of target data.

If you are not already familiar with how Hadoop, Pig, and Hive fit together, please see Appendix B: Hadoop Software Components.

Core Hadoop components are typically installed across all nodes of a cluster and managed centrally by an administrator. Hive and Pig, as client tools, may be installed once per user, with different people running different versions of each. This solution uses two Google Compute Engine operating system user IDs to represent a typical installation:

  • hadoop: User ID that owns the Hadoop software, processes, and Hadoop File System (HDFS) storage. An administrator may log in as this user
  • hdpuser: User ID that owns the Hive and Pig query tools. A data analyst logs in as this user

A traditional Hadoop installation includes the following:[2]

  • HDFS NameNode — Master that manages the file system namespace and regulates access to files by clients
  • HDFS DataNode(s) — Manage storage attached to the nodes they run on
  • MapReduce JobTracker — Master that manages scheduling, monitoring, and restarting jobs' component tasks on compute nodes
  • MapReduce TaskTracker(s) — Execute tasks as directed by the JobTracker

The Hadoop Cluster on Compute Engine sample application referenced in this paper creates one master node and a user-specified number of worker nodes. The master node runs the HDFS NameNode and MapReduce JobTracker. Each worker node runs an instance of the HDFS DataNode and MapReduce TaskTracker.

The Apache Hive and Pig sample application referenced in this paper installs Hive and/or Pig on the master node and configures appropriate access for the hdpuser to directories within HDFS.

Google Cloud Storage plays two roles in the solution:

  • Stages the software for installation on Google Compute Engine instances. When an instance starts, it downloads the appropriate software package from Google Cloud Storage
  • Provides the durable storage for data. Data is brought onto the Google Compute Engine cluster and pushed into HDFS for processing. Results data is then copied from HDFS into Google Cloud Storage

Solution diagram

Here are the components of our solution:

Figure 1: Architectural components

Figure 1 shows the high-level architectural components discussed in the previous section. The following figures illustrate the flow of key operations through these components.

Figure 2: Input data flow from Google Cloud Storage to HDFS

Because MapReduce tasks compute over files stored in HDFS, the first step is to insert data into HDFS. Figure 2 shows a simple example of a single file being transferred from Google Cloud Storage to HDFS; a brief command sketch of both steps follows the numbered list.

  1. On the master node, a user (or automated task) downloads a data file from Google Cloud Storage to be pushed into HDFS. If data has been stored encrypted in Google Cloud Storage, it can be decrypted at this step. Data stored at rest on all Google Compute Engine disks is transparently encrypted
  2. The HDFS NameNode is queried as to how to distribute the file to the HDFS DataNodes. For high availability and reliability, the file is segmented into blocks, which are then replicated across multiple DataNodes
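
As a rough illustration of these two steps, the following sketch shells out to the gsutil and hadoop command-line tools, both assumed to be installed on the master node; the bucket and path names are placeholders.

import subprocess

# Placeholder names; substitute your own bucket and HDFS paths.
GCS_URI = "gs://example-bucket/sensor-data/upload-001.csv"
LOCAL_FILE = "/tmp/upload-001.csv"
HDFS_DIR = "/user/hdpuser/input"   # assumed to already exist in HDFS

# Step 1: download the object from Google Cloud Storage to the master node
# (application-level decryption, if any, would also happen here).
subprocess.check_call(["gsutil", "cp", GCS_URI, LOCAL_FILE])

# Step 2: push the file into HDFS; the NameNode decides which DataNodes
# receive the replicated blocks.
subprocess.check_call(["hadoop", "fs", "-put", LOCAL_FILE, HDFS_DIR])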

Transferring large datasets can add significant time to data processing. To reduce this overhead, the download and insertion of large datasets can be done in parallel across Google Compute Engine instances when the input data is split across multiple files in Google Cloud Storage. We give an example of this procedure after introducing the MapReduce workflow.

Figure 3: MapReduce process flow

Once stored in HDFS, data is ready for processing with MapReduce. Figure 3 shows a MapReduce process flow.

  1. A MapReduce job is created to be sent to the MapReduce JobTracker on the master node; a submission sketch follows this list. The job could be one of the following:
    • A hand-coded MapReduce job submitted from outside the cluster, from an on-premise developer workstation, for example
    • The result of Hive or Pig code being interpreted and turned into a MapReduce job
  2. The MapReduce code and HDFS path to input data are passed to the MapReduce JobTracker
  3. The MapReduce JobTracker divides the job into tasks, requesting that TaskTracker instances on different worker nodes create Mapper Tasks
  4. Mapper Tasks are executed with blocks of data from HDFS files, generating lists of key: value output
  5. Mapper Tasks output is grouped by key and passed to Reducer Tasks, with each Reducer Task being responsible for all Mapper output for a given key
  6. Reducer output is written to HDFS
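
As one hedged illustration of step 1, a job could be submitted with Hadoop Streaming, which lets the Mapper and Reducer be ordinary scripts; the jar location and script names below are placeholders for whatever your installation provides.

import subprocess

# Placeholder path; the streaming jar location varies by Hadoop installation.
STREAMING_JAR = "/home/hadoop/hadoop/contrib/streaming/hadoop-streaming.jar"

subprocess.check_call([
    "hadoop", "jar", STREAMING_JAR,
    "-input", "/user/hdpuser/input",    # HDFS path to the input data (step 2)
    "-output", "/user/hdpuser/output",  # HDFS path for Reducer output (step 6)
    "-mapper", "mapper.py",             # hypothetical Mapper script (step 4)
    "-reducer", "reducer.py",           # hypothetical Reducer script (step 5)
    "-file", "mapper.py",               # ship both scripts with the job
    "-file", "reducer.py",
])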

A common post-processing task is to upload result data from HDFS to Google Cloud Storage, following the reverse of the process illustrated in Figure 2.
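
A minimal sketch of that upload, again shelling out to the hadoop and gsutil tools with placeholder paths:

import subprocess

HDFS_OUTPUT = "/user/hdpuser/output"
LOCAL_DIR = "/tmp/output"
GCS_URI = "gs://example-bucket/results/"

# Copy the Reducer output out of HDFS to the master node's disk, then
# upload it to Google Cloud Storage for durable storage.
subprocess.check_call(["hadoop", "fs", "-get", HDFS_OUTPUT, LOCAL_DIR])
subprocess.check_call(["gsutil", "-m", "cp", "-R", LOCAL_DIR, GCS_URI])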

Figure 4: Parallel data transfer, Google Cloud Storage to HDFS

Figure 4 shows how you can use the MapReduce infrastructure to distribute data transfer from Google Cloud Storage to HDFS across Google Compute Engine instances.

  1. A list of files to be transferred is generated and inserted into HDFS
  2. A MapReduce job is started with only Mappers, no Reducers. Each Mapper task will receive a portion of the file list to act on
  3. For each file in its portion of the list, the Mapper script does the following (a sketch of such a script follows this list):
    1. Download the file from Google Cloud Storage
    2. Uncompress it
    3. Insert the file into HDFS
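
Here is a hedged sketch of such a Mapper script, written for Hadoop Streaming and submitted with zero Reducer tasks. It reads Google Cloud Storage URIs from standard input (its portion of the file list), then downloads, uncompresses, and inserts each file into HDFS; the tool names, paths, and .gz suffix are assumptions about the instance setup.

#!/usr/bin/env python
# Map-only task: each Mapper receives a slice of the file list on stdin.
import os
import subprocess
import sys

HDFS_DIR = "/user/hdpuser/input"   # placeholder HDFS target directory

for line in sys.stdin:
    gcs_uri = line.strip()
    if not gcs_uri:
        continue

    # 1. Download the compressed file from Google Cloud Storage.
    local_gz = os.path.join("/tmp", os.path.basename(gcs_uri))
    subprocess.check_call(["gsutil", "cp", gcs_uri, local_gz])

    # 2. Uncompress it on this worker node (assumes a .gz suffix).
    subprocess.check_call(["gunzip", "-f", local_gz])
    local_file = local_gz[:-len(".gz")]

    # 3. Insert the file into HDFS; the NameNode is consulted only for block
    #    placement, and the blocks flow directly to the DataNodes.
    subprocess.check_call(["hadoop", "fs", "-put", local_file, HDFS_DIR])
    os.remove(local_file)

    # Emit one record per file as this Mapper's output.
    print("%s\tdone" % gcs_uri)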

Distributing the transfer from Google Cloud Storage across instances spreads bandwidth usage across the network, and distributing decompression spreads CPU usage across instances.

When files are inserted into HDFS, the NameNode need only be consulted for information on where to replicate data blocks. With the approach shown here, individual files are never sent to the NameNode and block replication is performed among the DataNodes.

Implementation considerations

Google Compute Engine is built on virtual machines based on popular Linux distributions, and on networking infrastructure that uses industry-standard protocols. Popular tools such as Hive and Pig run on Google Compute Engine without change and with little additional consideration.

Google Compute Engine also lets you build the right network of instances with the right machine types. When constructing a Hadoop cluster, you must consider elements such as CPU, memory, disks, and network configuration.

This section highlights key implementation options on Google Compute Engine for running Hadoop, Hive, and Pig.

Instance machine types

Google Compute Engine offers a range of hardware resources, such as the number of CPU cores and gigabytes of memory, for each instance. For development and other non-production clusters, you can select low-powered instances (2 CPU, 7.5 GB). For staging and production clusters, you can select high-powered instances (8 CPU, 52 GB). You can measure organizational usage and periodically refresh clusters to increase or decrease capabilities, aligning costs with needs.

For more information on Google Compute Engine machine types, please see https://cloud.google.com/pricing/compute-engine.

Cluster network and firewalls

Google Compute Engine lets you create named networks, into which instances can be added and secured. You can create firewall rules for each network that open different ports to hosts within the network than to hosts outside it.

This flexibility lets you create separate networks within a project for different Hadoop clusters. For example, adding instances to “development,” “staging,” and “production” networks, with no additional configuration, ensures that services do not send development and staging traffic to production servers.

For more information on Google Compute Engine networks and firewalls, please see https://developers.google.com/compute/docs/instances_and_network.

Advanced routing and Virtual Private Networks

When creating a Google Compute Engine cluster, the easiest way to make nodes accessible from your private network is to give each instance an external IP address. Each external IP address is publicly accessible to all hosts on the Internet. A more secure Hadoop cluster consists of a network of instances with only private IP addresses, plus a gateway instance with a public address.

Google Compute Engine Advanced Routing lets you create such a Virtual Private Network. For more information, please see https://developers.google.com/compute/docs/networking#settingupvpn.

Persistent Disks and HDFS replication

Persistent Disks provide high-performing and reliable block storage to Compute Engine instances. Persistent Disk data is preserved when an instance terminates.

Persistent disks have built-in replication, which reduces but doesn’t eliminate the value of HDFS replication. HDFS replication gives the MapReduce scheduler flexibility in distributing work and keeps data available in the event of a single node failure. However, by reducing the HDFS replication factor from the default of 3 to 2, you can store 50 percent more data with the same investment in disk storage.
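
For example, here is a hedged sketch of lowering the replication factor for existing files from the master node; the default for newly written files is governed by the dfs.replication property in hdfs-site.xml.

import subprocess

# Set the replication factor to 2 for everything under /user/hdpuser.
# -R applies the change recursively; -w waits for re-replication to finish.
subprocess.check_call(
    ["hadoop", "fs", "-setrep", "-R", "-w", "2", "/user/hdpuser"])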

For more information on Google Compute Engine disks please see https://developers.google.com/compute/docs/disks.

User system privileges

If the end user associated with the hdpuser system account should have project-level administrative privileges, you can add that person as a team member in the Google Compute Engine Console. The user can then connect to the master instance via the gcutil ssh command.

People who need only SSH access should not be added as team members; because gcutil is a tool for project owners and editors, they cannot use it to connect to the Hadoop master instance. Instead, enable SSH access by adding each user’s public key to the .ssh/authorized_keys file on the Hadoop master instance. The sample application assumes this approach and provides details.
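
A minimal sketch of that step, run on the master instance with write access to the hdpuser home directory; the key string is of course a placeholder.

import os

PUBLIC_KEY = "ssh-rsa AAAA...example analyst@workstation\n"   # placeholder key
SSH_DIR = os.path.expanduser("~hdpuser/.ssh")
AUTH_KEYS = os.path.join(SSH_DIR, "authorized_keys")

# Create ~hdpuser/.ssh if needed and append the analyst's public key.
if not os.path.isdir(SSH_DIR):
    os.makedirs(SSH_DIR, 0o700)
with open(AUTH_KEYS, "a") as f:
    f.write(PUBLIC_KEY)

# sshd requires restrictive permissions on the key file.
os.chmod(AUTH_KEYS, 0o600)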

For more information on connecting to Google Compute Engine, please see https://developers.google.com/compute/docs/instances#sshing.

Hive metastore

The Hive metastore for tables, columns, and other schema information is maintained in a database outside of HDFS. The default Hive metastore is a single-user, single-connection database. To support multiple Hive users or multiple Hive connections by the same user, you can replace the default database. The most common alternative is MySQL, which can be installed on a Google Compute Engine instance, typically either on the Hadoop master or a standalone instance.

An alternative is to use Google Cloud SQL, which is a highly available, fully managed MySQL service. Cloud SQL provides per-use pricing, automatic replication, and out-of-the-box secure accessibility from clients outside of the Google Compute Engine network.

The sample application includes instructions for either MySQL configuration.
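
As a rough illustration only (not the sample application’s exact steps), pointing Hive at a MySQL or Cloud SQL metastore comes down to a handful of JDBC properties in hive-site.xml; the host name, database, and credentials below are placeholders.

# Render a minimal hive-site.xml that points the metastore at MySQL.
PROPERTIES = {
    "javax.jdo.option.ConnectionURL":
        "jdbc:mysql://metastore-host:3306/hivemeta?createDatabaseIfNotExist=true",
    "javax.jdo.option.ConnectionDriverName": "com.mysql.jdbc.Driver",
    "javax.jdo.option.ConnectionUserName": "hdpuser",
    "javax.jdo.option.ConnectionPassword": "example-password",
}

rows = "\n".join(
    "  <property>\n"
    "    <name>%s</name>\n"
    "    <value>%s</value>\n"
    "  </property>" % (name, value)
    for name, value in sorted(PROPERTIES.items()))

# Typically written to $HIVE_HOME/conf/hive-site.xml on the master instance.
with open("hive-site.xml", "w") as f:
    f.write("<configuration>\n%s\n</configuration>\n" % rows)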

Sample applications

Two sample applications accompany this paper: one that launches the Hadoop cluster on Google Compute Engine, and one that installs and configures Apache Hive and Pig on the cluster's master node.

You can download them at https://github.com/googlecloudplatform/.

Conclusion

Google’s Cloud Platform provides many components for building new cloud-based applications. Google Compute Engine, however, lets you quickly migrate existing applications from costly and comparatively fixed private clusters to flexible clusters of compute nodes.

If you have your own Hadoop-based clusters and tools such as Hive and Pig, and face ever-increasing demands on your compute infrastructure, you can seamlessly move your applications to Google Compute Engine.

Additional resources

Appendix A: What Are MapReduce, Hive, and Pig?

To understand the value and the components of our solution, you have to understand the value and design of MapReduce. MapReduce is a simple programming paradigm that is surprisingly powerful and has a wide variety of applications. We can show you the power of MapReduce with a real-world example.

Imagine a store with eight cash registers. At the end of the day, the manager needs a tally of all the $1, $5, $10, and $20 bills. Here are two possible solutions for accomplishing this task.

Solution 1: One employee

In the first solution, a single employee walks from one cash register to the next, and for each bill type updates a simple ledger. This solution takes one employee and a sheet of paper with four lines on it. It is very resource efficient, but can take a long time.

Solution 2: Four employees

In the second solution, four employees are sent to the odd-numbered registers. For each bill type, each employee writes down the value and the count. After finishing at the odd-numbered register, each employee moves to the next even-numbered register and repeats the tally.

The four employees now exchange pieces of paper, so that one employee has all of the $1 tallies, another has all of the $5 tallies, and so on. Each employee counts the total number of one denomination.

This solution is comparatively resource costly, requiring four employees and 40 lines of paper, but it is also much faster: by the time the four employees are done, the employee in Solution 1 has tallied only two registers. As the number of cash registers (that is, the size of the data) increases, the gap in execution time between the two solutions grows quickly.

MapReduce

Solution 2 illustrates the MapReduce paradigm. In the first step, the Mappers, working independently, examine separate chunks of data and produce a simple “key: value” mapping, with the key being the bill denomination and the value being the count of that bill. Since the Mappers work independently and on separate portions of the total dataset, they work in parallel.

In the second step, each Reducer (independently of all other Reducers) performs an operation on all Mapper output for a single key. Each employee in this case is tasked with simply summing the count of a specific denomination across all registers. Again, the independence of the tasks lets the employees work in parallel.

We make some simplifying assumptions in this example, notably that the time required to count each register is always the same. In practice, one of the four employees could finish counting two registers before another had finished counting one, and could thus move on and count a third register. In this case, the overall task would finish even faster.

The power of MapReduce is clear. Making Solution 1 faster requires making the single employee faster, that is, a faster computer. Alternatively, when large amounts of inexpensive compute power are available, the key is to ensure that the algorithms allow incremental additions of compute power to yield improved total throughput.

Hive and Pig

MapReduce algorithms have been invented to handle a broad range of tasks, from simple instance counting, instance grouping, data filtering, and graph analysis, to matrix multiplication. Many use cases are extremely common and don’t need to be reinvented.

Hive and Pig independently provide higher-level languages that generate MapReduce tasks to process data. Hive’s HiveQL language is similar to a query language like SQL. Pig’s PigLatin language is a less rigidly structured data pipelining language. Both languages solve similar data processing tasks.

Below are brief samples based on our cash-register example. Imagine that you have a single tab-delimited text file representing all bills in all cash registers. Each record is simply the register number and the bill denomination. The file is in HDFS at /user/hdpuser/registers.txt.
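
For comparison with the Hive and Pig versions that follow, here is a hedged sketch of the same count written as hand-coded Hadoop Streaming scripts; it assumes the tab-delimited record format described above.

#!/usr/bin/env python
# mapper.py: emit "denomination<TAB>1" for every bill record read from stdin.
import sys

for line in sys.stdin:
    fields = line.rstrip("\n").split("\t")
    if len(fields) != 2:
        continue                          # skip malformed records
    register_id, bill = fields
    print("%s\t1" % bill)

#!/usr/bin/env python
# reducer.py: input arrives sorted by key, so sum consecutive counts per bill.
import sys

current_bill, count = None, 0
for line in sys.stdin:
    bill, value = line.rstrip("\n").split("\t")
    if bill != current_bill:
        if current_bill is not None:
            print("%s\t%d" % (current_bill, count))
        current_bill, count = bill, 0
    count += int(value)
if current_bill is not None:
    print("%s\t%d" % (current_bill, count))

Submitted with Hadoop Streaming against /user/hdpuser/registers.txt, these two scripts produce the same per-denomination counts that the Hive and Pig examples below express far more compactly.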

Hive

In 2009 Facebook published Hive - A Warehousing Solution Over a Map-Reduce Framework. Afterward, Hive became an Apache open source project.

Hive lets you assign schema information, such as table and column names and datatypes, to describe data in files stored in HDFS. The schema information is inserted into the Hive “metastore,” a database stored outside of HDFS.

You can implement the cash register example in HiveQL this way:

CREATE TABLE register (
  register_id INT,
  bill INT
)
ROW FORMAT DELIMITED
  FIELDS TERMINATED BY '\t'
STORED AS TEXTFILE;

LOAD DATA INPATH '/user/hdpuser/registers.txt'
OVERWRITE INTO TABLE register;

SELECT bill, COUNT(bill) AS bill_count
FROM register
GROUP BY bill;

Issuing this code block to the Hive shell generates a MapReduce job within Hadoop to count the bill denominations across all registers.

Pig

In 2006 Yahoo began developing Pig. In 2007, it became an Apache open source project.

Pig provides a procedural, relation-centric interface to data stored in HDFS. There is no associated metastore. Instead, the structure of the data is declared in the code that sets up each relation.

You can implement the cash register example in PigLatin this way:

data = LOAD '/user/hdpuser/registers.txt'
  AS (register_id: INT, bill:INT);
grp = GROUP data BY (bill);
sums = FOREACH grp GENERATE
  FLATTEN(group), COUNT(data.bill) AS bill_count:LONG;
DUMP sums;

Issuing this code block to the Pig shell generates a MapReduce job within Hadoop to count the bill denominations across all registers.

Appendix B: Hadoop Software Components

An installation of Hadoop contains two key software components:

  • Hadoop File System
  • Hadoop MapReduce

The Hadoop File System (HDFS) is modeled on the Google File System to provide a high level of reliability on commodity hardware, plus high throughput for Big Data applications. One major objective of HDFS is to ensure that data is immediately available across multiple nodes, so that computation can be flexibly deployed close to the data.

The Hadoop MapReduce infrastructure is modeled on the Google MapReduce framework. It provides a computing model that lets you rapidly deploy simple operations as tasks on compute nodes that are close to the input data.

A Hive installation provides the following:

  • A metastore for tagging data with “table” and “column” schema information
  • An interactive and scriptable shell for SQL-like data loading and querying (HiveQL)
  • Programmatic support (“Embedded Hive”) for languages such as Java and Python

HiveQL query commands are turned into MapReduce jobs that process the associated data files stored in HDFS.

A Pig installation provides the following:

  • An interactive and scriptable shell for procedural data pipelining (PigLatin)
  • Programmatic support (“Embedded Pig”) for languages such as Java and Python

PigLatin code is turned into MapReduce jobs that process data files stored in HDFS.



[1] Big Data commonly refers to data sets so large that they are impractical to process using traditional database-management and data-processing tools.

[2] The architecture described is Hadoop 1.0. Hadoop 2.0, in alpha as of this writing, divides the JobTracker and TaskTracker roles, separating job scheduling from resource management.
