Friday, September 14, 2012

JRuby transparently running methods asynchronously. Combining Ruby metaprogramming techiniques and Java concurrent Future

JRuby is great, it offers an opportunity to combine my two favorite languages and here is another great way of combining Java power with Ruby beauty and convenience.

In this case I created a small gem in a couple of hours (still not really well tested, just some simple unit tests) that allows to use some of the nice metaprogramming techniques from Ruby to transparently execute methods asynchronously, wrapping their return value in a java.util.concurrent.Future, so that when we access any method of the returned object, the future's get method will be called to make sure we have access to the value only when we really need it.

What follows is the source code of the main file in the Gem. Where all the relevant logic is:

require 'java'
java_import 'java.util.concurrent.ExecutorService'
java_import 'java.util.concurrent.Executors'
java_import 'java.util.concurrent.Future'
java_import 'java.util.concurrent.TimeUnit'
java_import 'java.util.concurrent.Callable'

module Futurizeit
  module ClassMethods
    def futurize(*methods)
      Futurizeit.futurize(self, *methods)
    end
  end

  def self.included(klass)
    klass.extend(ClassMethods)
  end

  def self.executor
    @executor ||= Executors.newFixedThreadPool(10)
  end

  def self.futurize(klass, *methods)
    klass.class_eval do
      methods.each do |method|
        alias :"non_futurized_#{method}" :"#{method}"
        define_method :"#{method}" do |*args|
          @future = Futurizeit.executor.submit(CallableRuby.new { self.send(:"non_futurized_#{method}", *args) })
          Futuwrapper.new(@future)
        end
      end
    end
  end
end

module Futurizeit
  class Futuwrapper < BasicObject
    def initialize(future)
      @future = future
    end

    def method_missing(method, *params)
      instance = @future.get
      instance.send(method, *params)
    end
  end

  class CallableRuby
    include Callable

    def initialize(&block)
      @block = block
    end

    def call
      @block.call
    end
  end
end
The functionality can be used in two ways, including the module in a class and calling the macro method futurize on the class, or from the outside calling the Futurizeit.futurize method directly passing a class and the instance methods of that class that we want to run asynchronously.

The way it works is straightforward:

First it creates an alias to the original instance method called "non_futurized_xxx" where xxx is the name of the original method. Then it defines a new method with the original name. This method will create a CallableRuby object which implements (include the module) the Java Callable interface.

This CallableRuby instance is then submitted to a preconfigured ExecutorService. The ExecutorService will create a Future internally and return it inmediately. We the wrap this Future in a Futurewrapper instance.

The Futurewrapper is the object that will be returned by the method. When we try to access any method on this wrapper, it will internally call the future's get method which in turn will return the actual instance that the original method would have returned without the futurizing feature.

Following is the RSpec test that tests the current functionality:

require '../lib/futurizeit'

class Futurized
  def do_something_long
    sleep 3
    "Done!"
  end
end

class FuturizedWithModuleIncluded
  include Futurizeit
  def do_something_long
    sleep 3
    "Done!"
  end
  futurize :do_something_long
end


describe "Futurizer" do
  before(:all) do
    Futurizeit::futurize(Futurized, :do_something_long)
  end

  it "should wrap methods in futures and return correct values" do
    object = Futurized.new
    start_time = Time.now.sec
    value = object.do_something_long
    end_time = Time.now.sec
    (end_time - start_time).should < 2
    value.to_s.should == 'Done!'
  end

  it "should allow calling the value twice" do
     object = Futurized.new
     value = object.do_something_long
     value.to_s.should == 'Done!'
     value.to_s.should == 'Done!'
   end

  it "should increase performance a lot parallelizing work" do
    object1 = Futurized.new
    object2 = Futurized.new
    object3 = Futurized.new
    start_time = Time.now.sec
    value1 = object1.do_something_long
    value2 = object2.do_something_long
    value3 = object3.do_something_long
    value1.to_s.should == 'Done!'
    value2.to_s.should == 'Done!'
    value3.to_s.should == 'Done!'
    end_time = Time.now.sec
    (end_time - start_time).should < 4
  end

  it "should work with class including module" do
      object = FuturizedWithModuleIncluded.new
      start_time = Time.now.sec
      value = object.do_something_long
      end_time = Time.now.sec
      (end_time - start_time).should < 2
      value.to_s.should == 'Done!'
    end

  after(:all) do
    Futurizeit.executor.shutdown
  end
