Sunday, September 15, 2013

Analyzing your Mongo data with Pig

MongoDB offers a fantastic query interface. However, in Big Data scenarios we may want to combine our MongoDB database with the processing power of Hadoop in order to extract and analyze the data stored in our DB.

Working with Hadoop directly by writing MapReduce jobs is all well and good, but sometimes we may want or need a simpler and more straightforward interface to execute our analysis jobs. This is where Apache Pig comes in.

What is so good about Pig is that it offers a nice abstraction on top of MapReduce that takes away some of the inherent complexity of programming in this paradigm, and instead offers a high-level language for our analysis tasks. Pig then translates these high-level descriptions into the proper MapReduce jobs.

What I will show next is how to integrate Pig (in the simplest possible scenario, all running on localhost) to extract and analyze data from your Mongo database and store the results back into Mongo.

The example

The example will be a very simplistic insurance policy analysis application. Basically we will have a couple of simple MongoDB collections which we will aggregate together and run some analysis on. The collections we have are:

> show collections
policies
providerType

with the following contents:

> db.policies.find()
   { "_id" : ObjectId("5235d8354d17d7080dbd913f"), "provider" : "ic1", "price" : 50 }
   { "_id" : ObjectId("5235d8434d17d7080dbd9140"), "provider" : "ic1", "price" : 200 }
   { "_id" : ObjectId("5235d84d4d17d7080dbd9141"), "provider" : "ic2", "price" : 400 }
   { "_id" : ObjectId("5235d8524d17d7080dbd9142"), "provider" : "ic2", "price" : 150 }

The policies collection has a list of documents recording sold policies: the insurance provider and the price each policy was sold for.

> db.providerType.find()
   { "_id" : ObjectId("5235e85794eeca389060cd40"), "provider" : "ic1", "type" : "premium" }
   { "_id" : ObjectId("5235e86194eeca389060cd41"), "provider" : "ic2", "type" : "standard" }

The providerType collection has the type of each insurance provider. For example, there are premium providers and standard providers.

Our analysis process will simply produce the average price spent on premium policies. For that we will obviously need a grouping and an aggregation.
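Before expressing this in Pig, the computation itself can be sketched in a few lines of plain Python over the sample documents above. This is only to make the expected result concrete; it is not part of the actual pipeline:

```python
# Dictionaries mirroring the two MongoDB collections from the example
policies = [
    {"provider": "ic1", "price": 50},
    {"provider": "ic1", "price": 200},
    {"provider": "ic2", "price": 400},
    {"provider": "ic2", "price": 150},
]
provider_types = [
    {"provider": "ic1", "type": "premium"},
    {"provider": "ic2", "type": "standard"},
]

# Keep only the premium providers, then average their policy prices
premium = {d["provider"] for d in provider_types if d["type"] == "premium"}
averages = {}
for p in premium:
    prices = [d["price"] for d in policies if d["provider"] == p]
    averages[p] = sum(prices) / len(prices)

print(averages)  # {'ic1': 125.0}
```

The grouping (by provider) and the aggregation (the average) are exactly the two operations the Pig script will express later.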

Setting up

For the example we will need the following:

  • A running MongoDB instance on localhost.
  • Apache Pig (I used pig-0.11.1).
  • The mongo-hadoop connector repository, cloned from GitHub.
  • The MongoDB Java driver jar (mongo-java-driver-2.11.3.jar).

After you download (or clone) all the needed software, you proceed to do the following:

  • Build mongo_hadoop: from the root of the cloned repository, after checking out the r1.1.0 tag, execute ./sbt package. This builds the jar files we need.
  • If we were using a full Hadoop installation, the previous step would normally be followed by copying a couple of jar files into the Hadoop environment. However, since we will be running Pig in local mode, this is not needed.

You need to have some environment variables configured, in particular JAVA_HOME and PATH. I have the following in my .bash_profile:

if [ -f ~/.bashrc ]; then
  . ~/.bashrc
fi

export PATH
export JAVA_HOME=/usr/java/default/
export PIG_HOME=/home/vagrant/Programs/pig-0.11.1/

Next you will need the actual Pig script file. Contents below:

REGISTER /home/vagrant/Programs/pig-0.11.1/contrib/piggybank/java/piggybank.jar
REGISTER /home/vagrant/Programs/pig-0.11.1/pig-0.11.1.jar
REGISTER /home/vagrant/Downloads/mongo-java-driver-2.11.3.jar
REGISTER /home/vagrant/ossources/mongo-hadoop/core/target/mongo-hadoop-core-1.1.0.jar
REGISTER /home/vagrant/ossources/mongo-hadoop/pig/target/mongo-hadoop-pig-1.1.0.jar

raw_providers = LOAD 'mongodb://localhost:27017/insurance_example.policies' using com.mongodb.hadoop.pig.MongoLoader;
raw_provider_types = LOAD 'mongodb://localhost:27017/insurance_example.providerType' using com.mongodb.hadoop.pig.MongoLoader;

with_names = FOREACH raw_providers GENERATE $0#'provider' as provider, (double)$0#'price' as price;
types = FOREACH raw_provider_types GENERATE $0#'provider' as provider, $0#'type' as type;
premiums = FILTER types BY type == 'premium';

by_provider = COGROUP with_names BY provider, premiums BY provider INNER;

averages = FOREACH by_provider GENERATE group, AVG(with_names.price) as avg;

STORE averages
  INTO 'mongodb://localhost:27017/insurance_example.premium_policies_averages'
  com.mongodb.hadoop.pig.MongoInsertStorage('group:chararray,avg:float', 'group');

This is the file where all the magic happens. To run it, assuming you called the file insurance.pig as I did, simply execute pig -exectype local -f insurance.pig.

After you run the file, and after a substantial output, you can go to your MongoDB and check the new collection:

> db.premium_policies_averages.find()
  { "_id" : ObjectId("5235f2490cf24ccadbf9638a"), "group" : "ic1", "avg" : 125 }

You can see that it has now stored the average price for all the premium policy providers, in our example the only premium provider is ic1.

Let's have a quick walkthrough of the insurance.pig file:

  • The first five lines (the ones starting with REGISTER) simply add the necessary dependencies to the CLASSPATH of the Pig job we are going to run. It is worth noting that we are adding the jar from the Mongo Java driver that we downloaded and the ones we built from the mongo_hadoop project that we cloned from GitHub.
  • The next two lines (starting with LOAD) load the data from the relevant Mongo collections into their respective relations.
  • In the next two lines (starting with FOREACH) we give names to the raw data we extracted before, for future access.
  • The next line (starting with FILTER) filters the types down to the ones whose type equals 'premium'.
  • The next line (starting with COGROUP) creates a combined group between the provider/price tuples and the premium providers, grouped by provider name.
  • The next line generates the tuples with the corresponding averages.
  • The last part of the script stores the generated tuples into the MongoDB collection, specifying the schema of the data stored.
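To make the COGROUP step more concrete: conceptually it produces, for each provider key, one tuple holding a bag of matching rows from each input relation, and the INNER keyword on premiums drops keys that have no premium match. A rough Python sketch of that semantics (just an illustration of the data shapes involved, not how Pig actually executes it):

```python
from collections import defaultdict

# Tuples mirroring the with_names and premiums relations
with_names = [("ic1", 50.0), ("ic1", 200.0), ("ic2", 400.0), ("ic2", 150.0)]
premiums = [("ic1", "premium")]

# COGROUP: one entry per key, holding a bag of tuples from each relation
cogrouped = defaultdict(lambda: ([], []))
for provider, price in with_names:
    cogrouped[provider][0].append((provider, price))
for provider, ptype in premiums:
    cogrouped[provider][1].append((provider, ptype))

# INNER on premiums: keep only keys with at least one premium tuple,
# so "ic2" is dropped here
by_provider = {k: bags for k, bags in cogrouped.items() if bags[1]}

# AVG over the with_names bag, as in the GENERATE line of the script
averages = {k: sum(p for _, p in bags[0]) / len(bags[0])
            for k, bags in by_provider.items()}
print(averages)  # {'ic1': 125.0}
```

This also shows why COGROUP followed by AVG behaves like a join-then-group here: the premiums bag acts purely as a filter on the provider keys.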

And that's it. This is a very simplistic example, running with very little data in a local, non-distributed environment. However, it serves to illustrate how to use the power of Hadoop and Pig to extract and analyze data from MongoDB.

  • I created a Vagrant box with the example which can be downloaded here
  • P.S. Although we did not install Hadoop per se, because it is not needed when running Pig in local mode, Hadoop's MapReduce functionality is still used internally by Pig to execute the jobs that we create.
