Sunday, 3 March 2013

Learning linear models in Hadoop with Weka and Pentaho

The ever increasing size of recorded data and the recent rise in popularity of distributed processing systems such as Hadoop fuel the need for machine learning algorithms that can take advantage of the later in order to tackle the former. In many cases it is possible to carefully extract a subsample from a large data set and process it using a sequential algorithm on a single machine. However, there is evidence that, in some cases, even highly biased methods (such as linear regression) benefit from seeing all the data, and significantly outperform training on a carefully chosen subsample [1].

While certain supervised learning algorithms lend themselves naturally to parallel implementations - for example, ensemble methods such as bagging and random forests - others remain open research questions. A recent paper on the parallel learning of linear models [2] looked interesting because 1. the basic idea is super simple, 2. it uses stochastic gradient descent (SGD) and requires no communication between machines until the end, which is suited to Hadoop's single-pass map-reduce framework, and 3. just about all the necessary bits and pieces to implement it are already available in Weka and Pentaho. Weka already has an implementation of stochastic gradient descent for learning least squares linear models, so there was no need to consider distributed matrix-based solutions (furthermore, matrix-based solutions don't scale in the number of features, and large feature spaces is one situation where seeing all the data outperforms subsampling). The basic idea is:

  1. Split the data up into chunks
  2. Apply SGD to learn a linear model for each chunk (one pass)
  3. Take the weights (and bias) from each individual model and simply average them into an aggregated model
  4. Take the aggregated model from step 3 and use it as the starting point for step 2
  5. Repeat steps 1 - 4 until convergence (or happy with the error)
Bits that were already available in Weka and Pentaho:
  • SGD and SGD for text classification implemented in Weka
  • Pentaho Map Reduce for graphically designing map and reduce processes as Pentaho Data Integration (PDI) ETL processes
  • Weka plugins for PDI:
    • Knowledge Flow plugin for executing a data mining process
Bits that were missing:
  • The ability for the PDI Knowledge Flow plugin to listen for a "classifier" event and output a serialized Weka classifier in an outgoing PDI data field
  • A way of aggregating Weka SGD models
The first was easily rectified with a few extra lines of code in the Knowledge Flow plugin step; the second required a new "Weka Model Aggregator" step for PDI.

Test scenario

I quickly transfered my trusty sentiment analysis Twitter data set into HDFS as the basis for testing because no data preprocessing was needed and everything was good to go with the standard Hadoop TextFileInputFormat. The following diagram shows the basic map-reduce setup:

To facilitate multiple iterations over the data, Hadoop's efficient distributed cache mechanism is used copy the aggregated model from iteration i out to the local filesystem of each node - ready for the mappers at iteration i + 1 to load and continue training. Because stuff in the distributed cache is stored on the local filesystem of each node it meant that Weka's Knowledge Flow did not need to be made aware of HDFS.


To learn the SGD logistic regression models at each mapper I essentially used the same Knowledge Flow layout as from the sentiment analysis with Weka post - the main difference is that the ArffLoader step has been replaced with the plugin "KettleInject" step that allows rows from Kettle (PDI) to be converted into Weka Instances.

The PDI transformation used to implement the mappers makes use of the above Knowledge Flow layout and looks like:

The "Add constants" step is used to produce a constant key value. This ensures that all models produced by the mappers go to the same reducer instance.

The transformation used to implement the reducer uses the new "Weka Model Aggregator" PDI step and looks like:

Finally, we have a PDI job that ties it all together using the "Pentaho MapReduce" job entry:

There are two paths that are executed in the job graph. The first (failure path) gets executed on the first iteration only. This path is triggered by the failure of the "Hadoop Copy Files" step to copy the aggregated model file from a "staging" directory to a "deploy" directory in HDFS. There is no aggregated model file for the first iteration of training so we don't want to use the distributed cache mechanism in this case (and, in fact, it packs a sad if you specify a non-existent file via the "mapred.cache.files" property). The second (non-failure path) is executed for subsequent iterations, and makes sure that the aggregated model file is pushed out to nodes via the distributed cache. The whole job is repeated for as many iterations as desired.

So, that's about it. It doesn't quite implement Algorithm 2 from the paper as the data is not randomly shuffled and that algorithm uses an overcomplete solution - if there are k nodes then typically more than 1/kth of the data is processed at each node (i.e sampling with replacement). In practical terms it means that more iterations (or more data) are needed via the parallel approach than the single-machine sequential approach to reach the same error. Whether the parallel approach saves time over sequential approach depends on the amount of data and time available to learn the model. We might be able to achieve five iterations over the data in the parallel approach vs. one in the sequential approach in the time available.

The Weka Model Aggregator PDI step is still under development. So far it can aggregate SGD and SGDText Weka models produced in multiple passes over the data via multiple map-reduce jobs. It can also handle any other Weka batch or incremental learner to produce a voted ensemble (via the Vote meta classifier) in a single pass/map-reduce job. The ability to aggregate multiple naive Bayes models into a single model in one pass has yet to be added.