end


All the code is in https://github.com/calo81/futurizeit

Friday, September 7, 2012

Setting up a Hadoop virtual cluster with Vagrant

Usually for testing and using virtual machines, I go online, download the iso image of the machine I want to install, start Virtual Box, tell it to init from the iso, and install the OS manually, and then install the applications I want to use. It is a boring and tedious process but I never really cared very much about However recently I discovered the power of Vagrant and also Puppet. They allow me to automate all the steps I used to manually make before.

Here I test drive the process of automatically configuring a Hadoop cluster in virtual machines for a fully distributed mode.

First of all make sure you have Ruby installed. I’m testing with Ruby 1.9.3. You should also have Virtual Box installed. I have version 4.1.

Then from the command line install the vagrant gem:

gem install vagrant

Vagrant is a great tool that allow us to manage our Virtual Box machines using the command line and simple configuration files.

First we will install a linux Ubuntu virtual machine (or a box as it is called in vagrant)

vagrant box add base-hadoop http://files.vagrantup.com/lucid64.box

Then we go to a directory where we want to have our “workspace” and also the directory to create the vagrant configuration file for our new box and execute. This will create a Vagrantfile file with the vagrant configuration.

vagrant init base-hadoop

The virtual machine is ready to be started up now. You can start it by doing:

vagrant up

That is the virtual machine running. You can connect to it with ssh. type

vagrant ssh

Next step is to download Puppet. Do that going to the URL http://puppetlabs.com/misc/download-options/

Puppet is a tool that allow us to automate the process of provisioning servers. We will use it to manage our virtual machines, installing the required software on them and executing the required services.

So we create a directory where we are going to put our manifests (puppet configuration files)

mkdir manifests

in that new directory we create a file called base-hadoop.pp with the following content:

group { "puppet":
  ensure => "present",
}
 
In the Vagrantfile file that got created previously we uncomment the lines that look like:

config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
  end


The next thing we need to do is tell puppet to install Java in our servers. for that we open the base-hadoop.pp file and add the following:

exec { 'apt-get update':
    command => 'apt-get update',
}

package { "openjdk-6-jdk" :
   ensure => present
  require => Exec['apt-get update']
}


Next thing we need to install hadoop. For this we will create a new puppet module. A puppet module is used to encapsulate resources that belong to the same component.

We execute

mkdir -p modules/hadoop/manifests

Then we create an init.pp in this new manifests directory with the following content:

class hadoop {
 $hadoop_home = "/opt/hadoop"

exec { "download_hadoop":
command => "wget -O /tmp/hadoop.tar.gz http://apache.mirrors.timporter.net/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz",
path => $path,
unless => "ls /opt | grep hadoop-1.0.3",
require => Package["openjdk-6-jdk"]
}

exec { "unpack_hadoop" :
  command => "tar -zxf /tmp/hadoop.tar.gz -C /opt",
  path => $path,
  creates => "${hadoop_home}-1.0.3",
  require => Exec["download_hadoop"]
}
}


We have done a few things here, and they are almost self-explanatory. We are basically setting a variable to point to our hadoop installation. We are downloading Hadoop’s binaries from its Apache location and we are extracting it into the specified hadoop_home directory.

We need to add our new module to the main puppet configuration file. We add the following line at the top of the base-hadoop.pp file:

include hadoop

Then we add this new modules path to our Vagrantfile. So now our puppet section looks like:

config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end


We execute the following to reload the vagrant machine:

vagrant reload

That command will reload the vagrant machine and execute the puppet recipes. That will install the required software needed.

We will need a cluster of virtual machines. Vagrant supports that. we open our Vagrantfile and replace the content with the following:

