Wednesday, February 18, 2015

Clustering customers for machine learning with Hadoop and Mahout

This post was originally published on my company's tech blog, Simplybusiness.

The problems

  • We manage quite a bit of customer data, starting from the beginning of a customer's search for a new insurance policy, all the way until they buy (or don't buy) the policy. We keep all of this data, but for the most part, we don't do anything with it to improve our customer offering.
  • Our site looks exactly the same for every customer -- we don't try to engage with them on a more personal level. No customisation exists, which means that each customer's experience doesn't adapt to his or her personality, specific trade, or home or business location. Nothing at all.
  • Filling out a long form is always boring. But filling it out while being unsure of what information to put where, and being forced to make a phone call to confirm details, is even more boring. In many cases, it could mean the customer just gets bored and leaves our form.

The idea

We wanted to create and test a solution that allowed us to group together similar customers using different sets of dimensions depending on the information we wanted to provide or obtain. We thought about introducing clustering technology and algorithms to group our customers.

This would be a very rough implementation that would allow us to prove certain techniques and solutions for this type of problem -- it certainly would NOT cover all the nuances that machine learning algorithms and analysis carry with them. Many liberties were taken to get to a proof of concept. The code presented here is not 100% the same code used in the spike, but it forms a very accurate approximation.

This post covers the implementation of the solution.

Solution

Setting up the Clustering backend algorithms to allow multidimensional clustering.
  • I had already decided that I would put into practice my knowledge of Mahout and Hadoop to run the clustering processing. I installed Hadoop using my own recipes from hadoop-vagrant, first to run on a local Vagrant cluster and then on an AWS cluster.
  • Hadoop is a framework for processing certain types of tasks in a distributed environment built from commodity machines, which lets it scale horizontally to very large workloads. Its main components are the map-reduce execution framework and the HDFS distributed filesystem. For more details, check out my blog post.
Getting the data:
  • After Hadoop was installed, the first task was to find and extract the data. The data was stored in a SQL Server database, so we needed to fetch it and put it into HDFS. There is a fantastic tool called Sqoop that's built just for this. Sqoop not only allows you to get data ready for HDFS, it actually uses Hadoop itself to parallelize the extraction of the data. The standard way to run Sqoop is as follows:

sqoop import --driver com.microsoft.sqlserver.jdbc.SQLServerDriver --connect "jdbc:sqlserver://xxx:1433;username=xxx;password=xxx;databaseName=xxx" --query "select xxx from xxxx" --split-by customer._id --fetch-size 20 --delete-target-dir --target-dir aggregated_customers --package-name "clustercustomers.sqoop" --null-string '' --fields-terminated-by ','

The previous command generates the Hadoop-compatible files that will be used in the subsequent analysis. The most important part is the query that you want to use to extract the data. In our case, in the first iteration, we extracted information like trade, vertical, claims, years_insured, and turnover. These values are the dimensions that we will use to group our "similar" customers.

K-Means Clustering.

I have read quite a bit about different machine learning techniques and algorithms. I have developed a bit with them in the past, particularly in the recommendation area. The first thing to decide with a Machine Learning problem is what exactly I want to achieve. First, let's look at the three main problems that Machine Learning solves, and then follow the reasoning behind my choices.

Machine Learning algorithms in Mahout can be broadly categorized in three main areas:

  • Recommendation Algorithms: Try to make an informed guess about what things you might like out of a large domain of things. In the simplest and most common form, the inference is done based on similarity. This similarity could be based on items that you've already said you like, or similarity with other users that happen to like the same items as you.
    • Assume we have a database of movies, and say you like Lethal Weapon.
      • Item-Based similarity:
        • recommendations for movies similar to Lethal Weapon.
      • User-Based similarity:
        • recommendations for movies that other people who liked Lethal Weapon liked as well
  • Classification Algorithms: These belong to the family of Supervised Learning algorithms (supervised because the set of possible outcomes and categories is known beforehand). Classification algorithms allow you to assign an item to a particular category given a set of known characteristics (where the category belongs to a limited set of options).
    • This technique is used in Spam detection systems.
      • Let's say you decide that any email with at least two of the following characteristics should be marked as Spam: 4 or more images, 4 words written in all-capital letters, and the text 'congratulations' with an exclamation mark at the end. Anything with fewer than two of these is not Spam.
      • The Classification system will be built upon these characteristics and rules. It knows that any incoming email that matches these rules will belong to the corresponding category.
  • Clustering Algorithms: These belong to the Unsupervised Learning family because there is no predetermined set of possible answers. Clustering algorithms are simply given a set of inputs with dimensions. The algorithm itself works out how to organize and group the data into individual clusters.

Given the previous definitions, it was very clear to me that what we needed was a Clustering solution because I didn't have any idea how the data was supposed to be organized. I wanted the system to figure out the clustering and return a set of groups containing similar customers in each of them.

I selected K-Means clustering as the clustering algorithm I wanted to use. I have some familiarity with it, and it's the most common clustering algorithm both in use and in the literature.

To quickly explain K-Means: given a set of N elements and a desired number of clusters K, it picks K centroid points (at random to begin with) and then iterates a fixed number of times X. In each iteration, every element is assigned to the cluster of its closest centroid, and the centroids are then recalculated from the elements assigned to them. A better explanation is here.

From the previous explanation, you can see that K-Means expects the number of clusters K as an input. However, I had no idea at all of what a good number of clusters would be. In Mahout, you can combine K-Means with another clustering algorithm named Canopy. Canopy is capable of finding a first set of K centroids that can then be fed into K-Means.

The way Canopy works is roughly the following: Instead of being given a K for the total number of clusters, you provide Canopy with a measure of the size that you expect each cluster to have. This allows you to get different sized clusters on different runs of the algorithm (e.g. if you want to cluster people by a wider or narrower geographical location). The algorithm works with a distance measure (most machine learning algorithms use one to find similarities) and two threshold values, T1 and T2 (T1 > T2). Looping through all the points in the dataset, the algorithm takes each point and compares its distance to each of the already created Canopies against T1 and T2. If the distance is within T2, the point is assigned to that existing Canopy. If it is within T1 but not T2, the point is added to the Canopy but is still allowed to be part of another Canopy. If it is within neither, it is used as the center of a new Canopy. At the end, after all Canopies are created, the centroid is calculated for each of them, and these are the centroids used for the next step, K-Means.
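The Canopy pass described above can be sketched in plain Java. This is a minimal, single-machine illustration and not Mahout's CanopyDriver: the class name CanopySketch, the unweighted Euclidean distance, and the choice of the first remaining point as the center of each new canopy are all assumptions made for the example.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

class CanopySketch {

    // Plain Euclidean distance (the post itself uses a weighted variant).
    static double distance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }

    // Component-wise mean of a non-empty list of points.
    static double[] mean(List<double[]> points) {
        double[] m = new double[points.get(0).length];
        for (double[] p : points) {
            for (int i = 0; i < m.length; i++) {
                m[i] += p[i] / points.size();
            }
        }
        return m;
    }

    // Returns one centroid per canopy. Expects t1 > t2.
    static List<double[]> canopyCentroids(List<double[]> points, double t1, double t2) {
        List<double[]> remaining = new ArrayList<>(points);
        List<double[]> centroids = new ArrayList<>();
        while (!remaining.isEmpty()) {
            // The next point not tightly bound to any canopy starts a new one.
            double[] center = remaining.remove(0);
            List<double[]> members = new ArrayList<>();
            members.add(center);
            Iterator<double[]> it = remaining.iterator();
            while (it.hasNext()) {
                double[] p = it.next();
                double d = distance(center, p);
                if (d < t1) {
                    members.add(p);   // within T1: belongs to this canopy
                }
                if (d < t2) {
                    it.remove();      // within T2: cannot join or start another canopy
                }
            }
            centroids.add(mean(members));
        }
        return centroids;
    }
}
```

The number of centroids returned is what later plays the role of K: smaller thresholds tend to produce more, tighter canopies.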

Deciding on and weighting the dimensions

K-Means, Canopy and most other Machine Learning algorithms decide whether a particular item belongs to a group, or which recommendation to make, based on a measure of distance. To be able to measure the distance between two items (customers, in our use case) their values need to be converted into a form that allows them to be compared and measured. This means that we need to convert our values, whatever they are, to a numeric representation that allows us to use traditional distance measure algorithms to compare them.

The dimensions I planned on using for this first iteration were:

  • product
  • trade
  • turnover
  • employees
  • claims

Out of these dimensions, only 2 were already numbers (turnover and claims), and the other 3 were text values. Even trickier, of those 3 only employees had directly comparable values ("less than 5 employees" is comparable to "more than 100 employees"), while the other 2 were discrete, disjoint values (product "business" is not directly comparable to product "shop").

For the case of employees, I converted the values to consecutive numbers like (values are made up for this example):

  • "less than 15 employees" -> 1
  • "between 15 and 50 employees" -> 2
  • "between 50 and 200 employees" -> 3

For the two discrete properties, product and trade, I had to create an individual dimension for each of the discrete values they can take. As in my example I was only going to use Baker and Accountant for trades and Shop and Business for product, the final dimensions Vector ended up looking something like:

| shop | business | accountant | baker | turnover | employees | claims |

So let's say we wanted to model an accountant with 50000 turnover, 20 employees and 2 claims. His vector would look like:

| 0 | 1 | 1 | 0 | 50000 | 2 | 2 |
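As a minimal sketch of this conversion, the following plain-Java class builds the vector for the accountant example. The class and method names are made up for the illustration, and the bucket boundaries simply follow the made-up employee ranges above (the post does not say which bucket exactly 50 employees falls into, so that choice is an assumption).

```java
class CustomerVectorSketch {

    // Maps a head count to the consecutive bucket numbers used in the example.
    static int employeeBucket(int employees) {
        if (employees < 15) {
            return 1;
        }
        if (employees < 50) {
            return 2;
        }
        return 3;
    }

    // 1.0 if the customer's value matches this dimension, 0.0 otherwise.
    static double oneHot(String actual, String expected) {
        return actual.equalsIgnoreCase(expected) ? 1.0 : 0.0;
    }

    // Dimension order: shop, business, accountant, baker, turnover, employees, claims.
    static double[] toVector(String product, String trade, double turnover, int employees, int claims) {
        return new double[] {
            oneHot(product, "shop"),
            oneHot(product, "business"),
            oneHot(trade, "accountant"),
            oneHot(trade, "baker"),
            turnover,
            employeeBucket(employees),
            claims
        };
    }
}
```

Calling `CustomerVectorSketch.toVector("business", "accountant", 50000, 20, 2)` reproduces the row shown above.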

We can already see a problem with this vector: the turnover value is much larger than the rest of the dimensions. This means that the distance calculation will be dominated by this value; we say this value has a much bigger weight than the rest. For the example, we assume that there is a maximum turnover of 100000.

In our case, we want to give extra weight to the product and trade dimensions and make turnover much less significant.

Mahout offers some functionality for doing just that, normally in the form of an implementation of the class WeightedDistanceMeasure. It works by building a Vector with a multiplier for each of the dimensions of the original vector. This vector needs to be the same size as the dimensions vector. In our case, we could have a vector like this:

| 10 | 10 | 5 | 5 | 1/100000 | 1/2 | 1/10 |

The effect of that Vector will be to alter the values of the original by multiplying the product by 10, the trade by 5, making sure that turnover is always less than 1, halving the influence of number of employees and making claims less influential.
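A minimal sketch of the effect described above, assuming the weights are applied by multiplying each dimension before a plain Euclidean distance is taken. Mahout's WeightedEuclideanDistanceMeasure may apply the weights differently internally (for example to the squared differences), so treat this as an illustration of the idea rather than of Mahout's code. The class and method names are hypothetical.

```java
class WeightedVectorSketch {

    // The example weight vector; 1.0 / n keeps the division in floating point.
    static final double[] WEIGHTS = {10, 10, 5, 5, 1.0 / 100000, 1.0 / 2, 1.0 / 10};

    // Multiplies every dimension by its weight.
    static double[] applyWeights(double[] vector, double[] weights) {
        double[] scaled = new double[vector.length];
        for (int i = 0; i < vector.length; i++) {
            scaled[i] = vector[i] * weights[i];
        }
        return scaled;
    }

    // Euclidean distance between the two weighted vectors.
    static double distance(double[] a, double[] b, double[] weights) {
        double[] wa = applyWeights(a, weights);
        double[] wb = applyWeights(b, weights);
        double sum = 0.0;
        for (int i = 0; i < wa.length; i++) {
            double d = wa[i] - wb[i];
            sum += d * d;
        }
        return Math.sqrt(sum);
    }
}
```

Applied to the accountant vector, the scaled values come out at roughly 0, 10, 5, 0, 0.5, 1 and 0.2, so turnover no longer dwarfs the other dimensions.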

NOTE: Finding the correct dimensions and weights for a clustering algorithm is a really hard exercise which normally requires multiple iterations to find the "best" solution. Our example, following with the Spike approach for this hackathon, is using completely arbitrary values chosen just to prove the technique, and not carefully crafted normalizations of data. If these values are good enough for our examples, then they are good enough.

Following are the main parts of the raw code written to convert the initial data to a list of vectors:

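import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.NamedVector;
import org.apache.mahout.math.VectorWritable;

// The helpers vectorForVertical, vectorForTrade, vectorForDouble and concatArrays,
// as well as the Configurations constants, are not shown in this post.
public class VectorCreationMapReduce extends Configured implements Tool {

  public static class VectorizerMapper extends Mapper<LongWritable, Text, Text, VectorWritable> {

    @Override
    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
      // Each input line is pipe-delimited; values[0] is the customer id.
      String[] values = value.toString().split("\\|");
      double[] verticals = vectorForVertical(values[1]);
      double[] trade = vectorForTrade(values[2]);
      double[] turnover = vectorForDouble(values[3]);
      double[] claimCount = vectorForDouble(values[4]);
      double[] xCoordinate = vectorForDouble(values[7]);
      double[] yCoordinate = vectorForDouble(values[8]);

      // Concatenate all dimensions into one vector, named after the customer id.
      NamedVector vector = new NamedVector(new DenseVector(concatArrays(verticals, trade, turnover, claimCount, xCoordinate, yCoordinate)), values[0]);
      VectorWritable writer = new VectorWritable();
      writer.set(vector);
      context.write(new Text(values[0]), writer);
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    int res = ToolRunner.run(conf, new VectorCreationMapReduce(), args);
    System.exit(res);
  }

  @Override
  public int run(String[] strings) throws Exception {
    Configuration conf = super.getConf();
    conf.set("fs.default.name", "hdfs://" + Configurations.HADOOP_MASTER_IP + ":9000/");
    conf.set("mapred.job.tracker", Configurations.HADOOP_MASTER_IP + ":9001");
    Job job = new Job(conf, "customer_to_vector_mapreduce");
    job.setJarByClass(VectorCreationMapReduce.class);
    job.setMapperClass(VectorizerMapper.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(VectorWritable.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(VectorWritable.class);
    job.setOutputFormatClass(SequenceFileOutputFormat.class);
    // Map-only job: the mapper output is written straight to the sequence file.
    job.setNumReduceTasks(0);
    FileInputFormat.addInputPaths(job, "aggregated_customers_with_coordinates");
    FileOutputFormat.setOutputPath(job, new Path("vector_seq_file"));
    return job.waitForCompletion(true) ? 0 : 1;
  }
}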

The code is a Hadoop map-reduce job (map phase only) that takes its input from the Sqoop-exported file and creates a NamedVector with the dimension values. The Vector classes used are Mahout-provided classes for use within Hadoop.

The next step was to run the actual Canopy algorithm to find the K centroids that we are going to feed to the K-Means algorithm.

This is run something like:

./hadoop org.apache.mahout.clustering.canopy.CanopyDriver  -i /vector_seq_file/part-m-00000 -o customer-centroids -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -t1 4.0 -t2 2.0

The previous command specifies the Mahout class that contains the Canopy Hadoop job. It specifies an input file from HDFS, which is the output of the previous vectorization process. It also specifies an output location, customer-centroids, where the generated centroid vectors will be written. The two thresholds T1 and T2 are passed with -t1 and -t2. Finally, we specify that we want to use a Euclidean distance measure with weighting, which is defined in the custom class we see below:

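import org.apache.mahout.common.distance.WeightedEuclideanDistanceMeasure;
import org.apache.mahout.math.DenseVector;
import org.apache.mahout.math.Vector;

public class CustomWeightedEuclideanDistanceMeasure extends WeightedEuclideanDistanceMeasure {

  // One weight per dimension. The fractions use double literals because
  // 1/100000, 1/2 and 1/10 are integer divisions in Java and evaluate to 0.
  public static final Vector WEIGHTS = new DenseVector(new double[]{10, 10, 5, 5, 1.0 / 100000, 1.0 / 2, 1.0 / 10});

  public CustomWeightedEuclideanDistanceMeasure() {
      super();
      setWeights(WEIGHTS);
  }
}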

This class simply extends the Mahout provided WeightedEuclideanDistanceMeasure and sets the custom weight vector that we mentioned above. This will make sure that when the algorithm runs, the weighting will be applied to all the vectors from the input.

Now that we have generated our K centroids, it is time to run the actual K-Means clustering algorithm. This is also very simple to run by using the Mahout provided classes:

 hadoop org.apache.mahout.clustering.kmeans.KMeansDriver  -i vector_seq_file/part-m-00000 -c customer-centroids/clusters-0-final -o customer-kmeans -dm clustercustomers.mahout.CustomWeightedEuclideanDistanceMeasure -x 10 -ow --clustering

In this command, we are specifying that we want to run the KMeansDriver hadoop job. We pass in the input vector file again, and we specify the centroids file generated by canopy with the -c option. We then specify where the clustering output should go, the use of the weighting mechanism again, and how many iterations we want to do on the data.

Here's a quick overview of how K-Means actually works:

The K-Means clustering algorithm starts with the given set of K centroids and keeps adjusting them until either the iteration limit X is reached or the centroids converge to a point where they no longer move. Each iteration works as follows:

  • For each point in the input, it finds the nearest centroid and assigns the point to the cluster represented by that centroid.
  • At the end of the iteration, the points in each cluster are averaged to recalculate the new centroid position.
  • If the maximum number of iterations is reached, or the centroids don't move any more, the clustering concludes.
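The loop described above can be sketched in a few lines of plain Java. This is a single-machine illustration under simplifying assumptions, not Mahout's KMeansDriver: the class name is made up, the distance is unweighted, and convergence is tested by exact equality of the centroids rather than by a tolerance.

```java
import java.util.Arrays;

class KMeansSketch {

    static double squaredDistance(double[] a, double[] b) {
        double sum = 0.0;
        for (int i = 0; i < a.length; i++) {
            double d = a[i] - b[i];
            sum += d * d;
        }
        return sum;
    }

    // Index of the centroid closest to the given point.
    static int nearest(double[] point, double[][] centroids) {
        int best = 0;
        for (int k = 1; k < centroids.length; k++) {
            if (squaredDistance(point, centroids[k]) < squaredDistance(point, centroids[best])) {
                best = k;
            }
        }
        return best;
    }

    // Runs at most maxIterations rounds of assign-then-average.
    static double[][] run(double[][] points, double[][] initial, int maxIterations) {
        double[][] centroids = initial;
        int dims = points[0].length;
        for (int iter = 0; iter < maxIterations; iter++) {
            double[][] sums = new double[centroids.length][dims];
            int[] counts = new int[centroids.length];
            // Step 1: assign every point to its nearest centroid.
            for (double[] p : points) {
                int k = nearest(p, centroids);
                counts[k]++;
                for (int d = 0; d < dims; d++) {
                    sums[k][d] += p[d];
                }
            }
            // Step 2: move each centroid to the average of its points.
            double[][] next = new double[centroids.length][];
            for (int k = 0; k < centroids.length; k++) {
                if (counts[k] == 0) {
                    next[k] = centroids[k];   // empty cluster: keep the old centroid
                    continue;
                }
                next[k] = new double[dims];
                for (int d = 0; d < dims; d++) {
                    next[k][d] = sums[k][d] / counts[k];
                }
            }
            // Step 3: stop early once the centroids no longer move.
            if (Arrays.deepEquals(next, centroids)) {
                break;
            }
            centroids = next;
        }
        return centroids;
    }
}
```

Step 1 is independent for every point, which is why the work can be split across many mappers and combined afterwards.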

K-Means and Canopy are both parallelizable algorithms, meaning that you can have many jobs each working on a subset of the problem and then aggregate the results. This is where Hadoop comes in for the clustering execution. Internally, Mahout, and in particular KMeansDriver, is built to work on the Hadoop map-reduce infrastructure. By leveraging Hadoop's proven map-reduce implementation, Mahout algorithms are able to scale to very big data sets and process them in parallel.

After generating the clusters, the next step is to create individual cluster files as well as a single cluster file with the simple syntax (cluster_id, customer_id).

This is done with the following map and reduce methods:

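// Imports needed by the two classes below. WeightedVectorWritable also has to be
// imported from Mahout; check its package for the Mahout version you are using.
import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
import org.apache.mahout.math.NamedVector;

public static class ClusterPassThroughMapper extends Mapper<IntWritable, WeightedVectorWritable, IntWritable, Text> {
    @Override
    public void map(IntWritable key, WeightedVectorWritable value, Context context) throws IOException, InterruptedException {
        // The key is the cluster id; the vector name is the customer id.
        NamedVector vector = (NamedVector) value.getVector();
        context.write(key, new Text(vector.getName()));
    }
}

public static class ClusterPointsToIndividualFile extends Reducer<IntWritable, Text, IntWritable, Text> {
    // The named output "seq" must be registered in the job driver with MultipleOutputs.addNamedOutput.
    private MultipleOutputs<IntWritable, Text> mos;

    @Override
    public void setup(Context context) {
        mos = new MultipleOutputs<>(context);
    }

    @Override
    public void reduce(IntWritable key, Iterable<Text> value, Context context) throws IOException, InterruptedException {
        for (Text text : value) {
            // One file per cluster ...
            mos.write("seq", key, text, "cluster-" + key.toString());
            // ... plus the single combined (cluster_id, customer_id) file.
            context.write(key, text);
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }
}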

This has allowed us to obtain clusters of customers - next week's post will explore what can be done with these clusters!

PART 2

Read the first part of this hackathon implementation at Clustering our customers to get a full background on what's being presented here!

Analysing the data

At the end of the first post, we have obtained various clusters of customers in HDFS. This is the most important part of the job. However, we still need to do something with those clusters. We thought about different possibilities: "The most common frequently asked question among members of a cluster", "What is the average premium that people pay in a particular cluster?", "What is the average time people have been insured with us in a particular cluster?".

Most of the analyses we defined here were straightforward aggregations, a job that Hadoop map-reduce does nicely (more so as the data is already stored on HDFS).

There are many ways to analyse data with Hadoop including Hive, Scalding, Pig, and some others. Most of these do translations from a particular language to the native map-reduce algorithms supported by Hadoop. I chose Apache Pig for my analysis as I really liked the abstraction it creates on top of the standard Hadoop map-reduce. It does this by creating a language called Pig Latin. Pig Latin is very easy to read and write, both for people familiar with SQL analysis queries and for developers familiar with a procedural way of doing things.

Here I will show just one example, how to calculate the average premium for each cluster. Following is the Pig program that does this:

premiums = LOAD '/user/cscarion/imported_quotes' USING PigStorage('|') AS (rfq_id, premium, insurer);
cluster = LOAD '/user/cscarion/individual-clusters/part-r-00000' AS (clusterId, customerId);
customers = LOAD '/user/cscarion/aggregated_customers_text' USING PigStorage('|') AS (id, vertical, trade, turnover, claims, rfq_id);

withPremiums = JOIN premiums BY rfq_id, customers BY rfq_id;

STORE withPremiums INTO 'withPremiums' USING PigStorage('|');

groupCluster2 = JOIN withPremiums BY customers::id, cluster BY customerId;

grouped2 = GROUP groupCluster2 BY cluster::clusterId;

premiumsAverage = FOREACH grouped2 GENERATE group, AVG(groupCluster2.withPremiums::premiums::premium);

STORE premiumsAverage INTO 'premiumsAverage' USING PigStorage('|');

The previous Pig program should be fairly straightforward to understand.

  • The RFQs with their respective premiums are loaded from HDFS into tuples like (rfq_id, premium, insurer)
  • The cluster information is loaded from HDFS into tuples like (cluster_id, customer_id)
  • The customers are loaded from the originally imported file into a tuple like (id, vertical, trade, turnover, claims,rfq_id)
  • The RFQs are joined with the customers using the rfq_id. This will basically add the premium to the customer collection.
  • The result of the previous join is joined with the cluster tuples. This essentially adds the cluster_id to the customer collection.
  • Results in the previous join are grouped by the cluster_id
  • For each group computed in the previous step, the average is calculated.
  • The results from the previous step are stored back into HDFS as a tuple (cluster_id, average_premium)
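For readers more at home with procedural code, the same join, group and average can be sketched with in-memory maps in plain Java. This only illustrates the logic of the steps listed above, not how Pig executes them; the class name, the map-based inputs and the simplification of one RFQ per customer are assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

class AveragePremiumSketch {

    // premiumByRfq:      rfq_id      -> premium
    // rfqByCustomer:     customer id -> rfq_id
    // clusterByCustomer: customer id -> cluster id
    static Map<String, Double> averagePremiumPerCluster(
            Map<String, Double> premiumByRfq,
            Map<String, String> rfqByCustomer,
            Map<String, String> clusterByCustomer) {
        Map<String, Double> sums = new HashMap<>();
        Map<String, Integer> counts = new HashMap<>();
        for (Map.Entry<String, String> e : rfqByCustomer.entrySet()) {
            // Join customer -> premium (by rfq_id) and customer -> cluster (by id).
            Double premium = premiumByRfq.get(e.getValue());
            String cluster = clusterByCustomer.get(e.getKey());
            if (premium == null || cluster == null) {
                continue;   // inner join: skip customers missing on either side
            }
            // Group by cluster id, accumulating the sum and the count.
            sums.merge(cluster, premium, Double::sum);
            counts.merge(cluster, 1, Integer::sum);
        }
        // Average per cluster, sorted by cluster id for stable output.
        Map<String, Double> averages = new TreeMap<>();
        for (Map.Entry<String, Double> e : sums.entrySet()) {
            averages.put(e.getKey(), e.getValue() / counts.get(e.getKey()));
        }
        return averages;
    }
}
```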

Querying the data from the main application

Now that we have the data and the analytics have been run on it, the next step is to consume this data from the main web application. For this, and to make it as unobtrusive as possible, I created a new server application (using Play) with a simple interface that was connected to HDFS and could be queried from our main application.

So for example, when a customer is filling in the form we can invoke an endpoint on this new service like this: GET /?trade=accountant&product=business&claims=2&turnover=25000&employees=2

This call will vectorise that information, find the correct cluster, and return the information for the given cluster. The important parts of the code follow:

def similarBusinesses(business: Business): Seq[Business] = {
  loadCentroids()
  val cluster = clusterForBusiness(business)
  val maxBusinesses = 100
  var currentBusinesses = 0
  // Read the cluster file line by line, collecting user ids until the limit is reached
  CustomHadoopTextFileReader.readFile(s"hdfs://localhost:9000/individual-clusters/cluster-${cluster}-r-00000") {
    line =>
      // Each line has the form cluster_id<TAB>customer_id
      val splitted = line.split("\t")
      userIds += splitted(1)
      currentBusinesses += 1

  }(currentBusinesses < maxBusinesses)
  businessesForUserIds(userIds)
}

That code finds the cluster for the business arriving from the main application. Then it reads the HDFS file representing that individual cluster and gets the business information for the returned user ids.

To find the cluster to which the business belongs, we compare against the stored centroids:

  private def clusterForBusiness(business: Business): String = {
    val businessVector = business.vector
    var currentDistance = Double.MaxValue
    var selectedCentroid: (String, Cluster) = null
    for (centroid <- SimilarBusinessesRetriever.centroids) {
      // Compute the distance once per centroid and keep the closest one seen so far
      val d = distance(centroid._2.getCenter, businessVector)
      if (d < currentDistance) {
        currentDistance = d
        selectedCentroid = centroid
      }
    }
    clusterId = Integer.valueOf(selectedCentroid._1)
    selectedCentroid._1
  }

The code that actually reads the Hadoop filesystem follows; it looks much like reading a simple file:

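import java.io.{BufferedReader, InputStreamReader}
import org.apache.hadoop.fs.Path

object CustomHadoopTextFileReader {
  // Calls f for every line of the file for as long as the condition g holds
  def readFile(filePath: String)(f: String => Unit)(g: => Boolean = true): Unit = {
    var br: BufferedReader = null
    try {
      val pt = new Path(filePath)
      br = new BufferedReader(new InputStreamReader(SimilarBusinessesRetriever.fs.open(pt)))
      var line = br.readLine()
      while (line != null && g) {
        f(line)
        line = br.readLine()
      }
    } catch {
      case e: Exception =>
        e.printStackTrace()
    } finally {
      // Always release the HDFS stream, even when reading stops early
      if (br != null) br.close()
    }
  }
}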

Then to return the premium for a particular cluster:

def averagePremium(cluster: Int): Int = {
  CustomHadoopTextFileReader.readFile("hdfs://localhost:9000/premiumsAverage/part-r-00000") {
    line =>
      val splitted = line.split("\\|")
      if (cluster.toString == splitted(0)) {
        return Math.ceil(java.lang.Double.valueOf(splitted(1))).toInt
      }
  }(true)
  0
}

Any of these values can then be returned to the calling application - this can provide a list of "similar" businesses or retrieve the premium average paid for those similar businesses.

This was it for the first iteration of the hack! The second iteration was to use the same infrastructure to cluster customers based on the location of their businesses instead of the dimensions used here. Given that the basic procedure for clustering remains the same regardless of the dimensions used, the code for that looks similar to the code presented here.