Thursday, 27 July 2017

Integrating Spark MLlib into Weka

The distributed Weka for Spark package has been available in Weka for several years now. One nice thing about the package is that it allows any Weka classifier to be trained in Spark. However, aside from averaging being implemented for regression and naive Bayes, most classifiers are learned in the cluster by using the embarrassingly simple "Dagging" (Disjoint Aggregation) ensemble approach. Spark's infrastructure provides the disjoint data chunks (RDD partitions) automatically, and standard desktop Weka classifiers are trained on each chunk/partition and then combined into a final voted ensemble via the Vote meta classifier. This can work fairly well, but partition size is another tuning parameter to consider and available RAM on worker nodes will enforce a hard upper limit. If partitions are too small relative to the total dataset size, then individual ensemble classifiers might fail to capture enough of the structure of the problem, in turn leading to lower predictive performance.
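To make the Dagging idea concrete, here is a minimal Python sketch. It is not the distributed Weka code: the nearest-mean base learner, the helper names (split_disjoint, train_nearest_mean, vote) and the toy data are all assumptions made for illustration. List slices stand in for RDD partitions and a simple majority vote stands in for the Vote meta classifier.

```python
from collections import Counter, defaultdict

def split_disjoint(rows, num_partitions):
    """Deal rows into disjoint chunks, standing in for RDD partitions."""
    return [rows[i::num_partitions] for i in range(num_partitions)]

def train_nearest_mean(chunk):
    """Toy base learner (an assumption): remembers the mean feature value per class."""
    sums, counts = defaultdict(float), Counter()
    for x, label in chunk:
        sums[label] += x
        counts[label] += 1
    means = {label: sums[label] / counts[label] for label in counts}
    # Predict the class whose mean is closest to x.
    return lambda x: min(means, key=lambda label: abs(x - means[label]))

def vote(models, x):
    """Majority vote across the ensemble members."""
    return Counter(model(x) for model in models).most_common(1)[0][0]

# Toy one-feature dataset with two well separated classes.
rows = [(0.9, "a"), (1.1, "a"), (1.0, "a"), (1.2, "a"), (0.8, "a"), (1.3, "a"),
        (4.9, "b"), (5.1, "b"), (5.0, "b"), (5.2, "b"), (4.8, "b"), (5.3, "b")]

# One model per disjoint chunk, combined only at prediction time.
models = [train_nearest_mean(chunk) for chunk in split_disjoint(rows, 3)]
prediction_low = vote(models, 1.4)
prediction_high = vote(models, 4.6)
```

Each base model only ever sees its own chunk, which is why chunks that are too small can leave individual ensemble members with too little of the problem's structure.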

Spark's machine learning library (MLlib) has a small set of algorithms, but each has been designed to operate efficiently in a fully distributed fashion and produces a single final model. This could have an accuracy advantage over the Dagging approach for more complicated problems, and definitely has an advantage when interpretability of the final model is important.

This blog entry takes a look at how MLlib algorithms have been integrated into the latest version of the distributedWekaSpark package. Or, to be precise, there is now a new version of this package available with a new name: distributedWekaSparkDev. This has been done so that the old version of the package can continue to be used with (and remain consistent with) what is shown in the Advanced Data Mining With Weka MOOC. The new version of the package adds support for Spark data-frame based data sources (CSV, Avro and Parquet) as well as MLlib classifiers and regressors.

MLlib in desktop Weka

The new distributedWekaSparkDev package adds Weka wrapper classifiers for the major MLlib supervised learning schemes. These are designed to work just like any other Weka classifier, and operate on datasets that fit into main memory on the desktop client. They can be run from the command line, Explorer, Knowledge Flow and Experimenter interfaces. This allows MLlib schemes to be used within Weka's standard evaluation framework, used as base classifiers in meta learners, combined with arbitrary Weka preprocessing filters in the FilteredClassifier, dropped into standard Knowledge Flow processes and used in repeated cross-validation experiments in the Experimenter. It continues Weka's interoperability theme that started with R (MLR) and CPython (Scikit-learn) integration. Now it is possible to run an experiment in the Experimenter that involves implementations from four different ML tools, safe in the knowledge that results are fully comparable due to the fact that the same data splits and evaluation metrics are used in each case.

10-fold cross-validation of an MLlib decision tree in Weka's Explorer


Under the hood the Weka wrappers for each MLlib classifier accept a standard Weka Instances object via the Classifier.buildClassifier() method. Standard Weka filters are applied where necessary to prepare the data for the underlying MLlib algorithm. For example, the MLlibNaiveBayes wrapper automatically discretizes any numeric fields if the user has selected a Bernoulli model. A utility class is then used to extract a list of individual Instance objects and then parallelize this into an RDD[Instance] via SparkContext.parallelize(). From here, the RDD[Instance] is converted into an RDD[LabeledPoint] that the underlying MLlib implementations can work with. During this conversion process auxiliary data structures, such as maps of categorical features, are computed for schemes that require them.
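The conversion step can be pictured with a small Python sketch. This is only an illustration under stated assumptions: rows are plain dicts rather than Weka Instance objects, a (label, features) tuple stands in for LabeledPoint, and the function name to_labeled_points is made up. The categorical map it builds (feature index to number of categories) is similar in spirit to the categorical feature information that MLlib's tree learners accept.

```python
def to_labeled_points(rows, attribute_names, nominal_values, class_name):
    """Turn dict rows into (label, features) pairs plus a categorical feature map.

    nominal_values maps each nominal attribute to its ordered list of values;
    a nominal value is encoded as its index in that list.
    """
    feature_names = [name for name in attribute_names if name != class_name]
    # Auxiliary structure: feature index -> number of categories.
    categorical_info = {index: len(nominal_values[name])
                        for index, name in enumerate(feature_names)
                        if name in nominal_values}
    points = []
    for row in rows:
        features = [float(nominal_values[name].index(row[name]))
                    if name in nominal_values else float(row[name])
                    for name in feature_names]
        label = float(nominal_values[class_name].index(row[class_name]))
        points.append((label, features))
    return points, categorical_info

# Two rows in the style of the classic weather data (values are illustrative).
attribute_names = ["outlook", "temperature", "play"]
nominal_values = {"outlook": ["sunny", "overcast", "rainy"], "play": ["yes", "no"]}
rows = [{"outlook": "sunny", "temperature": 85, "play": "no"},
        {"outlook": "rainy", "temperature": 70, "play": "yes"}]

points, categorical_info = to_labeled_points(rows, attribute_names, nominal_values, "play")
```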

Comparing Weka's NaiveBayes to MLlib NaiveBayes in the Knowledge Flow


Default options for the MLlib wrapper classifiers result in a local Spark cluster getting started on-the-fly to perform the learning. Spark's local mode runs in the same JVM as Weka and utilizes the processing cores of the CPU as workers. However, there is nothing to stop the user specifying an external cluster to perform the processing.

Comparing MLlib algorithms to native Weka implementations in Weka's Experimenter UI

One nice thing about this approach is that standard Weka filters can be used for any data transformations needed, rather than using MLlib's transformers. The last mile involves invoking just the MLlib learning algorithm which, in turn, results in a model object for the type of classifier applied. These model objects can be used to predict individual LabeledPoint instances. LabeledPoint is a simple data structure that does not require the Spark distributed processing framework, so MLlib models can be applied to score data rapidly in a streaming fashion without requiring a cluster (local mode or otherwise). The following screenshots show a Weka-wrapped Spark MLlib decision tree model being used to score data in Pentaho Data Integration.

Weka wrapped Spark MLlib decision tree classifier loaded into the Weka Scoring step in Pentaho Data Integration

Previewing data scored using the Spark MLlib decision tree model in Pentaho Data Integration


MLlib in distributed Weka

The MLlib classifiers can also be applied in the distributed Weka for Spark framework on a real Spark cluster. The difference, compared to the desktop case, is that Spark's data sources are used to read large datasets directly into data frames in the distributed environment (rather than parallelizing a data set that has been read into Weka on the local machine). From here, a data frame is converted to an RDD[Instance], and then to an RDD[LabeledPoint]. During this process arbitrary Weka filters can be used to preprocess the data (prior to its conversion to LabeledPoints), as long as those filters are ones that are Streamable - i.e. do not require all the data to be seen as a batch before producing output. This is because the results of transforming the data in each partition must be consistent in terms of the structure of the data, in order to facilitate aggregation. Following this, an MLlib classifier is trained as per normal.

The distributedWekaSparkDev package also implements hold-out and cross-validation evaluation for MLlib classifiers when run in the cluster. In the case of cross-validation, it produces training and test folds that are consistent with those used when cross-validating Weka classifiers in the Spark cluster. This entails some fancy shuffling of the data for training MLlib classifiers because maximum parallelism during cross-validation in the Dagging and model averaging approach for Weka classifiers is achieved by building all training fold classifiers in one pass over the data. To do this, distributed Weka treats each partition of the RDD as containing part of each cross-validation fold (as shown in the following figure). On the other hand, cross-validation for MLlib classifiers is basically the sequential case - i.e., each fold is processed in turn, albeit in parallel fashion by the learning algorithm, as a separate training dataset. So, in order to be comparable, a sequential training fold processed by MLlib during cross-validation needs to be constructed by assembling the data for that fold that is spread across the partitions of the RDD that distributed Weka processes during its cross-validation routine.

Cross-validation phase 1 for Weka classifiers when running in distributed Weka: building models for all training folds simultaneously

Cross-validation phase 2 for Weka classifiers when running in distributed Weka: evaluating models for all test folds simultaneously
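The fold bookkeeping described in this section can be sketched in a few lines of Python. This is an illustration only: the round-robin fold assignment inside each partition and the function names are assumptions, not the scheme distributed Weka necessarily uses. The point is that every partition holds a slice of every fold, so a sequential training set for one fold has to be gathered from all partitions.

```python
def assign_folds(partitions, num_folds):
    """Tag every row with a fold number, cycling through the folds inside each partition."""
    return [[(index % num_folds, row) for index, row in enumerate(part)]
            for part in partitions]

def assemble_fold(tagged_partitions, fold):
    """Gather the training and test rows for one fold from across all partitions."""
    train, test = [], []
    for part in tagged_partitions:
        for row_fold, row in part:
            (test if row_fold == fold else train).append(row)
    return train, test

# Two partitions of six rows each; integers stand in for instances.
partitions = [list(range(0, 6)), list(range(6, 12))]
tagged = assign_folds(partitions, 3)

# Fold 0 draws its test rows from both partitions.
train0, test0 = assemble_fold(tagged, 0)
```

Because the fold tags are fixed once, the Dagging-style pass and the per-fold sequential pass see identical splits, which is what makes their results comparable.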

Conclusion

Integration of Spark MLlib algorithms continues Weka's interoperability theme and expands the variety of schemes available to the user. It also provides, as is the case with Weka's R and Python integration, convenient no-coding access to the machine learning algorithms from MLlib. Weka's interoperability with different languages and tools provides a convenient unified framework for experimental comparison across different implementations of the same algorithm. This simplifies the data scientist's job and reduces their workload when considering multiple tools for solving a particular predictive problem.

Wednesday, 18 May 2016

New Weka 3.8.0 Stable Release

On Friday 15th April NZT we released Weka 3.8.0. I've only just got around to writing about it because, after collapsing from exhaustion right after the release went out, I required immediate vacational therapy with my son at the theme parks on Australia's Gold Coast :-)

Weka uses the Linux model of releases, where an even second digit of a release number indicates a "stable" release and an odd second digit indicates a "development" release. Weka 3.8.0 is the first stable release of Weka since 2008! There are tons of new features and improvements in 3.8.0 compared to 3.6 (the previous stable release). Furthermore, it supports the newest Weka MOOC that launched on the 25th of April and the forthcoming 4th edition of the Data Mining book.

Some of the features added since Weka 3.6 include:

A package management system

The Weka software has evolved considerably since Weka 3.6. Many new algorithms and features (too many to detail here) have been added to the system, a number of which have been contributed by the community. With so many algorithms on offer we felt that the software could be considered overwhelming to the new user. Therefore a number of algorithms and community contributions were removed and placed into plugin packages. A package management system was added that allows the user to browse for, and selectively install, packages of interest. To date, there are 177 packages that can be installed via the package manager.

Another motivation for introducing the package management system was to make the process of contributing to the Weka software easier, and to ease the maintenance burden on the Weka development team. A contributor of a plugin package is responsible for maintaining its code and hosting the installable archive; while Weka simply tracks the package metadata. The package system also opens the door to the use of third-party libraries, something that we'd discouraged in the past in order to keep a lightweight footprint for Weka.

Plugin packages for Weka can be viewed online at the central package metadata repository.

A completely rewritten Knowledge Flow

Weka's Knowledge Flow user interface received a graphical makeover a few years ago, but for 3.8 it has been completely rewritten from scratch. The rewrite includes a brand new engine that is fully multithreaded and supports pluggable execution environments. There is also a radically simplified API for developers to use. New features in the knowledge flow include:
  • Automatic execution of individual steps in separate threads
  • Single-threaded execution for streaming flows
  • Separate executor service for resource intensive steps and tasks
  • Support for attribute selection and prediction boundary visualisation
  • JSON-based flow persistence
  • Support for loading legacy .kfml flow files
  • Settings and preferences at the application and perspective level
  • User-configurable logging level
  • New and simplified API

MTJ-based linear algebra

The old JAMA-based linear algebra routines have been replaced with MTJ. MTJ provides faster pure JVM routines than JAMA and, more importantly, can seamlessly use reference and system-optimised versions of native libraries based on BLAS, LAPACK and ARPACK if available. To that end we have provided three plugin packages - one for each of the major OS's - providing JNI native libraries. Mac OSX users are lucky because system-optimised versions are pre-installed, giving the biggest speed increases. Linux and Windows users get a native reference implementation in their respective packages (which is faster than a pure JVM implementation), but will have to install a system-optimised library if they want the ultimate speed. Multiple linear regression, Gaussian processes, PCA and LDA are some of the schemes in Weka to benefit from MTJ.

Core improvements

Numerous efficiency improvements to core data structures, filters and some classifiers have been made over the years. All of these add up to better memory utilisation and faster execution.

Workbench

Aside from the Knowledge Flow, the other major graphical user interfaces in Weka (Explorer and Experimenter) have remained largely unchanged from Weka 3.6. However, one new user interface - the Workbench - has been added in 3.8.0. This is a unified graphical interface that combines the other three (and any plugins that the user has installed) into one application. The Workbench is highly configurable, allowing the user to specify which applications and plugins will appear, along with settings relating to them.


Monday, 25 January 2016

CPython Scripting in Pentaho Data Integration

Using the approach developed for integrating Python into Weka, Pentaho Data Integration (PDI) now has a new step that can be used to leverage the Python programming language (and its extensive package-based support for scientific computing) as part of a data integration pipeline. The step has been released to the community from Pentaho Labs and can be installed directly from PDI via the marketplace.


Python is becoming a serious contender to R when it comes to programming language choice for data scientists. In fact, many folks are leveraging the strengths of both languages when developing solutions. With that in mind, it is clear that data scientists and predictive application developers can boost productivity by leveraging the PDI + Python combo. As we all know, data preparation consumes the bulk of time in a typical predictive project. That data prep can typically be achieved more quickly in PDI, compared to developing code from scratch, thanks to its intuitive graphical development environment and extensive library of connectors and processing steps. Instead of having to write (and rewrite) code to connect to source systems (such as relational databases, NoSQL databases, Hadoop filesystems and so forth), and to join/filter/blend data etc., PDI allows the developer to focus their coding efforts on the cool data science-oriented algorithms.

CPython Script Executor

As the name suggests, the new step uses the C implementation of the Python programming language. While there are JVM-based solutions available - such as Jython - that allow a more tightly integrated experience when executing in the JVM, these do not facilitate the use of many high-powered Python libraries for scientific computing, due to the fact that such libraries include highly optimised components that are written in C or Fortran. In order to gain access to such libraries, the PDI step launches, and communicates with, a micro-service running in the C Python environment. Communication is done over plain sockets and messages are stored in JSON structures. Datasets are transmitted as CSV and the very fast routines for reading and writing CSV from the pandas Python package are leveraged.

The step itself offers maximum flexibility when it comes to dealing with data. It can act as a start point/data source in PDI (thus allowing the developer the freedom to source data directly via their Python code if so desired), or it can accept data from an upstream step and push it into the Python environment. In the latter case, the user can opt to send all incoming rows to Python in one hit, send fixed sized batches of rows, or send rows one-at-a-time. In any of these cases the data sent is considered a dataset, gets stored in a user-specified variable in Python, and the user's Python script is invoked. In the "all data" case, there is also the option to apply reservoir sampling to down-sample to a fixed size before sending the data to Python. The pandas DataFrame is used as the data structure for datasets transferred into Python.
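For readers unfamiliar with reservoir sampling, the following Python sketch shows the classic single-pass version (often called Algorithm R). It is an illustration of the general technique, not the code inside the PDI step; the function name and the fixed seed are assumptions.

```python
import random

def reservoir_sample(rows, size, seed=1):
    """Keep a uniform random sample of at most `size` rows from a stream in one pass."""
    rng = random.Random(seed)
    reservoir = []
    for seen, row in enumerate(rows):
        if seen < size:
            # Fill the reservoir with the first `size` rows.
            reservoir.append(row)
        else:
            # Row number seen+1 replaces a random slot with probability size/(seen+1).
            slot = rng.randint(0, seen)  # inclusive on both ends
            if slot < size:
                reservoir[slot] = row
    return reservoir

# Down-sample a stream of 1000 rows to 10 without holding the stream in memory.
sample = reservoir_sample(range(1000), 10)
```

The appeal for a data integration step is that the total number of incoming rows does not need to be known in advance.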



A python script can be specified via the built-in editor, or loaded from a file dynamically at runtime. There are two scenarios for getting output from the Python environment to pass on to downstream PDI steps for further processing. The first (primary) scenario is when there is a single variable to retrieve from Python and it is a pandas DataFrame. In this case, the columns of the data frame become output fields from the step. In the second scenario, one or more non-data frame variables may be specified. In this case, their values are assumed to be textual (or can be represented as text) or contain image data (in which case they are retrieved from Python as binary PNG data). Each variable is output in a separate PDI field.


Requirements

The CPython Script Executor step will work with PDI >= 5.0. Of course, it requires Python to be installed and the python executable to be in your PATH environment variable. The step has been tested with Python 2.7 and 3.x and, at a minimum, needs the pandas, matplotlib and numpy libraries to be installed. For Windows users in particular, I'd recommend installing the excellent Anaconda python distribution. This includes the entire SciPy stack (including pandas and scikit-learn) along with lots of other libraries.

Example

The example transformation shown in the following screenshot can be obtained from here.

The example uses Fisher's classic iris data. The first python step (at the top) simply computes some quartiles for the numeric columns in the iris data. This is output from the step as a pandas DataFrame, where each row corresponds to one of the quartiles computed (25th, 50th and 75th), and each column holds the value for one of the numeric fields in the iris data. The second python step from the top uses the scikit-learn decomposition routine to compute a principal components analysis on the iris data and then transforms the iris data into the PCA space, which is then the output of the step. The third python step from the top uses the matplotlib library and plotting routines from the pandas library to compute some visualisations of the iris data (scatter plot matrix, Andrews curves, parallel coordinates and rad-viz). These are then extracted as binary PNG data from the python environment and saved to files in the same directory as the transformation was loaded from. The two python steps at the bottom of the transformation learn a decision tree model and then use that model to score the iris data respectively. The model is saved (from the python environment) to the directory that the transformation was loaded from.


Conclusion

The new PDI CPython Script Executor step opens up the power of Python to the PDI developer and data scientist. It joins the R Script Executor and Weka machine learning steps in PDI as part of an expanding array of advanced statistical and predictive tools that can be leveraged within data integration processes.

Tuesday, 30 June 2015

CPython Integration in Weka

Continuing the interoperability in Weka that was started with R integration a few years ago, we now have integration with Python. Whilst Weka has had the ability to do Python scripting via Jython for quite some time, the latest effort adds CPython integration in the form of a "wekaPython" package that can be installed via Weka's package manager. This opens the door to all the highly optimised scientific libraries in Python - such as numpy, scipy, pandas and scikit-learn - that have components written in C or Fortran. Scikit-learn is a relatively new machine learning library that is increasing in popularity very rapidly (see the latest KDNuggets software poll).

Like the R integration in Weka, the CPython support allows for general scripting via a Knowledge Flow Python scripting step. This allows arbitrary scripts to be executed and one or more variables to be extracted from the Python runtime. Weka instances are transferred into Python as pandas data frames, and pandas data frames can be extracted from Python and converted back into instances. Furthermore, arbitrary variables can be extracted in textual form, and matplotlib graphics can be extracted as PNG images.


The package also provides a wrapper classifier and wrapper clusterer for the supervised and unsupervised learning algorithms implemented in scikit-learn. This allows the scikit-learn algorithms to be used and evaluated within Weka's framework, just like the MLRClassifier from the RPlugin package allows ML algorithms from R to be used. With both RPlugin and wekaPython installed it is quite cool to run comparisons between implementations in the different frameworks - e.g. here is a quick comparison on some UCI datasets (using Weka's Experiment environment to run a 10x10 fold cross-validation) between random forest implementations in Weka, R and scikit-learn. All default settings were used except for the number of trees, which was set to 500 for each implementation. Since scikit-learn only handles numeric input variables, both Weka's random forest and the MLRClassifier running R random forest were wrapped in the FilteredClassifier to apply unsupervised nominal to binary encoding (one hot encoding) so that all three implementations received the same input:


weka.classifiers.sklearn.ScikitLearnClassifier

This classifier wraps the majority of the supervised learning algorithms in scikit-learn. The wrapper supports retrieving the underlying model from python (as a pickled string) so that the ScikitLearnClassifier can be serialised and used for prediction at a later date.


weka.clusterers.ScikitLearnClusterer

This clusterer wraps clustering algorithms in scikit-learn. It basically functions in exactly the same way as the ScikitLearnClassifier, which allows it to be used in any Weka UI or from Weka's command line interface.

Under the hood

The underlying integration works via a micro-server written in python that is launched by Weka automatically. Communication is done over plain sockets and messages are stored in JSON structures. Datasets are transmitted as plain CSV and image data as base64 encoded PNG.
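The message format can be pictured roughly as follows. This Python sketch is only a guess at the shape of such messages: the field names ("command", "csv", "png_base64") and the command values are hypothetical and are not taken from the wekaPython protocol. It simply shows a CSV dataset and a base64-encoded PNG travelling inside JSON structures.

```python
import base64
import csv
import io
import json

def encode_dataset_message(header, rows):
    """Wrap a dataset, serialised as CSV text, in a JSON message (field names are hypothetical)."""
    buffer = io.StringIO(newline="")
    writer = csv.writer(buffer)
    writer.writerow(header)
    writer.writerows(rows)
    return json.dumps({"command": "put_instances", "csv": buffer.getvalue()})

def decode_dataset_message(message):
    """Recover the header and rows (as strings) from a dataset message."""
    payload = json.loads(message)
    reader = csv.reader(io.StringIO(payload["csv"], newline=""))
    header = next(reader)
    return header, list(reader)

def encode_image_message(png_bytes):
    """Binary PNG data cannot go into JSON directly, so it is base64 encoded first."""
    encoded = base64.b64encode(png_bytes).decode("ascii")
    return json.dumps({"command": "image", "png_base64": encoded})

def decode_image_message(message):
    """Recover the raw PNG bytes from an image message."""
    return base64.b64decode(json.loads(message)["png_base64"])

message = encode_dataset_message(["sepallength", "class"], [[5.1, "Iris-setosa"]])
header, rows = decode_dataset_message(message)
```

Over a plain socket, each such JSON string would still need some framing (for example a length prefix) so the receiver knows where one message ends; that detail is left out here.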

wekaPython works with both Python 2.7.x and 3.x. As it relies on a few new features in core Weka, a snapshot build of the development version (3.7) of Weka is required until Weka 3.7.13 is released. Numpy, pandas, matplotlib and scikit-learn must be installed in python for the wekaPython package to operate. Anaconda is a nice python distribution that comes with all the requirements (and lots more).

Wednesday, 4 March 2015

Weka and Spark

Spark is smokin' at the moment. Every being and his/her/its four-legged canine-like companion seems to be scrambling to provide some kind of support for running their data processing platform under Spark. Hadoop/YARN is the old big data dinosaur, whereas Spark is like Zaphod Beeblebrox - so hip it has trouble seeing over its own pelvis; so cool you could keep a side of beef in it for a month :-) Certainly, Spark has advantages over first generation map-reduce when it comes to machine learning. Having data (or as much as possible) in memory can't be beat for iterative algorithms. Perhaps less so for incremental, single pass methods.

Anyhow, the coolness factor alone was sufficient to get me to have a go at seeing whether Weka's general purpose distributed processing package - distributedWekaBase - could be leveraged inside of Spark. To that end, there is now a distributedWekaSpark package which, short of a few small modifications to distributedWekaBase (mainly to support some retrieval of outputs in-memory rather than from files), proved fairly straightforward to produce. In fact, because develop/test cycles seemed so much faster in Spark than Hadoop, I prototyped Weka's distributed k-means|| implementation in Spark before coding it for Hadoop.

Internals

Internally, distributedWekaSpark handles CSV files only at present. In the future we'll look at supporting other data formats, such as Parquet, Avro and sequence files. CSV handling is the same as for distributedWekaHadoop - due to the fact that source data is split/partitioned over multiple nodes/workers it can't have a header row, and attribute names can be supplied via a "names" file or manually as a parameter to the jobs. The CSV data is loaded and parsed just once, resulting in an RDD<weka.core.Instance> distributed data structure. Thanks to Spark's mapPartitions() processing framework, processing proceeds in much the same way as it does in distributedWekaHadoop, with the exception that the overhead of re-parsing the data on each iteration when running iterative algorithms is eliminated. Processing an entire partition at a time also avoids object creation overhead when making use of distributedWekaBase's classes.

Reduce operations are pairwise associative and commutative in Spark, and there isn't quite the analogue of a Hadoop reduce (where a single reducer instance iterates over a list of all elements with the same key value). Because of this, and to avoid lots of object creations again, many "reduce" operations were implemented via sorting and repartitioning followed by a map partitions phase. In most cases this approach works just fine. In the case of the job that randomly shuffles and stratifies the input data, the result (when there are class labels that occur less frequently than the number of requested folds) is slightly different to the Hadoop implementation. The Spark implementation results in these minority classes getting oversampled slightly.
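The "sort and repartition, then map partitions" pattern can be mimicked in plain Python. The sketch below is an assumption-laden illustration rather than the package's code: lists stand in for RDD partitions, the function names are made up, and a simple sum stands in for whatever aggregation a job performs. After repartitioning, every record with a given key sits in one partition, and sorting makes equal keys adjacent, so one pass per partition can emulate a Hadoop-style reducer.

```python
from itertools import groupby
from operator import itemgetter

def repartition_and_sort(records, num_partitions):
    """Send all records with the same key to the same partition, then sort each partition by key."""
    partitions = [[] for _ in range(num_partitions)]
    for key, value in records:
        partitions[hash(key) % num_partitions].append((key, value))
    return [sorted(part, key=itemgetter(0)) for part in partitions]

def reduce_partition(partition):
    """Map-partitions phase: walk each run of equal keys once and emit one total per key."""
    for key, run in groupby(partition, key=itemgetter(0)):
        yield key, sum(value for _, value in run)

records = [("b", 1), ("a", 3), ("b", 5), ("c", 2), ("a", 1)]
totals = dict(pair
              for part in repartition_and_sort(records, 2)
              for pair in reduce_partition(part))
```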

distributedWekaSpark is not the only Weka on Spark effort available. Kyriakos-Aris Koliopoulos has been developing at the same time as myself, and released a proof-of-concept he developed for his Master's thesis about five months ago:

https://github.com/ariskk/distributedWekaSpark

I've borrowed his nifty cache heuristic that uses the source file size and object overhead settings to automatically determine a Spark storage level to use for RDDs.

Having RDDs referenceable for the duration that the Spark context is alive makes it possible to have a tighter coupling between Spark job steps in the Knowledge Flow. The success and failure connection types introduced in distributedWekaHadoop can now be used to carry data, such as the context and references to various RDD datasets that are in play. This allows Spark steps downstream from the start point in the flow to present a simpler version of their configuration UI, i.e. they can hide connection and CSV parsing details (as this is only required to be specified once, at the start step).

What's in the package?

The distributedWekaSpark package comes bundled with core Spark classes that are sufficient for running out-of-the-box in Spark's local mode and sourcing data from the local filesystem. This mode can take advantage of all the cores on your desktop machine by launching workers in separate threads. All the bundled template examples for Spark available from the Knowledge Flow's template menu use this mode of operation. It should also be sufficient to run against a standalone Spark cluster using a shared filesystem such as NFS.

To run against HDFS (and/or on YARN), it is necessary to delete all the Spark jar files in ${user.home}/wekafiles/packages/distributedWekaSpark/lib and copy in the spark-assembly-X.Y.Z-hadoopA.B.C.jar from your Spark distribution, as this will have been compiled against the version of Hadoop/HDFS that you're using.

All the same jobs that are available in distributedWekaHadoop have been implemented in distributedWekaSpark. Like in the Hadoop case, there is a full featured command line interface available. All jobs can stand alone - i.e. they will invoke other jobs (such as ARFF header creation and data shuffling) internally if necessary. As mentioned above, when running in the Knowledge Flow, individual job steps become aware of the Spark context and what datasets already exist in memory on the cluster. This allows the configuration of connection details and CSV parsing options to only have to be specified once, and downstream job steps can simplify their UI accordingly.





When referencing inputs or outputs in HDFS, the Weka Spark code handles hdfs:// URLs in a somewhat non-standard way. Like the way the hadoop fs/hdfs command operates relative to the current user's directory in HDFS (unless an absolute path is specified), hdfs:// URLs are interpreted relative to the current user's directory. So a URL like

hdfs://host:port/output/experiment1

refers to the output/experiment1 directory in the current user's home directory. To force an absolute path an extra / is needed - e.g.

hdfs://host:port//user/mhall/output/experiment1
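The rule can be written down as a small Python function. This is a sketch of the behaviour as described above, not the Weka Spark code; the function name is made up, and it assumes the user's HDFS home directory is /user/<username>, which is consistent with the example URLs but is an assumption nonetheless.

```python
from urllib.parse import urlsplit

def resolve_hdfs_path(url, user):
    """Return the absolute HDFS path that an hdfs:// URL refers to under the rule above."""
    parts = urlsplit(url)
    if parts.scheme != "hdfs":
        raise ValueError("expected an hdfs:// URL, got: " + url)
    path = parts.path
    if path.startswith("//"):
        # The extra leading slash forces an absolute path.
        return path[1:]
    # Otherwise the path is taken relative to the user's home directory (assumed /user/<name>).
    return "/user/" + user + "/" + path.lstrip("/")

relative = resolve_hdfs_path("hdfs://namenode:8020/output/experiment1", "mhall")
absolute = resolve_hdfs_path("hdfs://namenode:8020//user/mhall/output/experiment1", "mhall")
```

Here both calls end up pointing at the same directory, /user/mhall/output/experiment1.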

Limitations

Aside from only handling CSV files at present, another limitation stems from the fact that only one Spark context can be active within a single JVM. This imposes some constraints on the structure of Knowledge Flow processes using Spark steps. There can only be one Spark-related start point in a given Knowledge Flow graph. This is because a start point is where the Spark context is first created; so having more than one will lead to grief.

When running on a Spark cluster managed by YARN only the "yarn-client" mode is supported. This is the mode where the driver program executes on the local machine and the YARN resource manager is simply used to provision worker nodes in the cluster for Spark to use. The reason for this is that Weka's implementation configures everything programmatically via SparkConf/JavaSparkContext, including all Weka and supporting jar files that are needed to run. This works fine for standalone Spark clusters, Mesos clusters and yarn-client mode, but not for yarn-cluster mode (where the driver program itself runs in an application master process on the cluster). In yarn-cluster mode some funky jiggery pokery (as done by the spark-submit shell script) is necessary to get everything configured for the driver to work on the cluster. It seems pretty ugly that this can't be handled by Spark seamlessly behind the scenes via the same SparkConf/SparkContext configuration as the other modes. Hopefully this will get rectified in a future release of Spark. Some discussion of this issue can be seen in the email thread at:

http://markmail.org/message/cmxswp2ffqjrkxix#query:+page:1+mid:42fug7it6f2zoxga+state:results

Another YARN-related issue is that it is necessary to have your Hadoop cluster's conf dir in the CLASSPATH. This is because Spark picks up the resource manager's address and other bits and pieces from the configuration files. Unfortunately, it is not possible to set this information programmatically. You can only get at the Hadoop Configuration object being used by Spark internally after the SparkContext has been created - by this time it's too late, as Spark is already trying to talk to the resource manager.

Anyhow, distributedWekaSpark is available from Weka's package manager today. So give it a go and pass on any feedback you might have.

Monday, 22 September 2014

k-means|| in distributed Weka for Hadoop

This is a quick update to announce some support for clustering in distributed Weka. In particular, it now has an implementation of the scalable k-means++ (k-means||) algorithm. Although k-means is a super simple algorithm (even in the distributed case), it still took a bit of work to get all the necessary pieces together in order to achieve distributed Weka's goal of having the overall experience and final results similar to that of using desktop Weka. In particular I wanted
  • the final model to be the same as desktop Weka (i.e. a subclass of SimpleKMeans) so that the textual output looks the same and the use of the trained model is no different than any other Weka clusterer
  • to use weka.core.EuclideanDistance, like SimpleKMeans does
  • to have built-in missing values replacement
  • to be able to use arbitrary streaming filters in the learning process for on-the-fly data transformation (just like the classifier job does)
  • to have the option of randomly selected starting points or the k-means++ initialisation

k-means|| initialisation

The k-means++ initialisation process starts with a single training instance selected uniformly at random as an initial candidate centroid, and then iteratively samples subsequent training instances with probability proportional to the squared distance between a candidate centre and the closest centre already chosen. The idea is that initial starting points that are well separated in instance space will lead to a good clustering result in possibly fewer iterations than purely random initialisation.
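The sequential seeding step can be sketched in a few lines of Python. This is only an illustration and not the code in the Weka package: the function names are made up, instances are assumed to be plain numeric tuples, and the weighting is the squared distance to the closest centre chosen so far, as in the k-means++ paper.

```python
import random


def squared_distance(a, b):
    # Squared Euclidean distance between two numeric tuples.
    return sum((x - y) ** 2 for x, y in zip(a, b))


def kmeans_pp_init(points, k, seed=1):
    # Hypothetical helper: pick k starting centres from points.
    rng = random.Random(seed)
    # First centre: uniformly at random.
    centres = [rng.choice(points)]
    while len(centres) < k:
        # Weight of each point = squared distance to its closest centre.
        weights = [min(squared_distance(p, c) for c in centres) for p in points]
        if sum(weights) == 0:
            # All points coincide with a centre, so fall back to uniform choice.
            centres.append(rng.choice(points))
        else:
            centres.append(rng.choices(points, weights=weights, k=1)[0])
    return centres
```

A point that has already been picked has weight zero, so it cannot be picked again while there are other distinct points left.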

In the sequential case, the k-means++ routine requires k passes over the data, one for each of the k clusters. Instead of sampling a single point in each iteration, k-means|| oversamples to the tune of O(k) points. The oversampling process lends itself naturally to parallelisation and the authors show that only a few iterations are necessary (usually 4 or 5 are plenty). The final set of candidate centres is reduced to k start points by first assigning a weight to each candidate centre (where the weight is the number of training instances that are closer to this centre than any other) and then running k-means on this weighted data. A k-means|| iteration requires one pass over the data to compute the distance between each training instance and the closest of the centres already selected and then a second pass to sample the O(k) candidates to add to the current sketch. However, through clever use of weighted reservoir sampling the cost computation and sampling can be achieved simultaneously in the same pass over the data. To this end, the distributedWekaBase package now includes a WeightedReservoirSample class which, in turn, is used by a CentroidSketch class. The reduce phase aggregates the reservoirs from each map and then updates the current sketch with the contents of the reservoir.
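To give a feel for how a weighted reservoir lets sampling happen in a single pass and then be merged in a reducer, here is a small Python sketch. It is an assumption on my part that a key-based scheme (each item gets the key u^(1/w) for a uniform random u, and the items with the largest keys are kept) is a reasonable stand-in; the actual WeightedReservoirSample class in distributedWekaBase may work differently. The class and method names below are hypothetical.

```python
import heapq
import random


class WeightedReservoir:
    # Keeps the `capacity` items with the largest random keys u ** (1 / weight).

    def __init__(self, capacity, seed=1):
        self.capacity = capacity
        self.rng = random.Random(seed)
        self.heap = []   # min-heap of (key, tiebreak, item)
        self.count = 0   # unique tiebreak so items are never compared

    def add(self, item, weight):
        # In k-means|| the weight would be the distance-based cost of the item.
        if weight <= 0:
            return  # zero-weight items can never be sampled
        self._offer(self.rng.random() ** (1.0 / weight), item)

    def _offer(self, key, item):
        self.count += 1
        entry = (key, self.count, item)
        if len(self.heap) < self.capacity:
            heapq.heappush(self.heap, entry)
        elif key > self.heap[0][0]:
            heapq.heapreplace(self.heap, entry)

    def merge(self, other):
        # Reduce step: keys are comparable across reservoirs, so just re-offer.
        for key, _, item in other.heap:
            self._offer(key, item)

    def items(self):
        return [item for _, _, item in self.heap]
```

Because the keys carry all the information needed for selection, each map can fill its own reservoir independently and a reducer can combine them without seeing the data again.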

Distance function

SimpleKMeans uses Weka's EuclideanDistance class, which handles both numeric and nominal attributes and normalises all numeric attributes to the 0-1 range by default. Normalisation ensures that attributes on different scales have an equal impact on the distance computation. In order to normalise numeric attribute values the minimum and maximum of each are needed. Luckily in distributed Weka we have this information already from the ARFF profiling phase that produces the ARFF header. However, we need to jump through a few more hoops if the user has opted to apply pre-processing filters with the k-means job - in this case the data may get transformed arbitrarily and we will need to recompute minimums and maximums in the transformed space. In the first iteration, EuclideanDistance updates minimums and maximums incrementally as each transformed instance is seen in a given mapper. Since we re-use the ARFF header task to maintain (partial) statistics on each centroid in mappers, the minimum and maximum of each transformed numeric variable is available after the reduce phase at the end of the first iteration. The EuclideanDistance function can then be batch initialised for subsequent iterations with a two instance "dummy" dataset that just contains the minimums and maximums.
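The "dummy dataset" trick is easy to picture with a small Python sketch. This is not Weka's EuclideanDistance: it handles numeric attributes only, and the function names are invented for illustration. The point is that two rows holding the per-attribute minimums and maximums are enough to fix the normalisation ranges.

```python
import math


def ranges_from_dummy(dummy_rows):
    # dummy_rows: two rows, one holding minimums and one holding maximums.
    columns = list(zip(*dummy_rows))
    mins = [min(col) for col in columns]
    maxs = [max(col) for col in columns]
    return mins, maxs


def normalised_distance(a, b, mins, maxs):
    # Euclidean distance after scaling every attribute to the 0-1 range.
    total = 0.0
    for x, y, lo, hi in zip(a, b, mins, maxs):
        span = hi - lo
        if span == 0:
            continue  # constant attribute contributes nothing
        total += ((x - lo) / span - (y - lo) / span) ** 2
    return math.sqrt(total)
```

With ranges of 0-10 and 100-300, the instances (0, 100) and (10, 300) are at distance sqrt(2): each attribute contributes exactly 1 despite the different scales.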

Multiple runs

Distributed Weka allows multiple runs of k-means to be executed simultaneously. This allows for parallelism in the reduce phase with as many reducers as there are runs being used. There is a heuristic that will drop a given run if it looks like it is settling into a sub-optimal local minimum - i.e. if it is unlikely that the within cluster error will equal or better the current best performing run, within the remaining number of iterations. The final model is taken from the run with the lowest within cluster error.


The new k-means job is available in distributedWekaHadoop version 1.0.11.

Tuesday, 15 October 2013

Weka and Hadoop Part 3

This is the third of three posts covering some new functionality for distributed processing in Weka. The first and second installments covered base functionality and some of the Hadoop-specific wrappers. In this post we'll look at the remaining functionality in version 1.0 of the distributedWekaHadoop package.

Weka classifier evaluation job

This job builds on the classifier training job covered in installment 2 and provides map-reduce tasks to evaluate classifiers via the training data, a separate test set or cross-validation. Apart from ARFF header creation and the optional randomisation/stratification phase (both of which are re-usable once run initially), the evaluation job involves two passes over the data. The first builds the model and the second performs the evaluation.

In the case of a k-fold cross-validation, each mapper for the model building job divides its dataset up into k folds and builds k models in one hit. The reduce phase for the model building job can use up to k reducers, with a reduce operation aggregating all the models for one fold of the cross-validation. The input to the evaluation pass over the data is then the aggregated model (k aggregated models in the case of cross-validation), pushed out to the nodes via the distributed cache, and either the input data (in the case of test on training or cross-validation) or a separate test set. In the case where the models are batch trained, the data at each map is randomly shuffled and then divided into stratified folds. In the case where the models are incrementally trained, the cross-validation folds are created and processed in a streaming fashion by extracting the instances for a given fold using a modulus operation. The same random seed is used in both the model building and evaluation job in order to keep the folds consistent.
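For the incremental case, the modulus-based fold extraction might look something like the following Python sketch. The function names and the zero-based fold numbering are assumptions made for illustration and do not mirror the Weka code.

```python
def training_stream(instances, fold, num_folds):
    # Yield every instance that is NOT in the held-out fold.
    for index, instance in enumerate(instances):
        if index % num_folds != fold:
            yield instance


def test_stream(instances, fold, num_folds):
    # Yield only the instances that belong to the held-out fold.
    for index, instance in enumerate(instances):
        if index % num_folds == fold:
            yield instance
```

Since fold membership depends only on an instance's position in the stream, the model building and evaluation passes agree on the folds as long as they see the data in the same order, and nothing has to be held in memory.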


The evaluation job adds only a few options over and above those in the classifier job. You can specify the number of nodes in your cluster so that the job can specify up to k reducers for a cross-validation. Weka's evaluation module computes just about all of its metrics incrementally in an additive fashion (perfectly suited to aggregation). The only exceptions are area under the ROC curve and area under the precision recall curve. These require predictions to be retained. By default, the evaluation job does not compute these two statistics. They can be computed by providing a number for the "sampleFractionForAUC" option. This allows the user to specify some percentage of the total number of predictions generated to be retained (via uniform random sampling) for computing these two statistics. In the above screenshot, we've set this to 0.5 - i.e. 50% of all the predictions generated in all the map tasks will be retained.
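The reason additive metrics aggregate so easily, and why the two area-under-curve statistics need retained predictions, can be shown with a toy Python sketch. The class below is hypothetical and far simpler than Weka's evaluation module: it keeps confusion-matrix counts (which simply add up across map tasks) and retains a uniform random fraction of predictions, in the spirit of the "sampleFractionForAUC" option.

```python
import random
from collections import Counter


class PartialEvaluation:
    # Per-map evaluation state that can be merged additively in a reducer.

    def __init__(self, sample_fraction=0.0, seed=1):
        self.confusion = Counter()      # (actual, predicted) -> count
        self.sample_fraction = sample_fraction
        self.retained = []              # (actual, score) pairs kept for AUC
        self.rng = random.Random(seed)

    def update(self, actual, predicted, score):
        self.confusion[(actual, predicted)] += 1
        # Keep a uniform random sample of predictions for curve statistics.
        if self.rng.random() < self.sample_fraction:
            self.retained.append((actual, score))

    def merge(self, other):
        self.confusion.update(other.confusion)   # Counter.update adds counts
        self.retained.extend(other.retained)
        return self

    def accuracy(self):
        total = sum(self.confusion.values())
        correct = sum(n for (a, p), n in self.confusion.items() if a == p)
        return correct / total if total else 0.0
```

Accuracy computed from the merged counts is identical to what a single pass over all the data would give, whereas ROC and precision-recall areas can only be estimated from whatever predictions were retained.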

In the earlier discussion of the classifier training job we used it to build a model on all the data. It can also be used to train a model on a specific fold of a cross-validation by setting the "foldNumber" and "totalNumFolds" options. When the evaluation job uses the classifier job to perform cross-validation it sets the "foldNumber" option automatically in order to learn models for each of the folds. All we have to do when configuring the evaluation job is to set the "totalNumFolds" parameter.

The output of the evaluation job is the standard Weka evaluation results text (like when the Explorer or command line interface to Weka is used normally) and the metrics stored in a single line CSV and ARFF file. All of these files are written out to the "eval" subdirectory of the output directory in HDFS for the job.


Scoring job

The last Hadoop job in the version 1.0 release of the package is one to perform scoring (prediction) using a trained model. This job actually handles scoring using clusterers as well as classifiers, even though there aren't any clustering tasks/jobs in version 1.0 (stuff to do for version 1.1...).


The job doesn't require a reduce phase, so there will be as many output files in the output directory as there are map tasks run for the dataset being scored. Again the distributed cache is used to place the model on the local file system of each node. The model to be used can be initially on the local file system or in HDFS - the job looks in both places.

The map tasks build a mapping between the incoming data fields and what the model is expecting. Missing data fields, nominal values that haven't been seen during training and type mismatches between what the model is expecting and what is in the current input row are replaced with missing values. During the setup phase, when the mapping is being built, the job will fail if there are fewer than 50% of the attributes that the model is expecting to see present in the incoming data.
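A rough Python sketch of this kind of field mapping is shown below. The data structures are invented for the example (attributes as name/kind/labels triples, None standing in for a missing value, "?" as the missing marker in the input) and do not reflect the classes used in the scoring job.

```python
MISSING = None  # stand-in for a Weka missing value


def build_mapping(model_attrs, incoming_fields, min_fraction=0.5):
    # model_attrs: list of (name, kind, labels); kind is "numeric" or "nominal".
    positions = {name: i for i, name in enumerate(incoming_fields)}
    mapping = [positions.get(name) for name, _, _ in model_attrs]
    present = sum(1 for pos in mapping if pos is not None)
    if present < min_fraction * len(model_attrs):
        raise ValueError("too few of the model's attributes are in the input")
    return mapping


def map_row(row, model_attrs, mapping):
    # Re-order one input row into the layout the model expects.
    out = []
    for (_, kind, labels), pos in zip(model_attrs, mapping):
        raw = row[pos] if pos is not None else None
        if raw is None or raw == "?":
            out.append(MISSING)                 # field absent or missing
        elif kind == "numeric":
            try:
                out.append(float(raw))
            except ValueError:
                out.append(MISSING)             # type mismatch
        else:
            out.append(raw if raw in labels else MISSING)  # unseen label
    return out
```

For a model expecting age, sex and tsh, an input with only sex and age still passes the 50% check, and tsh is simply filled in as missing for every row.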

The map tasks output CSV data in the same format as the input data but with the predicted probability distribution (comma-separated label:probability pairs) appended to the end of each row. The user can opt to output fewer than all the input columns by setting the "columnsToOutputInScoredData" option.

Orchestrating jobs

The Hadoop jobs can be chained together using the sequential execution facility in the Knowledge Flow and/or new "success" and "failure" event types. The following screenshot shows a flow that:
  1. Transfers the hypothyroid data into HDFS
  2. Runs the correlation matrix + PCA job (which also executes the ARFF header creation job first)
  3. Re-uses the ARFF header and PCA filter created in step 2 to learn a filtered bagging model
  4. Extracts the learned model from HDFS and saves it to the local file system


As mentioned in the first installment of this series, all the jobs have an extensive command-line interface to facilitate scripting.

A note for Windows users

If you are running the Weka jobs from Windows and your Hadoop cluster is running on *nix machines then you will run into an issue with the classpath for the map and reduce tasks on the *nix side of things. It turns out that setting the classpath for a Hadoop job programmatically uses the path separator character of the client system (naturally I guess). So under Windows the ";" character is used to separate entries in the classpath that is set in the Configuration object for the job. This will result in ClassNotFound exceptions when the job is actually executed on the *nix cluster. To get around this the Weka jobs will postprocess the classpath entry in the Configuration to replace ";"s with ":"s, but only if you tell it that you're running a Windows client against a *nix Hadoop cluster. To do this you just need to set the environment variable HADOOP_ON_LINUX=true. This is pretty hacky and if anyone knows of a more elegant solution to this please let me know.
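The workaround amounts to a conditional string replacement. In Python the idea looks roughly like this; the function name is made up, and the real postprocessing happens in Java on the job's Configuration object rather than on a plain string.

```python
import os


def fix_classpath_for_linux(classpath, env=os.environ):
    # Only rewrite separators when the user has opted in via HADOOP_ON_LINUX.
    if env.get("HADOOP_ON_LINUX", "").lower() == "true":
        return classpath.replace(";", ":")
    return classpath
```

With HADOOP_ON_LINUX=true, "/libs/weka.jar;/libs/distributedWekaBase.jar" becomes "/libs/weka.jar:/libs/distributedWekaBase.jar"; without it, the string is left alone.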

Benchmarking on the KDD99 data

I ran a quick test on the KDD99 data set (just under 5 million instances, 42 attributes and 23 classes) on Waikato's Symphony torque cluster (quad core i7 processors at 2793 MHz). I set up a 10 node Hadoop cluster and ran a 10-fold cross-validation of a random forest consisting of 200 trees. The job involved creating the ARFF header, creating 15 randomly shuffled input chunks and then the evaluation itself. This took just under 5 minutes to run. Subsequent runs of 10-fold cross-validation using the already created input chunks took about 3 and a half minutes.


java weka.distributed.hadoop.WekaClassifierEvaluationHadoopJob \
-hdfs-host 192.168.22.240 -hdfs-port 9000 \
-jobtracker-host 192.168.22.240 -jobtracker-port 9001 \
-input-paths /users/mhall/input/kdd99 \
-output-path /users/mhall/output \
-header-file-name kdd99.arff -max-split-size 50000000 \
-randomized-chunks -num-chunks 15 \
-W weka.classifiers.meta.Bagging -total-folds 10 \
-num-nodes 10 -logging-interval 5 \
-user-prop mapred.child.java.opts=-Xmx1200m \
-- -W weka.classifiers.trees.RandomTree -I 200 \
-- -depth 3 -K 3




Next I doubled the size of the input data (just by duplicating the KDD99 data), to give just under 10 million instances, and launched a 15 node Hadoop cluster. I ran the same job as before but increased the number of randomly shuffled data chunks from 15 to 30 (in order to keep the amount of data entering each map the same as before). This time the job ran in 4 minutes and 23 seconds (the average over several repetitions was about 4 minutes). Although each map is processing the same amount of data, the faster run time is explained by greater parallelism - each map in the model building process now only has to build half as many trees as it did in the first job in order to generate a forest of 200 trees.
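The "half as many trees" arithmetic can be checked quickly. The sketch below assumes one map task per data chunk and that the requested forest size is split evenly across maps and rounded up; that rounding rule is an assumption for illustration rather than something taken from the job's code.

```python
import math


def trees_per_map(total_trees, num_maps):
    # Each map builds an equal share of the forest, rounded up.
    return math.ceil(total_trees / num_maps)


first_job = trees_per_map(200, 15)   # 15 chunks -> 15 maps
second_job = trees_per_map(200, 30)  # 30 chunks -> 30 maps
print(first_job, second_job)
```

Under these assumptions each map builds 14 trees in the first job and 7 in the second, on the same amount of data per map.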


Future stuff

There is a bunch of stuff that could go into future releases of the distributed packages. Some things I'd like to add for the next release include:
  1. Clustering. k-means first probably.
  2. More text mining stuff. SGDText and NaiveBayesMultinomialText can already be used in version 1.0 of the distributed packages. Weka's StringToWordVector filter really needs an option to allow a dictionary to be supplied by the user. Once this is done, we could have a job to create a dictionary (and IDF counts) - basically just a modification of the classic word count MR job - and then use the StringToWordVector filter as normal.
  3. The SubstringLabeler and SubstringReplacer Knowledge Flow steps need to become filters so that they can be used for pre-processing in the classifier training job. This would allow the twitter sentiment analysis example (which involves automatic creation of labelled training data) to be implemented as a map-reduce job.
  4. Allow ensembles of heterogeneous classifiers to be learned with the classifier job. At present, only a voted ensemble of classifiers of the same type can be learned. The job could be extended to allow the user to specify a set of base classifiers and then the map tasks could use their task number as a basis for choosing which classifier to build from the set.
  5. Oversampling in the randomly shuffled chunk creation task. This job already makes sure that minority classes have at least one instance in all data chunks but it could be extended to bias the final distribution of classes in each chunk towards a uniform distribution.
  6. Possibly the execution of a Knowledge Flow process in a map or reduce task.