Vagrant::Config.run do |config|
  config.vm.box = "base-hadoop"
  config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end
 
  config.vm.define :master do |master_config|
    master_config.vm.network :hostonly, "192.168.1.10"
  end

  config.vm.define :backup do |backup_config|
    backup_config.vm.network :hostonly, "192.168.1.11"
  end
 
  config.vm.define :hadoop1 do |hadoop1_config|
    hadoop1_config.vm.network :hostonly, "192.168.1.12"
  end
 
  config.vm.define :hadoop2 do |hadoop2_config|
    hadoop2_config.vm.network :hostonly, "192.168.1.13"
  end
 
  config.vm.define :hadoop3 do |hadoop3_config|
    hadoop3_config.vm.network :hostonly, "192.168.1.14"
  end
end


After this we execute:

vagrant up

That will start and provision all the servers. That will take a while

But we are not ready. Next we need to configure the hadoop cluster. In the directory modules/hadoop we create another directory called files. Here we will create the needed configuration files for our hadoop cluster.

we create the following files:

core-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
 <configuration>
  <property>
   <name>fs.default.name</name>
   <value>hdfs://master:9000</value>
   <description>The name of the default file system. A URI whose scheme and authority determine the FileSystem implementation.</description>
  </property>
 </configuration>


hdfs-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
  <name>dfs.replication</name>
  <value>3</value>
  <description>The actual number of replications can be specified when the file is created.</description>
 </property>
</configuration>
 


mapred-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
 <property>
  <name>mapred.job.tracker</name>
  <value>master:9001</value>
  <description>The host and port that the MapReduce job tracker runs at.</description>
 </property>
</configuration>
 


masters

192.168.1.11

slaves

192.168.1.12 192.168.1.13 192.168.1.14

We then need to tell puppet to copy these files to our cluster. So we modify our init.pp file in the hadoop puppet module to contain the following:

class hadoop {
 $hadoop_home = "/opt/hadoop"

exec { "download_hadoop":
command => "wget -O /tmp/hadoop.tar.gz http://apache.mirrors.timporter.net/hadoop/common/hadoop-1.0.3/hadoop-1.0.3.tar.gz",
path => $path,
unless => "ls /opt | grep hadoop-1.0.3",
require => Package["openjdk-6-jdk"]
}

exec { "unpack_hadoop" :
  command => "tar -zxf /tmp/hadoop.tar.gz -C /opt",
  path => $path,
  creates => "${hadoop_home}-1.0.3",
  require => Exec["download_hadoop"]
}
file {
  "${hadoop_home}-1.0.3/conf/slaves":
  source => "puppet:///modules/hadoop/slaves",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }
 
file {
  "${hadoop_home}-1.0.3/conf/masters":
  source => "puppet:///modules/hadoop/masters",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }

file {
  "${hadoop_home}-1.0.3/conf/core-site.xml":
  source => "puppet:///modules/hadoop/core-site.xml",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }
 
file {
  "${hadoop_home}-1.0.3/conf/mapred-site.xml":
  source => "puppet:///modules/hadoop/mapred-site.xml",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }
 
 file {
  "${hadoop_home}-1.0.3/conf/hdfs-site.xml":
  source => "puppet:///modules/hadoop/hdfs-site.xml",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }
}
 


We then execute:

vagrant provision

And we get these files copied to all our servers.

We need to setup ssh password-less communication between our servers. We modify our hadoop-base.pp and leave like this:

file {
  "/root/.ssh/id_rsa":
  source => "puppet:///modules/hadoop/id_rsa",
  mode => 600,
  owner => root,
  group => root,
  require => Exec['apt-get update']
 }
 
file {
  "/root/.ssh/id_rsa.pub":
  source => "puppet:///modules/hadoop/id_rsa.pub",
  mode => 644,
  owner => root,
  group => root,
  require => Exec['apt-get update']
 }

