The first new package is called distributedWekaBase. It provides base "map" and "reduce" tasks that are not tied to any specific distributed platform. The second, called distributedWekaHadoop, provides Hadoop-specific wrappers and jobs for these base tasks. In the future there could be other wrappers - one based on the Spark platform would be cool.
Base map and reduce tasks
distributedWekaBase version 1.0 provides tasks for:
- Determining a unified ARFF header from separate data chunks in CSV format. This is particularly important because, as Weka users know, Weka is quite particular about metadata - especially when it comes to nominal attributes. At the same time this task computes some handy summary statistics (that are stored as additional "meta attributes" in the header), such as count, sum, sum squared, min, max, num missing, mean, standard deviation and frequency counts for nominal values. These summary statistics come in useful for some of the other tasks listed below.
- Computing a correlation or covariance matrix. Once the ARFF header job has been run, computing a correlation matrix can be completed in just one pass over the data given our handy summary stats. The matrix produced by this job can be read by Weka's Matrix class. Map tasks compute a partial matrix of covariance sums. The reduce tasks aggregate individual rows of the matrix in order to produce the final matrix. This means that parallelism can be exploited in the reduce phase by using as many reducers as there are rows in the matrix.
- Training a Weka classifier (or regressor). The map portion of this task can train any Weka classifier (batch or incremental) on a given data chunk and then the reduce portion will aggregate the individual models in various ways, depending on the type of classifier. Recently, a number of classifiers in Weka 3.7 have become Aggregateable. Such classifiers allow one final model, of the same type, to be produced from several separate models. Examples include: naive Bayes, naive Bayes multinomial, various linear regression models (learned by SGD) and Bagging. Other, non-Aggregateable, classifiers can be combined by forming a voted ensemble using Weka's Vote meta classifier. The classifier task also has various handy options: reservoir sampling can be used with batch learners (so that a maximum number of instances processed by the learning algorithm in a given map can be enforced); normal Weka filters can be used for pre-processing in each map (the task takes care of using various special subclasses of FilteredClassifier for wrapping the base classifier and filters, depending on whether the base learner is Aggregateable and/or incremental); batch learning can be forced for incremental learners (if desired); and a special "pre-constructed" filter can be used (see below).
- Evaluating a classifier or regressor. This task handles evaluating a classifier using either the training data, a separate test set or cross-validation. Because Weka's Evaluation module is Aggregateable, and computes statistics incrementally, this is fairly straightforward. The process makes use of the classifier training task to learn an aggregated classifier in one pass over the data and then evaluation proceeds in a second pass. In the case of cross-validation, the classifiers for all folds are learned in one go (i.e. one aggregated classifier per fold) and then evaluated. In this case, the learning phase can make use of up to k reducers (one per fold). In the batch learning case, the normal process of creating folds (using Instances.train/testCV()) is used and the order of the instances in each map gets randomised first. In the case of incremental learning, instances are processed in a streaming fashion and a modulus operation is used to pull out the training/test instances corresponding to a given fold of the cross-validation.
- Scoring using a trained classifier or regressor. This is fairly simple and just uses a trained model to make predictions. No reducer is needed in this case. The task outputs input instances with predicted probability distributions appended. The user can specify which of the input attribute values to output along with the predictions. It also builds a mapping between the attributes in the incoming instances and those that the model is expecting, with missing attributes or type mismatches replaced with missing values.
- PreconstructedPCA. This is not a distributed task as such; instead it is a filter that can accept a correlation matrix or covariance matrix (as produced by the correlation matrix task) and produces a principal components analysis. The filter produces the same textual analysis output as Weka's standard PCA (in the attribute selection package) and also encapsulates the transformation for data filtering purposes. Once constructed, it can be used with the classifier building task.
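To make the summary statistics idea behind the first two tasks a bit more concrete, here is a minimal sketch in plain Java of how per-chunk statistics for one numeric attribute could be computed in a map and then merged in a reduce. The names PartialStats, fromChunk and merge are hypothetical and are not the distributedWekaBase API. The sketch also assumes that missing values are encoded as NaN and that the sample (n - 1) standard deviation is the one wanted.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; not a class from distributedWekaBase. */
public class PartialStats {
    public long count;    // number of non-missing values seen
    public long missing;  // number of missing values seen
    public double sum;
    public double sumSq;
    public double min = Double.POSITIVE_INFINITY;
    public double max = Double.NEGATIVE_INFINITY;

    /** "Map" side: summarise one chunk of a numeric column (NaN = missing, by assumption). */
    public static PartialStats fromChunk(double[] values) {
        PartialStats s = new PartialStats();
        for (double v : values) {
            if (Double.isNaN(v)) {
                s.missing++;
                continue;
            }
            s.count++;
            s.sum += v;
            s.sumSq += v * v;
            s.min = Math.min(s.min, v);
            s.max = Math.max(s.max, v);
        }
        return s;
    }

    /** "Reduce" side: counts and sums add up; min and max combine directly. */
    public static PartialStats merge(List<PartialStats> parts) {
        PartialStats total = new PartialStats();
        for (PartialStats p : parts) {
            total.count += p.count;
            total.missing += p.missing;
            total.sum += p.sum;
            total.sumSq += p.sumSq;
            total.min = Math.min(total.min, p.min);
            total.max = Math.max(total.max, p.max);
        }
        return total;
    }

    /** Mean derived from the aggregated count and sum. */
    public double mean() {
        return count > 0 ? sum / count : Double.NaN;
    }

    /** Sample standard deviation derived from count, sum and sum of squares. */
    public double stdDev() {
        if (count < 2) {
            return Double.NaN;
        }
        double variance = (sumSq - sum * sum / count) / (count - 1);
        return Math.sqrt(Math.max(0.0, variance)); // guard against tiny negative rounding
    }

    public static void main(String[] args) {
        // Two chunks, as if handled by two separate map tasks.
        PartialStats a = fromChunk(new double[] {1.0, 2.0, Double.NaN});
        PartialStats b = fromChunk(new double[] {3.0, 4.0});
        PartialStats all = merge(Arrays.asList(a, b));
        System.out.println("count=" + all.count + " missing=" + all.missing
            + " mean=" + all.mean() + " stdDev=" + all.stdDev());
    }
}
```

Because every quantity here is either additive or a simple min/max, the merged result does not depend on how the data was split into chunks, which is what lets later jobs use these stats in a single pass over the data.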
Hadoop wrappers and jobs
distributedWekaHadoop version 1.0 provides a number of utilities for configuration/HDFS, mappers and reducers that wrap the base tasks, and jobs to orchestrate everything against Apache Hadoop 1.x (in particular, it has been developed and tested against Hadoop 1.1.2 and 1.2.1).
Getting datasets in and out of HDFS
The first thing this package provides is a "Loader" and "Saver" for HDFS. These can batch transfer or stream data in and out of HDFS using any base Loader or Saver - so any data format that Weka already supports can be written to or read from HDFS. Because the package uses Hadoop's TextInputFormat for delivering data to mappers, we work solely with CSV files that have no header row. The CSVSaver in Weka 3.7.10 has a new option to omit the header row when writing a CSV file. The new HDFSSaver and HDFSLoader can be used from the command line or the Knowledge Flow GUI.
ARFF header creation job
The first job that the distributedWekaHadoop package provides is one to create a unified ARFF header + summary statistics from the input data. All Weka Hadoop jobs have an extensive command line interface (to facilitate scripting etc.) and a corresponding step in the Knowledge Flow GUI. The jobs also take care of making sure that all Weka classes (and dependencies) are available to map and reduce tasks executing in Hadoop. They do this by installing the Weka jar file (and other dependencies) in HDFS and then adding them to the distributed cache and classpath for the job.
java weka.Run ArffHeaderHadoopJob \
  -hdfs-host palladium.local -hdfs-port 9000 \
  -jobtracker-host palladium.local -jobtracker-port 9001 \
  -input-paths /users/mhall/input/classification \
  -output-path /users/mhall/output \
  -names-file $HOME/hypothyroid.names -max-split-size 100000 \
  -logging-interval 5 \
  -user-prop mapred.child.java.opts=-Xmx500m
The job has options for specifying Hadoop connection details and input/output paths. It also allows control over the number of map tasks that actually get executed via the max-split-size option (this sets dfs.block.size), as Hadoop's default of 64 MB may not be appropriate for batch learning tasks, depending on data characteristics. The classifier job, covered in the next instalment of this series, has a pre-processing option to create a set of randomly shuffled input data chunks, which gives greater control over the number and size of the data sets processed by the mappers. The ARFF header job also has a set of options for controlling how the CSV input file gets parsed and processed. It is possible to specify attribute (column) names directly or have them read from a "names" file (one attribute name per line; not to be confused with the C4.5 ".names" file format) stored on the local file system or in HDFS.
Other Weka Hadoop jobs use the ARFF header job internally, but there is no need to repeat it for subsequent jobs that process the same data set. Providing a path to an existing ARFF header (in or out of HDFS) prevents the job from executing, and that header is used instead.
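As a back-of-the-envelope illustration of how max-split-size influences the number of map tasks, the sketch below assumes that each split holds at most the configured number of bytes and that each split becomes one map task. The class and method names are hypothetical, and the 1,000,000 byte input size is made up purely for the example. Hadoop's real split computation has more rules than this (it works per file and also honours a minimum split size, for instance), so treat the result as a rough estimate only.

```java
/** Hypothetical helper for a rough estimate; not part of distributedWekaHadoop. */
public class SplitEstimate {

    /**
     * Estimates how many splits (and hence map tasks) an input of the given
     * size yields if every split holds at most maxSplitBytes.
     */
    public static long estimateSplits(long inputBytes, long maxSplitBytes) {
        if (maxSplitBytes <= 0) {
            throw new IllegalArgumentException("maxSplitBytes must be positive");
        }
        if (inputBytes <= 0) {
            return 0;
        }
        return (inputBytes + maxSplitBytes - 1) / maxSplitBytes; // ceiling division
    }

    public static void main(String[] args) {
        long inputBytes = 1_000_000L;            // made-up input size
        long hadoopDefault = 64L * 1024 * 1024;  // the 64 MB default mentioned above
        long maxSplitSize = 100_000L;            // value used in the example command

        System.out.println("default split size:      "
            + estimateSplits(inputBytes, hadoopDefault) + " map task(s)");
        System.out.println("-max-split-size 100000:  "
            + estimateSplits(inputBytes, maxSplitSize) + " map task(s)");
    }
}
```

Under these assumptions, a small data set that would fit in a single default-sized split gets spread over several mappers once the split size is lowered, which is the point of exposing the option for batch learning.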
This ends the first part of our coverage of the new distributed Weka functionality. In part two I'll cover the remaining Hadoop jobs for learning and evaluating classifiers and performing a correlation analysis.