Thursday, 27 July 2017

Integrating Spark MLlib into Weka

The distributed Weka for Spark package has been available in Weka for several years now. One nice thing about the package is that it allows any Weka classifier to be trained in Spark. However, aside from the model averaging implemented for regression and naive Bayes, most classifiers are learned in the cluster using the embarrassingly simple "Dagging" (Disjoint Aggregation) ensemble approach. Spark's infrastructure provides the disjoint data chunks (RDD partitions) automatically, standard desktop Weka classifiers are trained on each chunk/partition, and the resulting models are combined into a final voted ensemble via the Vote meta classifier. This can work fairly well, but partition size becomes another tuning parameter to consider, and the RAM available on worker nodes imposes a hard upper limit on it. If partitions are too small relative to the total dataset size, then individual ensemble members might fail to capture enough of the structure of the problem, which in turn leads to lower predictive performance.
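
To make the Dagging idea concrete, here is a minimal, hypothetical Python sketch (not distributed Weka's code). The data is dealt into k disjoint chunks standing in for RDD partitions, a toy nearest-centroid learner (an assumed stand-in for an arbitrary Weka base classifier) is trained on each chunk, and predictions are combined by simple majority vote, which is one of several combination rules the Vote meta classifier supports.

```python
from collections import Counter
from typing import Callable, Dict, List, Sequence, Tuple

Features = Tuple[float, ...]
Row = Tuple[Features, str]            # (feature vector, class label)
Model = Callable[[Features], str]     # a trained model maps features -> label


def disjoint_chunks(rows: Sequence[Row], k: int) -> List[List[Row]]:
    """Deal rows into k disjoint chunks, standing in for RDD partitions."""
    return [list(rows[i::k]) for i in range(k)]


def train_nearest_centroid(chunk: List[Row]) -> Model:
    """Hypothetical base learner: predict the class whose mean vector is closest."""
    sums: Dict[str, List[float]] = {}
    counts: Counter = Counter()
    for x, y in chunk:
        acc = sums.setdefault(y, [0.0] * len(x))
        for j, v in enumerate(x):
            acc[j] += v
        counts[y] += 1
    centroids = {y: [s / counts[y] for s in acc] for y, acc in sums.items()}

    def predict(x: Features) -> str:
        return min(centroids,
                   key=lambda y: sum((a - b) ** 2 for a, b in zip(x, centroids[y])))
    return predict


def dagging(rows: Sequence[Row], k: int, learner=train_nearest_centroid) -> Model:
    """Train one model per disjoint chunk and combine them by majority vote."""
    members = [learner(chunk) for chunk in disjoint_chunks(rows, k)]

    def vote(x: Features) -> str:
        # Counter.most_common breaks ties by first occurrence among the members.
        return Counter(m(x) for m in members).most_common(1)[0][0]
    return vote
```

Because each member only ever sees its own chunk, a chunk that happens to contain no examples of a class can never vote for that class; this is a small-scale view of why very small partitions can hurt the ensemble.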

Spark's machine learning library (MLlib) has a small set of algorithms, but each has been designed to operate efficiently in a fully distributed fashion and produces a single final model. This could have an accuracy advantage over the Dagging approach for more complicated problems, and definitely has an advantage when interpretability of the final model is important.

This blog entry takes a look at how MLlib algorithms have been integrated into the latest version of the distributedWekaSpark package. Or, to be precise, there is now a new version of this package available under a new name: distributedWekaSparkDev. This has been done so that the old version of the package can continue to be used with (and remain consistent with) what is shown in the Advanced Data Mining With Weka MOOC. The new version of the package adds support for Spark DataFrame-based data sources (CSV, Avro and Parquet), as well as MLlib classifiers and regressors.

MLlib in desktop Weka

The new distributedWekaSparkDev package adds Weka wrapper classifiers for the major MLlib supervised learning schemes. These are designed to work just like any other Weka classifier and operate on datasets that fit into main memory on the desktop client. They can be run from the command line and from the Explorer, Knowledge Flow and Experimenter interfaces. This allows MLlib schemes to be used within Weka's standard evaluation framework, used as base classifiers in meta learners, combined with arbitrary Weka preprocessing filters in the FilteredClassifier, dropped into standard Knowledge Flow processes and used in repeated cross-validation experiments in the Experimenter. It continues the interoperability theme that started with Weka's R (MLR) and CPython (scikit-learn) integration. It is now possible to run an experiment in the Experimenter that involves implementations from four different ML tools, safe in the knowledge that the results are fully comparable because the same data splits and evaluation metrics are used in each case.
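
The comparability argument rests on every learner seeing identical folds. Below is a minimal Python sketch of that principle, with hypothetical helper names and two toy learners (this is not the Experimenter's API): the folds are generated once from a fixed seed and then reused for every learner being compared.

```python
import random
from collections import Counter
from typing import Callable, List

Learner = Callable[[List[float], List[str]], Callable[[float], str]]


def make_folds(n: int, k: int, seed: int) -> List[List[int]]:
    """Shuffle row indices once with a fixed seed, then deal them into k folds."""
    idx = list(range(n))
    random.Random(seed).shuffle(idx)
    return [idx[f::k] for f in range(k)]


def cv_accuracy(xs: List[float], ys: List[str], learner: Learner,
                folds: List[List[int]]) -> float:
    """Accuracy of `learner` over precomputed folds (every row is tested once)."""
    correct = 0
    for test in folds:
        held_out = set(test)
        train = [i for i in range(len(xs)) if i not in held_out]
        model = learner([xs[i] for i in train], [ys[i] for i in train])
        correct += sum(model(xs[i]) == ys[i] for i in test)
    return correct / len(xs)


def majority_learner(xs: List[float], ys: List[str]) -> Callable[[float], str]:
    """Toy learner: always predict the most frequent training label."""
    label = Counter(ys).most_common(1)[0][0]
    return lambda x: label


def one_nn_learner(xs: List[float], ys: List[str]) -> Callable[[float], str]:
    """Toy learner: predict the label of the nearest training value."""
    def predict(x: float) -> str:
        nearest = min(range(len(xs)), key=lambda i: abs(xs[i] - x))
        return ys[nearest]
    return predict
```

Computing `folds` once and passing the same list to `cv_accuracy` for each learner means any difference in accuracy comes from the learners, not from luckier or unluckier splits.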

10-fold cross-validation of an MLlib decision tree in Weka's Explorer

Under the hood, the Weka wrapper for each MLlib classifier accepts a standard Weka Instances object via the Classifier.buildClassifier() method. Standard Weka filters are applied where necessary to prepare the data for the underlying MLlib algorithm; for example, the MLlibNaiveBayes wrapper automatically discretizes any numeric fields if the user has selected a Bernoulli model. A utility class then extracts a list of individual Instance objects and parallelizes it into an RDD[Instance] via SparkContext.parallelize(). From there, the RDD[Instance] is converted into an RDD[LabeledPoint] that the underlying MLlib implementations can work with. During this conversion, auxiliary data structures, such as maps of categorical features, are computed for the schemes that require them.
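
The Instance-to-LabeledPoint step can be sketched in Python as follows. LabeledPoint here is a hypothetical mirror of MLlib's (label, features) pair rather than the real class, and the attribute description format is an assumption. Nominal values become their index as a double, the class label becomes its 0-based index, and a map from feature index to number of categories is built, which is the shape of the categoricalFeaturesInfo argument that MLlib's tree learners take.

```python
from dataclasses import dataclass
from typing import Dict, List, Optional, Sequence, Tuple


@dataclass(frozen=True)
class LabeledPoint:
    """Hypothetical mirror of MLlib's LabeledPoint: a numeric label plus features."""
    label: float
    features: Tuple[float, ...]


# Assumed attribute description: the list of nominal values, or None if numeric.
Attribute = Optional[List[str]]


def categorical_features_info(attrs: Sequence[Attribute]) -> Dict[int, int]:
    """Feature index -> number of categories, for the nominal attributes only."""
    return {j: len(vals) for j, vals in enumerate(attrs) if vals is not None}


def to_labeled_point(row: Sequence, attrs: Sequence[Attribute],
                     class_values: List[str]) -> LabeledPoint:
    """Convert one row (class value last) into a purely numeric LabeledPoint."""
    *xs, y = row
    features = tuple(
        float(attrs[j].index(v)) if attrs[j] is not None else float(v)
        for j, v in enumerate(xs)
    )
    # MLlib classifiers expect class labels encoded as 0.0, 1.0, ..., numClasses - 1.
    return LabeledPoint(float(class_values.index(y)), features)
```

For a weather-style row ("rainy", 70.0, "FALSE", "no") with outlook values sunny/overcast/rainy and windy values TRUE/FALSE, the result is label 1.0 with features (2.0, 70.0, 1.0), and the categorical map is {0: 3, 2: 2}.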

Comparing Weka's NaiveBayes to MLlib NaiveBayes in the Knowledge Flow

With their default options, the MLlib wrapper classifiers start a local Spark cluster on the fly to perform the learning. Spark's local mode runs in the same JVM as Weka and uses the CPU's processing cores as workers. However, there is nothing to stop the user from specifying an external cluster to do the processing.

Comparing MLlib algorithms to native Weka implementations in Weka's Experimenter UI

One nice thing about this approach is that standard Weka filters can be used for any data transformations needed, rather than MLlib's transformers. The last mile involves invoking just the MLlib learning algorithm, which in turn produces a model object for the type of classifier applied. These model objects can be used to predict individual LabeledPoint instances. LabeledPoint is a simple data structure that does not require the Spark distributed processing framework, so MLlib models can score data rapidly in a streaming fashion without requiring a cluster (local mode or otherwise). The following screenshots show a Weka-wrapped Spark MLlib decision tree model being used to score data in Pentaho Data Integration.
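
Scoring one row at a time needs nothing more than the model and a feature vector. The Python below is a hypothetical stand-in for a trained MLlib tree (MLlib's own DecisionTreeModel has a predict method that takes a single feature vector): it walks a hand-built tree for each incoming row from an iterator, with no Spark context involved. The Node layout is an assumption for illustration.

```python
from dataclasses import dataclass
from typing import Iterable, Iterator, Optional, Tuple

Features = Tuple[float, ...]


@dataclass
class Node:
    """Hypothetical tree node: leaves carry a prediction, internal nodes a split."""
    prediction: Optional[float] = None   # set only on leaves
    feature: int = -1                    # split on features[feature] <= threshold
    threshold: float = 0.0
    left: Optional["Node"] = None
    right: Optional["Node"] = None


def predict(node: Node, features: Features) -> float:
    """Follow the splits from the root down to a leaf."""
    while node.prediction is None:
        node = node.left if features[node.feature] <= node.threshold else node.right
    return node.prediction


def score_stream(model: Node, rows: Iterable[Features]) -> Iterator[Tuple[Features, float]]:
    """Score rows lazily as they arrive; nothing is buffered or distributed."""
    for row in rows:
        yield row, predict(model, row)
```

Because `score_stream` is a generator, each row is scored as soon as it is pulled, which is the property that lets a step in a data integration pipeline apply the model without a cluster.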

Weka-wrapped Spark MLlib decision tree classifier loaded into the Weka Scoring step in Pentaho Data Integration

Previewing data scored using the Spark MLlib decision tree model in Pentaho Data Integration

MLlib in distributed Weka

The MLlib classifiers can also be applied within the distributed Weka for Spark framework on a real Spark cluster. The difference from the desktop case is that Spark's data sources are used to read large datasets directly into data frames in the distributed environment (rather than parallelizing a dataset that has been read into Weka on the local machine). From there, a data frame is converted to an RDD[Instance] and then to an RDD[LabeledPoint]. During this process, arbitrary Weka filters can be used to preprocess the data (prior to its conversion to LabeledPoints), as long as those filters are Streamable, i.e., they do not need to see all the data as a batch before producing output. This restriction exists because the transformed data in every partition must have the same structure for the results to be aggregated. Following this, an MLlib classifier is trained as usual.
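
Why the Streamable restriction matters can be seen in a small Python sketch with two hypothetical filters applied independently to each partition: one appends a derived column row by row (streamable), and the other drops columns that are constant within the data it sees (a batch filter, similar in spirit to Weka's RemoveUseless). Only the first produces the same row structure in every partition. The filter names and row layout are assumptions for illustration.

```python
import math
from typing import Callable, List, Set

Row = List[float]
Partition = List[Row]


def apply_per_partition(parts: List[Partition],
                        f: Callable[[Partition], Partition]) -> List[Partition]:
    """Run a filter on each partition independently, as each worker would."""
    return [f(p) for p in parts]


def add_log_column(part: Partition) -> Partition:
    """Streamable: each output row depends only on its own input row."""
    return [row + [math.log1p(abs(row[0]))] for row in part]


def drop_constant_columns(part: Partition) -> Partition:
    """Batch: which columns survive depends on every row in the partition."""
    keep = [j for j in range(len(part[0])) if len({row[j] for row in part}) > 1]
    return [[row[j] for j in keep] for row in part]


def widths(parts: List[Partition]) -> Set[int]:
    """Distinct row widths across all partitions; a single value means consistent."""
    return {len(row) for p in parts for row in p}
```

With partitions [[1, 0], [2, 0]] and [[3, 0], [4, 1]], the streamable filter yields rows of width 3 everywhere, while the batch filter keeps one column in the first partition and two in the second, so its outputs cannot be aggregated.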

The distributedWekaSparkDev package also implements hold-out and cross-validation evaluation for MLlib classifiers when run in the cluster. For cross-validation, it produces training and test folds that are consistent with those used when cross-validating Weka classifiers in the Spark cluster. This entails some fancy shuffling of the data when training MLlib classifiers. With the Dagging and model averaging approaches for Weka classifiers, maximum parallelism during cross-validation is achieved by building the classifiers for all training folds in one pass over the data. To do this, distributed Weka treats each partition of the RDD as containing part of each cross-validation fold (as shown in the following figures). Cross-validation for MLlib classifiers, on the other hand, is basically the sequential case: each fold is processed in turn as a separate training dataset, albeit in parallel by the learning algorithm itself. So, for the results to be comparable, each sequential training fold processed by MLlib has to be assembled from the pieces of that fold that are spread across the partitions of the RDD that distributed Weka processes during its cross-validation routine.
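
The fold bookkeeping can be sketched in Python. Assigning the row at position i of a partition to fold i mod k is an assumption made for illustration (distributed Weka's actual assignment may randomize or stratify first); the point is that the training rows each partition contributes to fold f, gathered across partitions, form exactly the sequential training set an MLlib learner would receive for that fold. All function names are hypothetical.

```python
from typing import List, Tuple

Tagged = List[List[Tuple[int, str]]]   # per partition: (fold id, row)


def assign_folds(partitions: List[List[str]], k: int) -> Tagged:
    """Tag rows with a fold id so every partition holds part of every fold.
    Round-robin by position is an illustrative assumption."""
    return [[(i % k, row) for i, row in enumerate(part)] for part in partitions]


def weka_style_training_chunks(tagged: Tagged, k: int) -> List[List[List[str]]]:
    """One pass per partition yields its share of every training fold:
    result[p][f] holds the rows partition p uses to train its fold-f model."""
    return [[[row for fold, row in part if fold != f] for f in range(k)]
            for part in tagged]


def mllib_training_fold(tagged: Tagged, f: int) -> List[str]:
    """Sequential case: gather fold f's training rows from across all partitions."""
    return [row for part in tagged for fold, row in part if fold != f]


def mllib_test_fold(tagged: Tagged, f: int) -> List[str]:
    """Rows held out for fold f, gathered from across all partitions."""
    return [row for part in tagged for fold, row in part if fold == f]
```

In Spark terms the gather step would amount to filtering the tagged RDD on the fold id before handing it to the MLlib learner, once per fold.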

Cross-validation phase 1 for Weka classifiers when running in distributed Weka: building models for all training folds simultaneously

Cross-validation phase 2 for Weka classifiers when running in distributed Weka: evaluating models for all test folds simultaneously


Integration of Spark MLlib algorithms continues Weka's interoperability theme and expands the variety of schemes available to the user. As with Weka's R and Python integration, it also provides convenient, no-coding access to the machine learning algorithms in MLlib. Weka's interoperability with different languages and tools provides a unified framework for experimental comparison across different implementations of the same algorithm. This simplifies the data scientist's job and reduces their workload when considering multiple tools for solving a particular predictive problem.