ssh_authorized_key { "ssh_key":
    ensure => "present",
    key    => "AAAAB3NzaC1yc2EAAAADAQABAAABAQCeHdBPVGuSPVOO+n94j/Y5f8VKGIAzjaDe30hu9BPetA+CGFpszw4nDkhyRtW5J9zhGKuzmcCqITTuM6BGpHax9ZKP7lRRjG8Lh380sCGA/691EjSVmR8krLvGZIQxeyHKpDBLEmcpJBB5yoSyuFpK+4RhmJLf7ImZA7mtxhgdPGhe6crUYRbLukNgv61utB/hbre9tgNX2giEurBsj9CI5yhPPNgq6iP8ZBOyCXgUNf37bAe7AjQUMV5G6JMZ1clEeNPN+Uy5Yrfojrx3wHfG40NuxuMrFIQo5qCYa3q9/SVOxsJILWt+hZ2bbxdGcQOd9AXYFNNowPayY0BdAkSr",
    type   => "ssh-rsa",
    user   => "root",
    require => File['/root/.ssh/id_rsa.pub']
}
 


We are ready to run our hadoop cluster now. For that, once again we modify the init.pp file in the hadoop puppet module, we add the following at the end, before closing the hadoop class:

 file {
  "${hadoop_home}-1.0.3/conf/hadoop-env.sh":
  source => "puppet:///modules/hadoop/hadoop-env.sh",
  mode => 644,
  owner => root,
  group => root,
  require => Exec["unpack_hadoop"]
 }
 


The haddop-env.sh file is the original one but we have uncommented the JAVA_HOME setting and pointed it to the correct Java installation.

We can give different names to each host in the Vagrantfile. For that we replace its contents with the following:


Vagrant::Config.run do |config|
  config.vm.box = "base-hadoop"
  config.vm.provision :puppet do |puppet|
     puppet.manifests_path = "manifests"
     puppet.manifest_file  = "base-hadoop.pp"
     puppet.module_path = "modules"
  end
 
  config.vm.define :backup do |backup_config|
    backup_config.vm.network :hostonly, "192.168.1.11"
    backup_config.vm.host_name = "backup"
  end
 
  config.vm.define :hadoop1 do |hadoop1_config|
    hadoop1_config.vm.network :hostonly, "192.168.1.12"
    hadoop1_config.vm.host_name = "hadoop1"
  end
 
  config.vm.define :hadoop2 do |hadoop2_config|
    hadoop2_config.vm.network :hostonly, "192.168.1.13"
    hadoop2_config.vm.host_name = "hadoop2"
  end
 
  config.vm.define :hadoop3 do |hadoop3_config|
    hadoop3_config.vm.network :hostonly, "192.168.1.14"
    hadoop3_config.vm.host_name = "hadoop3"
  end

  config.vm.define :master do |master_config|
    master_config.vm.network :hostonly, "192.168.1.10"
    master_config.vm.host_name = "master"
  end

end


Let’s do “vagrant reload” and wait for all systems to reload.

We have provisioned ur systems. Let’s go to our master node and start everything:

vagrant ssh master

then when we are logged in we go to /opt/hadoop-1.0.3/bin

and do:

sudo ./hadoop namenode -format

sudo ./start-all.sh

We have started now our hadoop cluster. Now we can visit http://192.168.1.10:50070/ to access our master node and see that our hadoop cluster is indeed running.

All the files for this example (except for the box itself) exist in git@github.com:calo81/vagrant-hadoop-cluster.git for free use.

Saturday, June 16, 2012

Simple MRI Ruby v JRuby performance comparison.

The following is a very simplistic performance comparison running 2 program examples, running them with both MRI Ruby and JRuby, in the following categories:

Single threaded compute intensive.
Multi threaded compute intensive.

The compute intensive function is simply a O(n2) function that multiplies the inner loop index by the outer loop index in every iteration.

We will use Jruby version 1.6.7 with 1.9.2 support And MRI Ruby 1.9.2 I will test in my Mac Air 13” i5 dual core

I will run the code as is with both JRuby and MRI without passing any special flags or optimization options to any of the two.



Before doing this test I didn’t know which of the two would perform better although I kind of expected the case of the multithreaded one. Keep reading.

file test.rb:

def compute_intensive_stuff(times)
  (1..times).each do |i|
    (1..times).each do |j|
      i * j
    end
  end
end

require 'benchmark'
include Benchmark

def evaluate
  bm(1) do |test|
     test.report("method:") do
       compute_intensive_stuff 10000
     end
   end
end

evaluate


