Weka classifier evaluation job
This job builds on the classifier training job covered in installment 2 and provides map-reduce tasks to evaluate classifiers via the training data, a separate test set or cross-validation. Apart from ARFF header creation and the optional randomisation/stratification phase (both of which are re-usable once run initially), the evaluation job involves two passes over the data. The first builds the model and the second performs the evaluation.
In the case of a k-fold cross-validation, each mapper for the model building job divides its dataset up into k folds and builds k models in one hit. The reduce phase for the model building job can use up to k reducers, with a reduce operation aggregating all the models for one fold of the cross-validation. The input to the evaluation pass over the data is then the aggregated model (k aggregated models in the case of cross-validation), pushed out to the nodes via the distributed cache, and either the input data (in the case of test on training or cross-validation) or a separate test set. In the case where the models are batch trained, the data at each map is randomly shuffled and then divided into stratified folds. In the case where the models are incrementally trained, the cross-validation folds are created and processed in a streaming fashion by extracting the instances for a given fold using a modulus operation. The same random seed is used in both the model building and evaluation job in order to keep the folds consistent.
In the case of a k-fold cross-validation, each mapper for the model building job divides its dataset up into k folds and builds k models in one hit. The reduce phase for the model building job can use up to k reducers, with a reduce operation aggregating all the models for one fold of the cross-validation. The input to the evaluation pass over the data is then the aggregated model (k aggregated models in the case of cross-validation), pushed out to the nodes via the distributed cache, and either the input data (in the case of test on training or cross-validation) or a separate test set. In the case where the models are batch trained, the data at each map is randomly shuffled and then divided into stratified folds. In the case where the models are incrementally trained, the cross-validation folds are created and processed in a streaming fashion by extracting the instances for a given fold using a modulus operation. The same random seed is used in both the model building and evaluation job in order to keep the folds consistent.
The evaluation job adds only a few options over and above those in the classifier job. You can specify the number of nodes in your cluster so that the job can specify up to k reducers for a cross-validation. Weka's evaluation module computes just about all of its metrics incrementally in an additive fashion (perfectly suited to aggregation). The only exceptions are area under the ROC curve and area under the precision recall curve. These require predictions to be retained. By default, the evaluation job does not compute these two statistics. They can be computed by providing a number for the "sampleFractionForAUC" option. This allows the user to specify some percentage of the total number of predictions generated to be retained (via uniform random sampling) for computing these two statistics. In the above screenshot, we've set this to 0.5 - i.e. 50% of all the predictions generated in all the map tasks will be retained.
In the earlier discussion of the classifier training job we used it to build a model on all the data. It can also be used to train a model on a specific fold of a cross-validation by setting the "foldNumber" and "totalNumFolds" options. When the evaluation job uses the classifier job to perform cross-validation it sets the "foldNumber" option automatically in order to learn models for each of the folds. All we have to do when configuring the evaluation job is to set the "totalNumFolds" parameter.
The output of the evaluation job is the standard Weka evaluation results text (like when the Explorer or command line interface to Weka is used normally) and the metrics stored in a single line CSV and ARFF file. All of these files are written out to the "eval" subdirectory of the output directory in HDFS for the job.
Scoring job
The last Hadoop job in the version 1.0 release of the package is one to perform scoring (prediction) using a trained model. This job actually handles scoring using clusterers as well as classifiers, even though there aren't any clustering tasks/jobs in version 1.0 (stuff to do for version 1.1...).
The job doesn't require a reduce phase, so there will be as many output files in the output directory as there are map tasks run for the dataset being scored. Again the distributed cache is used to place the model on the local file system of each node. The model to be used can be initially on the local file system or in HDFS - the job looks in both places.
The map tasks build a mapping between the incoming data fields and what the model is expecting. Missing data fields, nominal values that haven't been seen during training and type mismatches between what the model is expecting and what is in the current input row are replaced with missing values. During the setup phase, when the mapping is being built, the job will fail if there are fewer than 50% of the attributes that the model is expecting to see present in the incoming data.
The map tasks output CSV data in the same format as the input data but with the predicted probability distribution (comma-separated label:probability pairs) appended to the end of each row. The user can opt to output fewer than all the input columns by setting the "columnsToOutputInScoredData" option.
Orchestrating jobs
The Hadoop jobs can be chained together using the sequential execution facility in the Knowledge Flow and/or new "success" and "failure" event types. The following screenshot shows a flow that:
- Transfers the hypothyroid data into HDFS
- Runs the correlation matrix + PCA job (which also executes the ARFF header creation job first)
- Re-uses the ARFF header and PCA filter created in step 2 to learn a filtered bagging model
- Extracts the learned model from HDFS and saves it to the local file system
As mentioned in the first installment of this series, all the jobs have an extensive command-line interface to facilitate scripting.
A note for Windows users
If you are running the Weka jobs from Windows and your Hadoop cluster is running on *nix machines then you will run into an issue with the classpath for the map and reduce tasks on the *nix side of things. It turns out that setting the classpath for a Hadoop job programatically uses the path separator character of the client system (naturally I guess). So under Windows the ";" character is used to separate entries in the classpath that is set in the Configuration object for the job. This will result in ClassNotFound exceptions when the job is actually executed on the *nix cluster. To get around this the Weka jobs will postprocess the classpath entry in the Configuration to replace ";"s with ":"s, but only if you tell it that you're running a Windows client against a *nix Hadoop cluster. To do this you just need to set the environment variable HADOOP_ON_LINUX=true. This is pretty hacky and if anyone knows of a more elegant solution to this please let me know.
Benchmarking on the KDD99 data
I ran a quick test on the KDD99 data set (just under 5 million instances, 42 attributes and 23 classes) on Waikato's Symphony torque cluster (quad core i7 processors at 2793 MHz). I set up a 10 node Hadoop cluster and ran a 10-fold cross-validation of a random forest consisting of 200 trees. The job involved creating the ARFF header, creating 15 randomly shuffled input chunks and then the evaluation itself. This took just under 5 minutes to run. Subsequent runs of 10-fold cross-validation using the already created input chunks took about 3 and a half minutes.
java weka.distributed.hadoop.WekaClassifierEvaluationHadoopJob \ -hdfs-host 192.168.22.240 -hdfs-port 9000 \ -jobtracker-host 192.168.22.240 -jobtracker-port 9001 \ -input-paths /users/mhall/input/kdd99 \ -output-path /users/mhall/output \ -header-file-name kdd99.arff -max-split-size 50000000 \ -randomized-chunks -num-chunks 15 \ -W weka.classifiers.meta.Bagging -total-folds 10 \ -num-nodes 10 -logging-interval 5 \ -user-prop mapred.child.java.opts=-Xmx1200m \ -- -W weka.classifiers.trees.RandomTree -I 200 \ -- -depth 3 -K 3
Next I doubled the size of the input data (just by duplicating the kdd 99 data), to give just under 10 million instances, and launched a 15 node Hadoop cluster. I ran the same job as before but increased the number of randomly shuffled data chunks from 15 to 30 (in order to keep the amount of data entering each map the same as before). This time the job ran in 4 minutes and 23 seconds (the average over several repetitions was about 4 minutes). Although each map is processing the same amount of data, the faster run time is explained by greater parallelism - each map in the model building process now only has to build half as many trees as it did in the first job in order to generate a forest of 200 trees.
Future stuff
There is a bunch of stuff that could go into future releases of the distributed packages. Some things I'd like to add for the next release include:
- Clustering. k-means first probably.
- More text mining stuff. SGDText and NaiveBayesMultinomialText can already be used in version 1.0 of the distributed packages. Weka's StringToWordVector filter really needs an option to allow a dictionary to be supplied by the user. Once this is done, we could have a job to create a dictionary (and IDF counts) - basically just a modification of the classic word count MR job - and then use the StringToWordVector filter as normal.
- The SubstringLabeler and SubstringReplacer Knowledge Flow steps need to become filters so that they can be used for pre-processing in the classifier training job. This would allow the twitter sentiment analysis example (which involves automatic creation of labelled training data) to be implemented as a map-reduce job.
- Allow ensembles of heterogeneous classifiers to be learned with the classifier job. At present, only a voted ensemble of classifiers of the same type can be learned. The job could be extended to allow the user to specify a set of base classifiers and then the map tasks could use their task number as a basis for choosing which classifier to build from the set.
- Oversampling in the randomly shuffled chunk creation task. This job already makes sure that minority classes have at least one instance in all data chunks but it could be extended to bias the final distribution of classes in each chunk towards a uniform distribution.
- Possibly the execution of a Knowledge Flow process in a map or reduce task.