Monday, 22 September 2014

k-means|| in distributed Weka for Hadoop

This is a quick update to announce some support for clustering in distributed Weka. It now has an implementation of the scalable k-means++ (k-means||) algorithm. Although k-means is a super simple algorithm (even in the distributed case), it still took a bit of work to get all the necessary pieces together. The aim was to meet distributed Weka's goal of making the overall experience and final results similar to those of desktop Weka. In particular, I wanted
  • the final model to be the same as desktop Weka (i.e. a subclass of SimpleKMeans), so that the textual output looks the same and the trained model is used no differently from any other Weka clusterer
  • to use weka.core.EuclideanDistance, like SimpleKMeans does
  • to have built-in missing values replacement
  • to be able to use arbitrary streaming filters in the learning process for on-the-fly data transformation (just like the classifier job does)
  • to have the option of randomly selected starting points or the k-means++ initialisation

k-means|| initialisation

The k-means++ initialisation process starts with a single training instance selected uniformly at random as the first candidate centroid. It then iteratively samples further training instances, each with probability proportional to the squared distance between that instance and the closest centre already chosen. The idea is that starting points that are well separated in instance space will lead to a good clustering result, possibly in fewer iterations than purely random initialisation.
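To make the sampling rule concrete, here is a minimal sequential sketch of k-means++ seeding. It is plain Python on lists of numeric attributes, not Weka code, and the function names are hypothetical. Each new centre costs one pass over the data to refresh every instance's squared distance to its nearest centre. That per-centre pass is what makes the sequential routine expensive.

```python
import random


def squared_distance(a, b):
    """Squared Euclidean distance between two equal-length numeric vectors."""
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans_plus_plus_init(data, k, rng=None):
    """Hypothetical helper: pick k starting centres using k-means++ (D^2) seeding."""
    if rng is None:
        rng = random.Random()
    # First centre: a training instance chosen uniformly at random.
    centres = [rng.choice(data)]
    # d2[i] = squared distance from data[i] to its closest centre chosen so far.
    d2 = [squared_distance(p, centres[0]) for p in data]
    while len(centres) < k:
        total = sum(d2)
        if total == 0:
            # Every instance coincides with a chosen centre; fall back to uniform.
            nxt = rng.choice(data)
        else:
            # Sample an instance with probability d2[i] / total.
            r = rng.random() * total
            cumulative = 0.0
            nxt = None
            for p, w in zip(data, d2):
                cumulative += w
                if w > 0:
                    nxt = p  # last positive-weight instance seen (rounding fallback)
                if cumulative > r:
                    break
        centres.append(nxt)
        # One pass over the data per new centre to refresh nearest-centre distances.
        d2 = [min(old, squared_distance(p, nxt)) for p, old in zip(data, d2)]
    return centres


if __name__ == "__main__":
    points = [[0, 0], [0, 1], [10, 10], [10, 11]]
    print(kmeans_plus_plus_init(points, 2, random.Random(42)))
```

Instances that already coincide with a chosen centre have weight zero, so they can never be picked again while other candidates remain.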

In the sequential case, the k-means++ routine requires k passes over the data, one for each of the k clusters. Instead of sampling a single point in each iteration, k-means|| oversamples to the tune of O(k) points. The oversampling process lends itself naturally to parallelisation, and the authors of the k-means|| paper show that only a few iterations are necessary (usually 4 or 5 are plenty). The final set of candidate centres is reduced to k starting points in two steps. First, each candidate centre is assigned a weight: the number of training instances that are closer to this centre than to any other. Then k-means is run on this weighted data.

A k-means|| iteration requires one pass over the data to compute the cost, i.e. the squared distance between each training instance and the closest of the centres already selected. A second pass then samples the O(k) candidates to add to the current sketch. However, through clever use of weighted reservoir sampling, the cost computation and the sampling can be done simultaneously in a single pass over the data. To this end, the distributedWekaBase package now includes a WeightedReservoirSample class, which in turn is used by a CentroidSketch class. The reduce phase aggregates the reservoirs from each map and then updates the current sketch with the contents of the combined reservoir.
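The single-pass idea can be sketched with the key-based weighted reservoir scheme of Efraimidis and Spirakis (often called A-Res). Each item gets the key u ** (1 / w), where u is uniform on [0, 1), and the reservoir keeps the m largest keys. This is an illustration under stated assumptions, not the actual WeightedReservoirSample or CentroidSketch code. The class and function names are hypothetical, and the fixed reservoir size m stands in for the O(k) oversampling factor. In the map phase, each instance's squared distance to its nearest current centre serves both as its contribution to the cost and as its sampling weight. In the reduce phase, the per-mapper reservoirs are merged by keeping the m largest keys overall.

```python
import heapq
import random


class WeightedReservoir:
    """Hypothetical fixed-size weighted reservoir (Efraimidis-Spirakis A-Res)."""

    def __init__(self, m, rng=None):
        self.m = m
        self.rng = rng if rng is not None else random.Random()
        self.heap = []  # min-heap of (key, seq, item); the smallest key is evicted first
        self._seq = 0   # tie-breaker so items themselves are never compared

    def offer(self, item, weight):
        if weight <= 0:
            return  # zero-weight items (already a centre) are never sampled
        self._push(self.rng.random() ** (1.0 / weight), item)

    def _push(self, key, item):
        entry = (key, self._seq, item)
        self._seq += 1
        if len(self.heap) < self.m:
            heapq.heappush(self.heap, entry)
        elif key > self.heap[0][0]:
            heapq.heapreplace(self.heap, entry)

    def merge(self, other):
        """Keep the m largest keys across both reservoirs."""
        for key, _, item in other.heap:
            self._push(key, item)

    def items(self):
        return [item for _, _, item in self.heap]


def map_pass(partition, centres, m, seed):
    """Hypothetical mapper: compute the cost and sample candidates in the same pass."""
    reservoir = WeightedReservoir(m, random.Random(seed))
    cost = 0.0
    for p in partition:
        d2 = min(sum((a - b) ** 2 for a, b in zip(p, c)) for c in centres)
        cost += d2
        reservoir.offer(p, d2)
    return cost, reservoir


def reduce_pass(results, sketch):
    """Hypothetical reducer: sum the partial costs and merge the reservoirs into the sketch."""
    total_cost = sum(cost for cost, _ in results)
    merged = results[0][1]
    for _, reservoir in results[1:]:
        merged.merge(reservoir)
    sketch.extend(merged.items())
    return total_cost, sketch
```

Merging is valid because every key is drawn independently. The m largest keys of the union are therefore always among the m largest keys of each part.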

Distance function

SimpleKMeans uses Weka's EuclideanDistance class, which handles both numeric and nominal attributes and, by default, normalises all numeric attributes to the 0-1 range. Normalisation ensures that attributes on different scales have an equal impact on the distance computation. Normalising numeric attribute values requires the minimum and maximum of each attribute. Luckily, in distributed Weka we already have this information from the ARFF profiling phase that produces the ARFF header. However, we need to jump through a few more hoops if the user has opted to apply pre-processing filters with the k-means job. In this case the data may get transformed arbitrarily, so the minimums and maximums must be recomputed in the transformed space. In the first iteration, EuclideanDistance updates the minimums and maximums incrementally as each transformed instance is seen in a given mapper. We re-use the ARFF header task to maintain (partial) statistics on each centroid in the mappers. As a result, the minimum and maximum of each transformed numeric attribute are available after the reduce phase at the end of the first iteration. For subsequent iterations, the EuclideanDistance function can then be batch-initialised with a two-instance "dummy" dataset that contains just the minimums and maximums.
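The following is a minimal sketch of a range-normalised Euclidean distance over numeric attributes only. Weka's own class also handles nominal attributes and missing values. The class name and methods are hypothetical and do not reflect Weka's API. The sketch shows three things: ranges widened incrementally in a mapper, partial ranges combined in a reducer, and batch initialisation from a two-row "dummy" dataset holding the minimums and maximums.

```python
import math


class RangeNormalisedDistance:
    """Hypothetical Euclidean distance over numeric attributes scaled to [0, 1]."""

    def __init__(self, num_attributes):
        self.mins = [math.inf] * num_attributes
        self.maxs = [-math.inf] * num_attributes

    def update(self, instance):
        """Incremental mode (first iteration): widen ranges as instances arrive."""
        for i, v in enumerate(instance):
            self.mins[i] = min(self.mins[i], v)
            self.maxs[i] = max(self.maxs[i], v)

    def merge(self, other):
        """Reduce step: combine partial ranges computed by different mappers."""
        for i in range(len(self.mins)):
            self.mins[i] = min(self.mins[i], other.mins[i])
            self.maxs[i] = max(self.maxs[i], other.maxs[i])

    @classmethod
    def from_dummy_dataset(cls, dummy):
        """Batch mode: initialise from a two-row dataset [mins, maxs]."""
        d = cls(len(dummy[0]))
        for row in dummy:
            d.update(row)
        return d

    def _norm(self, v, i):
        span = self.maxs[i] - self.mins[i]
        if span <= 0:
            return 0.0  # constant (or unseen) attribute contributes nothing here
        return (v - self.mins[i]) / span

    def distance(self, a, b):
        return math.sqrt(sum((self._norm(x, i) - self._norm(y, i)) ** 2
                             for i, (x, y) in enumerate(zip(a, b))))
```

Here, two mappers that see [0, 100], [5, 300] and [10, 200] merge to the ranges [0, 10] and [100, 300]. Both attributes then contribute equally, even though the raw scale of the second is twenty times larger.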

Multiple runs

Distributed Weka allows multiple runs of k-means to be executed simultaneously. This parallelises the reduce phase, using as many reducers as there are runs. A heuristic drops a given run if it looks like it is settling into a sub-optimal local minimum, i.e. if its within-cluster error is unlikely to equal or better that of the current best-performing run within the remaining number of iterations. The final model is taken from the run with the lowest within-cluster error.

The new k-means job is available in distributedWekaHadoop version 1.0.11.