Carlos-MacBook-Air:jruby-vs-ruby cscarioni$ jruby-1.6.7 --1.9 test.rb user system total real method: 8.288000 0.000000 8.288000 ( 8.288000)

Carlos-MacBook-Air:jruby-vs-ruby cscarioni$ ruby-1.9.2-p290 test.rb user system total real method: 11.030000 0.040000 11.070000 ( 11.103539)

I ran this more than once and the results were similar to these each time.

JRuby seems faster in this very simple setup by more than 20 - 30%

Now with two threads doing the same amount of work:

def compute_intensive_stuff(times)
  (1..times).each do |i|
    (1..times).each do |j|
      i * j
    end
  end
end

require 'benchmark'
include Benchmark

def evaluate
  bm(1) do |test|
     test.report("method:") do
       compute_intensive_stuff 10000
     end
   end
end

t1 = Thread.new do
   evaluate
end

t2 = Thread.new do
   evaluate
end

t1.join
t2.join
 


Carlos-MacBook-Air:jruby-vs-ruby cscarioni$ ruby-1.9.2-p290 test.rb user system total real user system total real method:method: 22.460000 0.110000 22.570000 ( 22.676014) 22.640000 0.110000 22.750000 ( 22.850209)

Carlos-MacBook-Air:jruby-vs-ruby cscarioni$ jruby-1.6.7 --1.9 test.rb user system total real user system total real method:method: 11.890000 0.000000 11.890000 ( 11.890000) 12.068000 0.000000 12.068000 ( 12.068000)

We can see that the MRI Ruby version doubled the time for this running, while the JRuby version only increased like 30% the running time.

More importantly a look at the Activity Monitor shows that the CPU usage is 100% for the MRI Ruby running process while for the JRuby process the CPU usage shows 200% usage. Meaning that in the first case, even when it is multithreaded, only 1 thread is executing at any given time, not taking advantage of the two cores. The JRuby version in contrast takes full advantage of the dual core using the full CPU power. This is because JRuby uses the Thread model provided by the Java Runtime.

There is a way more comprehensive comparison in the following blog: http://blog.headius.com/2009/04/how-jruby-makes-ruby-fast.html. It has great explanations of how to tweak JRuby for performance to make it faster.

Tuesday, June 5, 2012

Using Java to access MongoDB, Redis, CouchDB, Riak, Cassandra

I had a requirement in my current job to persist some messages at different points in the running of the system. At the beggining we didn’t know the format in which the messages were going to be saved, where to save them or even which messages to save.

Last weekend I started working on my own in a small library for persisting java objects in different datasources and with different formats so that I was going to be able to leverage that library at work.

I intended to support different datasources. I started with MongoDB, Redis, File System, Cassandra, Riak and CouchDB.

The idea of the solution is to work as a kind of logger, so I took the main architecture characteristics from the Apache Log4j project. So for example I had the idea to easily plug the different datasources in what I called Appenders, following the Log4j concept.

Another thing I wanted is to be able to easily configure it with Spring, so I also created a small namespace for it.

The simple architecture I ended up with was something like this:

The idea is that any object will get “normalized” into a library internal object by using an implementation of a Normalizer. Then this normalized message goes to any of the Appenders where it gets converted into a provider specific message (e.g. DBObject in Mongo) then the appender takes care of storing it.

All the appenders and datastore libraries I currently use are very simple, and none of the datasources have been optimized anyhow, I work with them with their default installation behaviour.


If not for anything else, the library can at least serve to see the basic of how to interact with the different data sources. So next I show how all the appenders I have for the different Datasources.


package org.easytechs.recordpersister.appenders;

import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.Mongo;

public class MongoAppender extends AbstractAppender<DBObject>{

    /**
     */

    private DBCollection coll;
    public MongoAppender(String host, String port, String dbName, String collection) throws Exception{
        Mongo m = new Mongo(host , Integer.parseInt(port));
        DB db = m.getDB(dbName);
        coll = db.getCollection(collection);
    }

    @Override
    public void close() {
       
    }

    @Override
    protected void doAppend(DBObject record) throws Exception {
        coll.insert(record);
    }

}


