In this instalment we'll look at some more of the Hadoop-specific jobs.
Correlation/covariance matrix creation job
This Hadoop job produces either a correlation or covariance matrix (in a format that Weka's Matrix class can parse). It can handle numeric or nominal data (or a mixture of both) but the correlation matrix is only computed from the numeric fields in the data. Like the other remaining Hadoop jobs in the distributedWekaHadoop package, it requires that the ARFF header job be run first, and will run it automatically for you unless the -use-existing-header option is specified. The correlation matrix job relies on the summary metadata attributes computed by the ARFF header job so that it can compute the matrix in one pass over the data.
As explained in the first instalment, the map tasks compute a partial matrix of covariance sums for all rows in the matrix using their chunk of the data. The reducer phase aggregates on the basis of individual rows in the matrix, so the maps output rows of the full, but partially computed, matrix one at a time. This allows plenty of parallelism in the reduce phase, but does create lots of final output files (one for each reducer) that contain some of the rows of the final matrix. The job automatically tidies this up by reading all the part-r-xxxxx files and writing back to HDFS the final matrix in Weka's textual format. The job has an option to specify how many nodes are in the user's cluster, and then sets the number of reducers to min(num nodes * max reducers to run in parallel per node, num rows in the matrix).
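To make the row-keyed aggregation a little more concrete, here is a minimal sketch in plain Python. It is not the code from the distributedWekaHadoop package: the function names are made up for illustration, the means are assumed to be known up front (standing in for the summary attributes from the ARFF header job), and dividing by n - 1 for a sample covariance is my assumption.

```python
def map_partial_rows(chunk, means):
    """Map side: emit (row index, partial sums) for one chunk of the data.

    `means` stands in for the per-attribute means that would come from the
    ARFF header job's summary attributes (an assumption of this sketch).
    """
    n = len(means)
    sums = [[0.0] * n for _ in range(n)]
    for instance in chunk:
        dev = [x - m for x, m in zip(instance, means)]
        for i in range(n):
            for j in range(n):
                sums[i][j] += dev[i] * dev[j]
    # One output record per matrix row, keyed by the row index.
    return [(i, sums[i]) for i in range(n)]


def reduce_rows(emitted, num_instances):
    """Reduce side: add up the partial rows that share a row index."""
    totals = {}
    for row_index, partial in emitted:
        if row_index not in totals:
            totals[row_index] = list(partial)
        else:
            totals[row_index] = [a + b for a, b in zip(totals[row_index], partial)]
    # Sample covariance (division by n - 1 is an assumption of this sketch).
    return [[s / (num_instances - 1) for s in totals[i]] for i in sorted(totals)]


def num_reducers(num_nodes, max_reducers_per_node, num_matrix_rows):
    """min(num nodes * max reducers per node, num rows in the matrix)."""
    return min(num_nodes * max_reducers_per_node, num_matrix_rows)


data = [[1.0, 2.0], [2.0, 4.0], [3.0, 6.0], [4.0, 8.0]]
means = [2.5, 5.0]
chunks = [data[:2], data[2:]]  # two "map tasks", each with its own chunk
emitted = [pair for chunk in chunks for pair in map_partial_rows(chunk, means)]
cov = reduce_rows(emitted, len(data))
```

Because each record is keyed by row index, different rows can go to different reducers, which is where the parallelism (and the multiple part-r-xxxxx files) comes from.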
The job also has options to specify a class attribute (which, if it happens to be a numeric field, is not part of the correlation analysis unless "keepClassAttributeIfSet" is selected) and to run a principal components analysis in a post-processing phase after the job completes. The latter does not distribute the computation of PCA - it runs locally on the client machine and the results are written back into the HDFS output directory. As PCA has a runtime that is at best quadratic in the number of input fields, this phase of the job is only suitable for datasets with a modest number of attributes. The PCA phase also creates a special serialised Weka filter that can be used for pre-processing in the classifier job.
Weka classifier builder job
This job uses map-reduce to build classifier models in Hadoop and is one of the most complicated due to the number of options it provides. It will run up to three distinct job types depending on options:
- ARFF header creation (can be omitted if this has already run previously)
- Optional creation of randomly shuffled (and stratified) input data chunks from the original data
- Training of a Weka model (can involve multiple passes/jobs over the entire data set in the case of iterative incremental algorithms like SGD)
Note that separate runs of the randomised chunk creation phase may not be deterministic, even with the same random seed. The keys output from the map tasks (there is one per chunk) are not guaranteed to arrive at the reducer in the same order from run to run, and this interacts with the way the reducer "deals" instances out to the output files. However, once created, the randomly shuffled chunks may be re-used in subsequent model building and evaluation tasks.
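A toy illustration of why arrival order matters. This assumes, purely for the sake of the example, that the reducer deals instances round-robin across the output files; the real reducer may deal them out differently, and the `deal` function here is hypothetical.

```python
def deal(groups_in_arrival_order, num_chunks):
    """Deal instances round-robin into num_chunks output chunks.

    Round-robin dealing is an assumption of this sketch, not necessarily
    what the actual reducer does.
    """
    chunks = [[] for _ in range(num_chunks)]
    position = 0
    for group in groups_in_arrival_order:
        for instance in group:
            chunks[position % num_chunks].append(instance)
            position += 1
    return chunks


group_a = ["a1", "a2", "a3"]
group_b = ["b1", "b2", "b3"]

# Same data, same dealing rule, but the key groups arrive in a different order.
run_1 = deal([group_a, group_b], 2)
run_2 = deal([group_b, group_a], 2)
```

The two runs produce chunks of the same size, but with different instances in each chunk, even though nothing random changed between them.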
The model learning phase handles a number of different training scenarios:
- Aggregateable classifiers (produce one final model of the same type as the individual models)
  - Incremental aggregateable classifiers - e.g. naive Bayes, naive Bayes multinomial and SGD
  - Batch aggregateable classifiers - e.g. Bagging
- Non-aggregateable incremental classifiers - e.g. Hoeffding trees, raced incremental logit boost
- Non-aggregateable batch learners - the majority of the classifiers and regressors in Weka
In the case of non-aggregateable classifiers, the final model produced in the reduce phase is a voted ensemble of the models learned by the mappers. For the technically oriented, this is essentially a "Dagging" model. In all cases, the final serialised model is deposited in a "model" subdirectory of the output path of the job in HDFS, along with a copy of the ARFF header (sans summary attributes). The header can be prepended to new data sets and makes the model ready for deployment wherever required.
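As a rough picture of the voted ensemble, here is a small sketch. The `VotedEnsemble` class is hypothetical and uses a simple majority vote over class labels; the combination rule actually used by the job may differ (averaging probability estimates, for instance).

```python
from collections import Counter


class VotedEnsemble:
    """Combine per-mapper models by majority vote (illustrative only)."""

    def __init__(self, models):
        # Each "model" here is just a callable that maps an instance to a label.
        self.models = list(models)

    def predict(self, instance):
        votes = Counter(model(instance) for model in self.models)
        return votes.most_common(1)[0][0]


# Three stand-in "models", as if each had been learned on a different chunk.
models = [
    lambda x: "yes" if x > 1 else "no",
    lambda x: "yes" if x > 2 else "no",
    lambda x: "yes" if x > 3 else "no",
]
ensemble = VotedEnsemble(models)
high = ensemble.predict(2.5)  # two of the three models vote "yes"
low = ensemble.predict(1.5)   # two of the three models vote "no"
```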
A note on Bagging: The job makes a special check for Bagging (actually for any method that extends weka.classifiers.IteratedSingleClassifierEnhancer) and will divide the total requested number of base models by the number of map tasks that will be run. Thus Bagging runs in each mapper in order to produce some of the total number of base models requested by the user. The random forest algorithm can be implemented by setting the base learner to RandomTree in Bagging. I guess the final model produced by using Bagging in Hadoop in this fashion is actually a Dagging one again, with the small difference that the base models trained by each map will have training datasets created by bootstrap sampling on the data chunk that enters the map.
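The arithmetic is simple enough to sketch. Both functions below are hypothetical: how the real job handles a remainder (or a request for fewer models than there are maps) is not something I have spelled out above, so the integer division with a floor of one model per map is an assumption.

```python
import random


def base_models_per_map(total_requested, num_maps):
    """Share the requested base models across the map tasks.

    Integer division with at least one model per map is an assumption.
    """
    return max(1, total_requested // num_maps)


def bootstrap_sample(chunk, rng):
    """Sample len(chunk) instances with replacement from this map's chunk."""
    return [rng.choice(chunk) for _ in chunk]


per_map = base_models_per_map(100, 10)
chunk = list(range(20))  # stand-in for the instances entering one map
rng = random.Random(1)
training_sets = [bootstrap_sample(chunk, rng) for _ in range(per_map)]
```

Each map would then train `per_map` base models, one per bootstrap sample of its own chunk rather than of the full dataset.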
The classifier job has a number of options to fine tune the creation of the final model:
- If not using the option to create randomly shuffled data chunks, and instead the mapredMaxSplitSize option is used to control the number of maps, then minTrainingFraction can be used to prune away a model created on a data split that contains less data than the others
- numIterations controls how many times the model learning phase is invoked. This option only makes sense for iterative incremental classifiers such as SGD. Each iteration is a separate complete pass over the data. Hadoop's distributed cache is used to distribute the intermediate model learned at iteration i out to the nodes for training to continue in the mappers at iteration i + 1
- pathToPreconstructedFilter allows the PCA filter optionally created by the correlation matrix job to be used to transform the data coming into each map
- filtersToUse can be used instead of (or in conjunction with) pathToPreconstructedFilter in order to use standard Weka filters to pre-process data entering a map. The job automatically determines whether a given filter can be used with the base classifier and wraps the base classifier in one of several special subclasses of Weka's FilteredClassifier. For example, in order to maintain the ability to aggregate an Aggregateable classifier after filtering the input data, it is necessary that all filters used with it are Streamable - i.e., they can determine their output format from only header information (so that the output format produced by the filter is the same in each map) and don't buffer input data
- forceBatchLearningForUpdateableClassifiers will, as the name suggests, force an incremental classifier to be trained in a batch fashion. This essentially makes no difference for naive Bayes but does for SGD, which will perform a number of epochs over the training data entering a given map when trained in a batch fashion.
- useReservoirSamplingWhenBatchLearning results in the data streamed into each map getting passed into a reservoir sampling filter. This is useful to control the total number of instances processed in batch learning when the user is too lazy to tune the number of maps carefully via the mapredMaxSplitSize option or the option to create randomly shuffled data chunks.
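For readers who haven't met reservoir sampling before, here is a minimal sketch of the classic "Algorithm R" version. It is a generic illustration with a hypothetical function name, not the Weka filter itself: it keeps a fixed-size uniform random sample from a stream without knowing the stream's length in advance.

```python
import random


def reservoir_sample(stream, k, seed=1):
    """Keep a uniform random sample of at most k items from a stream."""
    rng = random.Random(seed)
    reservoir = []
    for i, item in enumerate(stream):
        if i < k:
            # Fill the reservoir with the first k items.
            reservoir.append(item)
        else:
            # Replace a random slot with probability k / (i + 1).
            j = rng.randint(0, i)
            if j < k:
                reservoir[j] = item
    return reservoir


sample = reservoir_sample(range(1000), 10)
small = reservoir_sample(range(5), 10)  # stream shorter than the reservoir
```

The memory used is bounded by k no matter how many instances are streamed into the map, which is what makes it handy for capping the size of a batch training set.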
Hmm. I thought I'd be able to cover everything in two instalments of this series of blog postings. Looks like I'll need a third. To be continued...