Spark is smokin' at the moment. Every being and his/her/its four-legged canine-like companion seems to be scrambling to provide some kind of support for running their data processing platform under Spark. Hadoop/YARN is the old big data dinosaur, whereas Spark is like
Zaphod Beeblebrox - so hip it has trouble seeing over its own pelvis; so cool you could keep a side of beef in it for a month :-) Certainly, Spark has advantages over first generation map-reduce when it comes to machine learning. Having data (or as much as possible) in memory can't be beat for iterative algorithms. Perhaps less so for incremental, single pass methods.
Anyhow, the coolness factor alone was sufficient to get me to have a go at seeing whether Weka's general purpose distributed processing package - distributedWekaBase - could be leveraged inside Spark. To that end, there is now a distributedWekaSpark package which, aside from a few small modifications to distributedWekaBase (mainly to support retrieving some outputs in memory rather than from files), proved fairly straightforward to produce. In fact, because develop/test cycles seemed so much faster in Spark than in Hadoop, I prototyped Weka's distributed k-means|| implementation in Spark before coding it for Hadoop.
Internals
Internally, distributedWekaSpark handles CSV files only at present. In the future we'll look at supporting other data formats, such as Parquet, Avro and sequence files. CSV handling is the same as in distributedWekaHadoop: because the source data is split/partitioned over multiple nodes/workers it can't have a header row, and attribute names can be supplied via a "names" file or manually as a parameter to the jobs. The CSV data is loaded and parsed just once, resulting in an RDD<weka.core.Instance> distributed data structure. Thanks to Spark's mapPartitions() transformation, processing proceeds in much the same way as it does in distributedWekaHadoop, except that the overhead of re-parsing the data on each iteration of an iterative algorithm is eliminated. Processing an entire partition at a time also avoids object creation overhead when making use of distributedWekaBase's classes.
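To make that concrete, here is a minimal sketch (not the actual distributedWekaSpark code, and assuming the Spark 1.x Java API) of how headerless, purely numeric CSV lines might be parsed just once into an RDD of weka.core.Instance objects via mapPartitions(), given a hypothetical two-attribute header:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

import weka.core.Attribute;
import weka.core.DenseInstance;
import weka.core.Instance;
import weka.core.Instances;

// Rough sketch: parse a headerless CSV once into an RDD<Instance>, using a
// pre-built Instances header for the attribute metadata. Real CSV handling
// (quoting, missing values, nominal attributes) is more involved than this.
public class CsvToInstancesSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("csvToInstances").setMaster("local[*]");
    JavaSparkContext sc = new JavaSparkContext(conf);

    // Hypothetical header with two numeric attributes - in practice this would come
    // from the ARFF header job or a user-supplied "names" file.
    ArrayList<Attribute> atts = new ArrayList<Attribute>();
    atts.add(new Attribute("x"));
    atts.add(new Attribute("y"));
    final Instances header = new Instances("data", atts, 0);

    JavaRDD<String> lines = sc.textFile("/path/to/data.csv");

    // One parse per partition; each worker turns its chunk of CSV lines into Instance objects.
    JavaRDD<Instance> instances = lines.mapPartitions(
      new FlatMapFunction<Iterator<String>, Instance>() {
        public Iterable<Instance> call(Iterator<String> it) {
          List<Instance> result = new ArrayList<Instance>();
          while (it.hasNext()) {
            String[] vals = it.next().split(",");
            double[] d = new double[vals.length];
            for (int i = 0; i < vals.length; i++) {
              d[i] = Double.parseDouble(vals[i]);
            }
            Instance inst = new DenseInstance(1.0, d);
            inst.setDataset(header);
            result.add(inst);
          }
          return result;
        }
      });

    // Cache the parsed data so iterative jobs avoid re-parsing the CSV on every pass.
    instances.cache();
    System.out.println("Number of instances: " + instances.count());
    sc.stop();
  }
}
```

Caching the resulting RDD is what lets iterative algorithms skip the re-parsing step on every subsequent pass over the data.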
Reduce operations in Spark are pairwise associative and commutative, and there isn't quite an analogue of a Hadoop reduce (where a single reducer instance iterates over a list of all the elements with the same key value). Because of this, and again to avoid lots of object creation, many "reduce" operations were implemented via sorting and repartitioning followed by a mapPartitions phase. In most cases this approach works just fine. In the case of the job that randomly shuffles and stratifies the input data, the result (when there are class labels that occur less frequently than the number of requested folds) is slightly different to that of the Hadoop implementation: the Spark implementation oversamples these minority classes slightly.
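As a rough illustration of the sort/repartition-then-mapPartitions pattern (a simplified sketch rather than the actual Weka code, again assuming the Spark 1.x Java API), the following sums values by key the way a Hadoop reducer would - streaming over adjacent records with the same key - without ever materializing per-key lists:

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.PairFlatMapFunction;

import scala.Tuple2;

// Hypothetical "reduce by key" that mimics a Hadoop reducer: sort so that all values
// for a key land adjacent to one another in a single partition, then stream over
// each partition once.
public class SortThenReduceSketch {

  public static JavaPairRDD<String, Double> sumByKey(JavaPairRDD<String, Double> input) {
    // sortByKey() range-partitions the data, so every occurrence of a key ends up in
    // the same partition and equal keys are adjacent within it.
    JavaPairRDD<String, Double> sorted = input.sortByKey();

    // One pass over each partition: accumulate a running total per key and emit it
    // when the key changes - no per-key lists are ever built up in memory.
    return sorted.mapPartitionsToPair(
      new PairFlatMapFunction<Iterator<Tuple2<String, Double>>, String, Double>() {
        public Iterable<Tuple2<String, Double>> call(Iterator<Tuple2<String, Double>> it) {
          List<Tuple2<String, Double>> out = new ArrayList<Tuple2<String, Double>>();
          String currentKey = null;
          double total = 0;
          while (it.hasNext()) {
            Tuple2<String, Double> t = it.next();
            if (currentKey != null && !currentKey.equals(t._1())) {
              out.add(new Tuple2<String, Double>(currentKey, total));
              total = 0;
            }
            currentKey = t._1();
            total += t._2();
          }
          if (currentKey != null) {
            out.add(new Tuple2<String, Double>(currentKey, total));
          }
          return out;
        }
      });
  }
}
```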
distributedWekaSpark is not the only Weka on Spark effort available. Kyriakos-Aris Koliopoulos has been working on this at the same time as me, and released the proof-of-concept he developed for his Master's thesis about five months ago:
https://github.com/ariskk/distributedWekaSpark
I've borrowed his nifty cache heuristic that uses the source file size and object overhead settings to automatically determine a Spark storage level to use for RDDs.
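I haven't reproduced the exact details of the heuristic here, but the general idea is along these lines (a hypothetical sketch - the thresholds and overhead multiplier are made up):

```java
import org.apache.spark.storage.StorageLevel;

// Hypothetical illustration of a storage-level heuristic: estimate the in-memory
// footprint of the parsed RDD from the on-disk file size and an object-overhead
// multiplier, then pick a progressively more conservative storage level.
public class StorageLevelHeuristicSketch {

  public static StorageLevel chooseLevel(long fileSizeBytes, double objectOverheadMultiplier,
    long availableClusterMemBytes) {

    long estimatedInMemSize = (long) (fileSizeBytes * objectOverheadMultiplier);

    if (estimatedInMemSize < availableClusterMemBytes / 2) {
      // Plenty of headroom - keep deserialized objects in memory.
      return StorageLevel.MEMORY_ONLY();
    } else if (estimatedInMemSize < availableClusterMemBytes) {
      // Tight fit - store serialized to reduce the footprint.
      return StorageLevel.MEMORY_ONLY_SER();
    } else {
      // Won't fit - spill what doesn't fit to disk.
      return StorageLevel.MEMORY_AND_DISK_SER();
    }
  }
}
```

The chosen level would then simply be passed to persist() on the dataset RDD rather than calling cache() unconditionally.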
Having RDDs referenceable for as long as the Spark context is alive makes it possible to have a tighter coupling between Spark job steps in the Knowledge Flow. The success and failure connection types introduced in distributedWekaHadoop can now be used to carry data, such as the context and references to the various RDD datasets that are in play. This allows Spark steps downstream from the start point in the flow to present a simpler version of their configuration UI, i.e. they can hide connection and CSV parsing details (as these only need to be specified once, at the start step).
What's in the package?
The distributedWekaSpark package comes bundled with core Spark classes that are sufficient for running out-of-the-box in Spark's local mode and sourcing data from the local filesystem. This mode can take advantage of all the cores on your desktop machine by launching workers in separate threads. All the bundled template examples for Spark available from the Knowledge Flow's template menu use this mode of operation. It should also be sufficient for running against a standalone Spark cluster that uses a shared filesystem such as NFS.
To run against HDFS (and/or on YARN), it is necessary to delete all the Spark jar files in ${user.home}/wekafiles/packages/distributedWekaSpark/lib and copy in the spark-assembly-X.Y.Z-hadoopA.B.C.jar from your Spark distribution, as this will have been compiled against the version of Hadoop/HDFS that you're using.
All the same jobs that are available in distributedWekaHadoop have been implemented in distributedWekaSpark. As in the Hadoop case, there is a full-featured command line interface available. All jobs can stand alone - i.e. they will invoke other jobs (such as ARFF header creation and data shuffling) internally if necessary. As mentioned above, when running in the Knowledge Flow, individual job steps become aware of the Spark context and which datasets already exist in memory on the cluster. This means that connection details and CSV parsing options only have to be specified once, and downstream job steps can simplify their UI accordingly.
When referencing inputs or outputs in HDFS, the Weka Spark code handles hdfs:// URLs in a somewhat non-standard way. Like the hadoop fs/hdfs command, which operates relative to the current user's directory in HDFS (unless an absolute path is specified), hdfs:// URLs are interpreted relative to the current user's directory. So a URL like
hdfs://host:port/output/experiment1
refers to the output/experiment1 directory in the current user's home directory. To force an absolute path an extra / is needed - e.g.
hdfs://host:port//user/mhall/output/experiment1
Limitations
Aside from only handling CSV files at present, another limitation stems from the fact that only one Spark context can be active within a single JVM. This imposes some constraints on the structure of Knowledge Flow processes using Spark steps: there can be only one Spark-related start point in a given Knowledge Flow graph. This is because a start point is where the Spark context is first created; having more than one will lead to grief.
When running on a Spark cluster managed by YARN, only the "yarn-client" mode is supported. This is the mode where the driver program executes on the local machine and the YARN resource manager is simply used to provision worker nodes in the cluster for Spark to use. The reason for this is that Weka's implementation configures everything programmatically via SparkConf/JavaSparkContext, including all the Weka and supporting jar files that are needed to run. This works fine for standalone Spark clusters, Mesos clusters and yarn-client mode, but not for yarn-cluster mode (where the driver program itself runs in an application master process on the cluster). In yarn-cluster mode some funky jiggery pokery (as done by the spark-submit shell script) is necessary to get everything configured for the driver to work on the cluster. It seems pretty ugly that this can't be handled by Spark seamlessly behind the scenes via the same SparkConf/SparkContext configuration as the other modes. Hopefully this will get rectified in a future release of Spark. Some discussion of this issue can be seen in the email thread at:
http://markmail.org/message/cmxswp2ffqjrkxix#query:+page:1+mid:42fug7it6f2zoxga+state:results
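For what it's worth, the kind of purely programmatic setup being referred to looks roughly like the following sketch (illustrative only - the jar paths are hypothetical and this is not the actual distributedWekaSpark code):

```java
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Illustrative sketch of a fully programmatic Spark setup. Master URLs for
// standalone, Mesos and yarn-client modes can simply be set on the SparkConf, and
// the Weka/supporting jars shipped to the workers via setJars().
public class ProgrammaticContextSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf()
      .setAppName("wekaSparkJob")
      // "spark://master:7077" (standalone), "mesos://..." or "yarn-client" all work
      // here; "yarn-cluster" does not, because the driver would have to be launched
      // on the cluster by spark-submit rather than from this JVM.
      .setMaster("yarn-client")
      .setJars(new String[] {
        "/path/to/weka.jar",                 // hypothetical paths to the jars that
        "/path/to/distributedWekaBase.jar",  // need to be available on the workers
        "/path/to/distributedWekaSpark.jar" });

    JavaSparkContext context = new JavaSparkContext(conf);
    // ... run jobs against the context ...
    context.stop();
  }
}
```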
Another YARN-related issue is that it is necessary to have your Hadoop cluster's conf dir in the CLASSPATH. This is because Spark picks up the resource manager's address and other bits and pieces from the configuration files. Unfortunately, it is not possible to set this information programmatically. You can only get at the Hadoop Configuration object being used by Spark internally after the SparkContext has been created - and by this time it's too late, as Spark is already trying to talk to the resource manager.
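A minimal sketch of the chicken-and-egg problem (conceptual only, not code from the package; the resource manager address shown is just an example):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

// Sketch of the YARN configuration chicken-and-egg problem: the Hadoop Configuration
// that Spark uses internally can only be reached *after* the context has been
// constructed, but constructing the context is what triggers Spark to contact the
// resource manager (using whatever settings it found on the CLASSPATH).
public class YarnConfProblemSketch {
  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("tooLate").setMaster("yarn-client");

    // Creating the context already causes Spark to talk to the resource manager...
    JavaSparkContext context = new JavaSparkContext(conf);

    // ...so setting the address here has no effect on the connection just attempted.
    Configuration hadoopConf = context.hadoopConfiguration();
    hadoopConf.set("yarn.resourcemanager.address", "rm-host:8032");

    context.stop();
  }
}
```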
Anyhow,
distributedWekaSpark is available from Weka's
package manager today. So give it a go and pass on any feedback you might have.