package org.easytechs.recordpersister.appenders;

import org.easytechs.recordpersister.appenders.redis.KeyValue;

import redis.clients.jedis.Jedis;


public class RedisAppender extends AbstractAppender<KeyValue>{
    /**
     */

    private Jedis jedis;
    public RedisAppender(String host) {
        jedis = new Jedis(host);
        jedis.connect();
    }

    @Override
    public void close() {
        jedis.disconnect();
    }

    @Override
    protected void doAppend(KeyValue record) throws Exception {
        jedis.rpush(record.getKey(), record.getValue());
    }
}


package org.easytechs.recordpersister.appenders;

import java.util.Map;

import redis.clients.jedis.Jedis;

public class RedisHashAppender extends AbstractAppender<Map<String, String>> {

    /**
     */

    private String listKey;

    /**
     */

    private Jedis jedis;

    public RedisHashAppender(String host, String listKey) {
        this.listKey = listKey;
        jedis = new Jedis(host);
        jedis.connect();
    }

    @Override
    public void close() {
        jedis.disconnect();
    }

    @Override
    protected void doAppend(Map<String, String> record) throws Exception {
        String key = String.valueOf(record.hashCode());
        for (String field : record.keySet()) {
            jedis.hset(key, field, record.get(field));         
        }
        jedis.rpush(getListKey(), key);
    }
   
    /**
     * @return
     */

    private String getListKey(){
        return this.listKey;
    }
}



package org.easytechs.recordpersister.appenders;

import org.easytechs.recordpersister.appenders.redis.KeyValue;

import com.basho.riak.client.IRiakClient;
import com.basho.riak.client.RiakFactory;
import com.basho.riak.client.bucket.Bucket;

public class RiakAppender extends AbstractAppender<KeyValue>{

    private Bucket myBucket;
    private IRiakClient riakClient;
   
    public RiakAppender(String host, int port, String bucket) throws Exception{
        riakClient = RiakFactory.pbcClient(host,port);
        myBucket = riakClient.fetchBucket(bucket).execute();
    }
    @Override
    public void close() {
        riakClient.shutdown();
    }

    @Override
    protected void doAppend(KeyValue record) throws Exception {
        myBucket.store(record.getKey(), record.getValue()).execute();
    }

}



package org.easytechs.recordpersister.appenders;

import java.util.Map;

import org.jcouchdb.db.Database;

public class CouchDBAppender extends AbstractAppender<Map<String, String>>{

    private Database db;
    public CouchDBAppender(String host, String database){
         db = new Database(host, database);
       
    }
    @Override
    public void close() {
       
    }

    @Override
    protected void doAppend(Map<String, String> record) throws Exception {
        db.createDocument(record);
    }

}




package org.easytechs.recordpersister.appenders;

import java.nio.ByteBuffer;

import org.apache.cassandra.thrift.Cassandra;
import org.apache.cassandra.thrift.ColumnParent;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.TBinaryProtocol;
import org.apache.thrift.protocol.TProtocol;
import org.apache.thrift.transport.TFramedTransport;
import org.apache.thrift.transport.TSocket;
import org.apache.thrift.transport.TTransport;
import org.easytechs.recordpersister.appenders.cassandra.CassandraRow;



public class CassandraAppender extends AbstractAppender<CassandraRow>{


    /**
     */

    private Cassandra.Client client;
    /**
     */

    private ColumnParent columnParent ;
    /**
     */

    private TTransport tr;
    private static final ConsistencyLevel CL = ConsistencyLevel.ANY;

    public CassandraAppender(String host, int port, String keyspace, String columnParent) throws Exception{
        tr = new TSocket(host, port);
        TFramedTransport tf = new TFramedTransport(tr);
        TProtocol proto = new TBinaryProtocol(tf);
        client = new Cassandra.Client(proto);
        tf.open();
        client.set_keyspace(keyspace);
        this.columnParent = new ColumnParent(columnParent);
    }

    @Override
    public void close() {
        tr.close();
    }

    @Override
    protected void doAppend(CassandraRow record) throws Exception{
            client.insert(ByteBuffer.wrap(record.getKey().getBytes()), columnParent, record.getColumns().get(0), CL);
    }
}
 

This is the abstract appender they all derive from:


package org.easytechs.recordpersister.appenders;


import java.util.ArrayList;
import java.util.List;


import org.easytechs.recordpersister.Appender;
import org.easytechs.recordpersister.NormalizedMessage;
import org.easytechs.recordpersister.RecordGenerator;






public abstract class AbstractAppender<T extends Object> implements Appender{
    /**
     */

    protected RecordGenerator<T> recordGenerator;
   
    @Override
    public void append(NormalizedMessage normalizedMessage) {
        T record = recordGenerator.generate(normalizedMessage);
        try{
            doAppend(record);
        }catch(Exception e){
            e.printStackTrace();
            //Anything else to do here???
        }
    }
   
    @Override
    public final void append(List<NormalizedMessage> messages){
        List<T> records = new ArrayList<>();
        for(NormalizedMessage message:messages){
            records.add(recordGenerator.generate(message));
        }
        doBatchAppend(records);
    }


    /**
     * Basic implementation. Override if the appender supports batch processing
     * @param records
     */

    protected void doBatchAppend(List<T> records){
        for(T record:records){
            try{
                doAppend(record);
            }catch(Exception e){
                e.printStackTrace();
                //Anything else to do here???
            }
        }
    }


    @Override
    protected void finalize() throws Throwable {
        super.finalize();
        close();
    }




    protected abstract void doAppend(T record) throws Exception;
   
    public void setRecordGenerator(RecordGenerator<T> recordGenerator){
        this.recordGenerator = recordGenerator;
    }
}

As an example of how the library would be used there are a couple of Tests. Like the following:


package org.easytechs.recordpersister;


import org.easytechs.recordpersister.GenericPersister;
import org.easytechs.recordpersister.appenders.MongoAppender;
import org.easytechs.recordpersister.normalizers.BeanToMapNormalizer;
import org.easytechs.recordpersister.recordgenerators.MongoDBFromMapGenerator;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;




public class TestBeanMongoFullDocumentPersisterITest extends AbstractTimedTest{


    /**
     */

    private GenericPersister<TestBean> testObj;


    @BeforeMethod
    public void setup() throws Exception {
        testObj = new GenericPersister<>();
        MongoAppender appender = new MongoAppender("127.0.0.1", "27017", "test-db", "ticksfull2");
        appender.setRecordGenerator(new MongoDBFromMapGenerator());
        testObj.setNormalizedMessageTransformer(new BeanToMapNormalizer<TestBean>("symbol", "value","date"));
        testObj.setAppender(appender);
    }


    @Test
    public void shouldPersistOneItem() {
        TestBean tick = new TestBean();
        tick.setSymbol("XX");
        tick.setValue("100.00");
        tick.setDate(123444l);
        testObj.persist(tick);
    }


    @Test(invocationCount=10)
    public void shouldPersistManyItems() {
        doTimed(new IndexedRunnable() {    
            @Override
            public void run(int index) throws Exception {
                TestBean tick = new TestBean();
                tick.setSymbol("XX");
                tick.setValue("100.00");
                tick.setDate(123444l);
                testObj.persist(tick);
               
            }
        }, 20000);
    }


}



If using from Spring, I’m developing a simple namespace so things like the following can be done:


 <persister:mongo-document-persister id="persister" host="127.0.0.1" port="27017" db="test-db" collection="testcol" beanProperties="propA,propB,propC"/>



The Maven dependencies for all the drivers are:




                 <dependency>
            <groupId>org.apache.cassandra</groupId>
            <artifactId>cassandra-all</artifactId>
            <version>1.0.10</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.0.0</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>mongo-java-driver</artifactId>
            <version>2.7.3</version>
        </dependency>
         <dependency>
            <groupId>com.basho.riak</groupId>
            <artifactId>riak-client</artifactId>
            <version>1.0.5</version>
        </dependency>
         <dependency>
            <groupId>com.google.code.jcouchdb</groupId>
            <artifactId>jcouchdb</artifactId>
            <version>0.11.0-1</version>
        </dependency>





The source code is in Github

Great books on NoSQL