Anyhow, the coolness factor alone was sufficient to get me to have a go at seeing whether Weka's general purpose distributed processing package - distributedWekaBase - could be leveraged inside of Spark. To that end, there is now a distributedWekaSpark package which, aside from a few small modifications to distributedWekaBase (mainly to support retrieving some outputs in memory rather than from files), proved fairly straightforward to produce. In fact, because develop/test cycles seemed so much faster in Spark than in Hadoop, I prototyped Weka's distributed k-means|| implementation in Spark before coding it for Hadoop.
Internals
Internally, distributedWekaSpark handles CSV files only at present. In the future we'll look at supporting other data formats, such as Parquet, Avro and sequence files. CSV handling is the same as for distributedWekaHadoop: because the source data is split/partitioned over multiple nodes/workers it can't have a header row, so attribute names can be supplied via a "names" file or manually as a parameter to the jobs. The CSV data is loaded and parsed just once, resulting in an RDD<weka.core.Instance> distributed data structure. Thanks to Spark's mapPartitions() processing framework, processing proceeds in much the same way as it does in distributedWekaHadoop, except that the overhead of re-parsing the data on each iteration of an iterative algorithm is eliminated. Processing an entire partition at a time also avoids object creation overhead when making use of distributedWekaBase's classes.
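To give a feel for the pattern (this is just a sketch, not the package's actual code - the map task method names setup(), processInstance(), finished() and getClassifier() are indicative of distributedWekaBase's style rather than its exact API), training one partial model per partition might look something like this, using the Spark 1.x Java API where the function passed to mapPartitions() returns an Iterable:

import java.util.Collections;

import org.apache.spark.api.java.JavaRDD;

import weka.classifiers.Classifier;
import weka.core.Instance;
import weka.core.Instances;
import weka.distributed.WekaClassifierMapTask;

public class PartitionTrainingSketch {

  // Train one partial model per partition; headerOnly is an Instances object
  // containing just the attribute structure (no rows).
  public static JavaRDD<Classifier> trainPerPartition(JavaRDD<Instance> data,
      final Instances headerOnly) {

    return data.mapPartitions(partition -> {
      WekaClassifierMapTask task = new WekaClassifierMapTask();
      task.setup(new Instances(headerOnly, 0));  // structure only
      while (partition.hasNext()) {
        task.processInstance(partition.next());  // whole partition, one task object
      }
      task.finished();
      return Collections.singletonList(task.getClassifier()); // one partial model
    });
  }
}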
Reduce operations in Spark are pairwise associative and commutative, so there isn't quite an analogue of a Hadoop reduce (where a single reducer instance iterates over a list of all elements with the same key value). Because of this, and again to avoid excessive object creation, many "reduce" operations were implemented via sorting and repartitioning followed by a mapPartitions phase. In most cases this approach works just fine. In the case of the job that randomly shuffles and stratifies the input data, the result (when there are class labels that occur less frequently than the number of requested folds) is slightly different to the Hadoop implementation: the Spark implementation oversamples these minority classes slightly.
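As an illustration of the sort-then-mapPartitions idea (again just a sketch - a trivial per-class count stands in for the per-fold statistics the real jobs compute, and it assumes the class index has been set on the data):

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;

import scala.Tuple2;

import weka.core.Instance;

public class SortThenMapPartitionsSketch {

  // Count instances per class value without a reduceByKey: after sorting,
  // equal keys are adjacent and land in the same range partition, so each
  // partition can be folded sequentially with a single accumulator.
  public static JavaRDD<Tuple2<Double, Long>> classCounts(JavaRDD<Instance> data,
      int numPartitions) {

    JavaPairRDD<Double, Instance> byClass = data.mapToPair(
        inst -> new Tuple2<Double, Instance>(inst.classValue(), inst));

    JavaRDD<Tuple2<Double, Long>> counts =
        byClass.sortByKey(true, numPartitions).mapPartitions(it -> {
          List<Tuple2<Double, Long>> out = new ArrayList<Tuple2<Double, Long>>();
          Double current = null;
          long count = 0;
          while (it.hasNext()) {
            Tuple2<Double, Instance> t = it.next();
            if (current != null && !current.equals(t._1())) {
              out.add(new Tuple2<Double, Long>(current, count));
              count = 0;
            }
            current = t._1();
            count++;
          }
          if (current != null) {
            out.add(new Tuple2<Double, Long>(current, count));
          }
          return out; // Spark 1.x: mapPartitions functions return an Iterable
        });

    return counts;
  }
}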
distributedWekaSpark is not the only Weka on Spark effort available. Kyriakos-Aris Koliopoulos has been developing his own implementation at the same time as me, and released a proof-of-concept developed for his Masters thesis about five months ago:
https://github.com/ariskk/distributedWekaSpark
I've borrowed his nifty cache heuristic that uses the source file size and object overhead settings to automatically determine a Spark storage level to use for RDDs.
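The gist of it is something like the following (the thresholds and storage levels shown here are illustrative rather than the actual heuristic; the overhead and memory fraction arguments correspond to the object overhead and caching memory fraction settings exposed by the jobs):

import org.apache.spark.storage.StorageLevel;

public class CachingHeuristicSketch {

  // Estimate the in-memory footprint of the parsed data from the on-disk file
  // size and an object overhead multiplier, then pick a storage level that
  // fits within the fraction of cluster memory available for caching.
  public static StorageLevel chooseStorageLevel(long fileSizeBytes,
      double objectOverhead, double memoryFraction, long clusterMemoryBytes) {

    double estimated = fileSizeBytes * objectOverhead;
    double available = clusterMemoryBytes * memoryFraction;

    if (estimated <= available) {
      return StorageLevel.MEMORY_ONLY();        // fits as deserialized objects
    } else if (estimated <= available * 2) {
      return StorageLevel.MEMORY_ONLY_SER();    // serialize to save space
    }
    return StorageLevel.MEMORY_AND_DISK_SER();  // spill what doesn't fit to disk
  }
}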
Having RDDs referenceable for the duration that the Spark context is alive makes it possible to have a tighter coupling between Spark job steps in the Knowledge Flow. The success and failure connection types introduced in distributedWekaHadoop can now be used to carry data, such as the context and references to the various RDD datasets that are in play. This allows Spark steps downstream from the start point in the flow to present a simpler version of their configuration UI, i.e. they can hide connection and CSV parsing details (as these only need to be specified once, at the start step).
What's in the package?
The distributedWekaSpark package comes bundled with core Spark classes that are sufficient for running out-of-the-box in Spark's local mode and sourcing data from the local filesystem. This mode can take advantage of all the cores on your desktop machine by launching workers in separate threads. All the bundled template examples for Spark available from the Knowledge Flow's template menu use this mode of operation. It should also be sufficient to run against a standalone Spark cluster using a shared filesystem such as NFS.
To run against HDFS (and/or on YARN), it is necessary to delete all the Spark jar files in ${user.home}/wekafiles/packages/distributedWekaSpark/lib and copy in the spark-assembly-X.Y.Z-hadoopA.B.C.jar from your Spark distribution, as this will have been compiled against the version of Hadoop/HDFS that you're using.
All the same jobs that are available in distributedWekaHadoop have been implemented in distributedWekaSpark. As in the Hadoop case, there is a full-featured command-line interface available. All jobs can stand alone - i.e. they will invoke other jobs (such as ARFF header creation and data shuffling) internally if necessary. As mentioned above, when running in the Knowledge Flow, individual job steps become aware of the Spark context and of which datasets already exist in memory on the cluster. This allows connection details and CSV parsing options to be specified just once, and downstream job steps can simplify their UI accordingly.
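For example, running the ARFF header job stand-alone from Java might look roughly like the following (host names and paths are placeholders, and the flag names should be checked against the job's own option listing - run it with -h - for your version):

import weka.core.Utils;
import weka.distributed.spark.ArffHeaderSparkJob;

public class RunHeaderJobSketch {

  public static void main(String[] args) throws Exception {
    String opts = "-master spark://localhost -port 7077"
        + " -input-file hdfs://localhost:8020/input/hypothyroid.csv"
        + " -min-slices 4 -output-dir hdfs://localhost:8020/output"
        + " -header-file-name hypo.arff";

    ArffHeaderSparkJob headerJob = new ArffHeaderSparkJob();
    headerJob.setOptions(Utils.splitOptions(opts));
    headerJob.runJob(); // stands alone: creates its own context and writes the header
  }
}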
When referencing inputs or outputs in HDFS, the Weka Spark code handles hdfs:// URLs in a somewhat non-standard way. Like the hadoop fs/hdfs command, which operates relative to the current user's home directory in HDFS unless an absolute path is specified, hdfs:// URLs are interpreted relative to the current user's home directory. So a URL like
hdfs://host:port/output/experiment1
refers to the output/experiment1 directory in the current user's home directory. To force an absolute path an extra / is needed - e.g.
hdfs://host:port//user/mhall/output/experiment1
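In code terms, the rule amounts to something like this (just the rule spelled out, not the package's actual implementation):

import java.net.URI;

public class HdfsPathRuleSketch {

  // A single / after host:port is interpreted relative to the user's home
  // directory in HDFS; a double // forces an absolute path.
  public static String resolve(String hdfsUrl, String userHomeDir /* e.g. /user/mhall */) {
    String path = URI.create(hdfsUrl).getPath();
    if (path.startsWith("//")) {
      return path.substring(1);   // hdfs://host:port//user/mhall/x -> /user/mhall/x
    }
    return userHomeDir + path;    // hdfs://host:port/output/x -> /user/mhall/output/x
  }
}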
Limitations
Aside from only handling CSV files at present, another limitation stems from the fact that only one Spark context can be active within a single JVM. This imposes some constraints on the structure of Knowledge Flow processes that use Spark steps. There can only be one Spark-related start point in a given Knowledge Flow graph: a start point is where the Spark context is first created, so having more than one will lead to grief.
When running on a Spark cluster managed by YARN, only the "yarn-client" mode is supported. This is the mode where the driver program executes on the local machine and the YARN resource manager is simply used to provision worker nodes in the cluster for Spark to use. The reason for this is that Weka's implementation configures everything programmatically via SparkConf/JavaSparkContext, including all the Weka and supporting jar files that are needed to run. This works fine for standalone Spark clusters, Mesos clusters and yarn-client mode, but not for yarn-cluster mode (where the driver program itself runs in an application master process on the cluster). In yarn-cluster mode some funky jiggery pokery (as done by the spark-submit shell script) is necessary to get everything configured for the driver to work on the cluster. It seems pretty ugly that this can't be handled by Spark seamlessly behind the scenes via the same SparkConf/SparkContext configuration as the other modes. Hopefully this will get rectified in a future release of Spark. Some discussion of this issue can be seen in the email thread at:
http://markmail.org/message/cmxswp2ffqjrkxix#query:+page:1+mid:42fug7it6f2zoxga+state:results
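For reference, the kind of programmatic set-up described above looks roughly like the following (the jar paths are placeholders - the package assembles the real list of Weka and supporting jars itself):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class YarnClientContextSketch {

  public static JavaSparkContext create() {
    SparkConf conf = new SparkConf()
        .setAppName("WekaKF:ARFF instances header job")
        .setMaster("yarn-client")       // or spark://host:7077, local[*], mesos://...
        .setJars(new String[] {
            "/path/to/weka.jar",
            "/path/to/distributedWekaBase.jar",
            "/path/to/distributedWekaSpark.jar" });
    return new JavaSparkContext(conf);
  }
}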
Another YARN-related issue is that it is necessary to have your Hadoop cluster's conf dir in the CLASSPATH. This is because Spark picks up the resource manager's address and other bits and pieces from the configuration files. Unfortunately, it is not possible to set this information programmatically. You can only get at the Hadoop Configuration object being used by Spark internally after the SparkContext has been created - by which time it's too late, as Spark is already trying to talk to the resource manager.
Anyhow, distributedWekaSpark is available from Weka's package manager today. So give it a go and pass on any feedback you might have.
Mark,
Have you got a Java example of how to use distributedWekaSpark from code?
Or some unit/integration tests from which the usage could be derived.
tnx in advance.
Hi Alex,
I don't have any specific code examples yet. However, all the individual jobs are OptionHandlers and have main() methods. So you can look at the main method to see how a single job is used. To see how a bunch of different jobs are used within one JVM (sharing a context and RDD datasets) you can look at the code for the Knowledge Flow steps (which invoke the main Weka spark job classes) - in particular weka.gui.beans.AbstractSparkJob (most of the logic is in this class, and subclasses of it are pretty much just extracting different types of results from their corresponding weka.distributed.spark.* job object).
Cheers,
Mark.
Alright, thanks
This is great Mark!
I was searching for Weka with Hadoop and found this, which is even better.
I have a CDH Cloudera cluster running, but I don't know how to connect Weka to it.
It would be great if a tutorial similar to "Weka and Hadoop Part 1, 2 & 3" can be posted.
Thank you Mark!
Hi Amr,
I've verified that it works against a CDH 5.3.0 sandbox VM at least. What you have to do is delete all the jars in wekafiles/packages/distributedWekaSpark/lib and then copy everything from /usr/lib/hadoop/client/ into wekafiles/packages/distributedWekaSpark/lib, along with the CDH Spark assembly jar (from /usr/lib/spark/lib).
The sample flows included in Knowledge Flow will then run against the CDH Spark master if you 1) copy the hypothyroid.csv file into HDFS somewhere, 2) set the inputFile setting of the ArffHeaderSparkJob to point to it in HDFS, and then 3) set the masterHost setting appropriately. E.g. for my quickstart VM I used:
inputFile: hdfs://quickstart.cloudera:8020/input/hypothyroid.csv
masterHost: spark://quickstart.cloudera
Optionally you can set the outputDir to point to a directory in HDFS too, rather than saving the results to your client machine's home directory.
Note that this was running against standalone Spark on CDH. I haven't tried it against YARN on CDH yet.
Cheers,
Mark.
Thanks Mark!
I'll try doing that soon and see what happens.
Hi Mark,
I have gone through the steps and built up the Knowledge Flow but I get "12:48:50: Could not parse Master URL: ':null'" when I try to run the flow.
In ArffHeaderSparkJob I have:
inputfile:hdfs://192.168.56.102:8020/user/input/hypothyroid.csv
masterhost: spark://192.168.56.102
masterport: 18080
outputDir: C:\Users\Amr\wekafiles\OutDir
pathtoWekaJar: C:\Program Files (x86)\Weka-3-7\weka.jar
Does that seem alright?
Thank you,
Amr
Is there a stack trace in the console or in the Weka log (${user.home}/wekafiles/weka.log)? I assume that port 18080 is correct for your cluster (the default in Spark is 7077 for the master).
Also, did you start with the template Knowledge Flow file for producing just an ARFF header? This can be accessed from the templates menu (third button from the right in the Knowledge Flow's toolbar).
Cheers,
Mark.
I missed starting with the file for producing the ARFF header. Thanks for pointing that out. Now I get "14:34:30: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]"
I have tried 18080 as well.
This is the log file:
....
INFO: Monday, 30 March 2015
WARNING: loading a newer version (3.7.13-SNAPSHOT > 3.7.12)!
Combined: -master spark://192.168.56.101 -port 7077 -weka-jar "C:\\Program Files (x86)\\Weka-3-7\\weka.jar" -input-file hdfs://192.168.56.101:8020/user/input/hypothyroid.csv -min-slices 4 -output-dir "C:\\Users\\Amr Munshi\\wekafiles\\OutDir" -cluster-mem -1.0 -overhead 3.0 -mem-fraction 0.6 -names-file ${user.home}/wekafiles/packages/distributedWekaSpark/sample_data/hypothyroid.names -header-file-name hypo.arff -M ? -E ' -F , -compression 50.0
2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
INFO: [FlowRunner] launching flow start points in parallel...
2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
INFO: [FlowRunner] Launching flow 1...
2015-03-30 14:34:03 weka.gui.beans.LogPanel logMessage
INFO: Setting job name to: WekaKF:ARFF instances header job
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/C:/Users/Amr%20Munshi/wekafiles/packages/distributedWekaSpark/lib/slf4j-log4j12.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/C:/Users/Amr%20Munshi/wekafiles/packages/distributedWekaSpark/lib/spark-assembly-1.3.0-hadoop2.0.0-mr1-cdh4.2.0.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
2015-03-30 14:34:04 weka.gui.beans.LogPanel logMessage
INFO: INFO - Changing view acls to: Amr Munshi
2015-03-30 14:34:46 weka.gui.beans.LogPanel logMessage
INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]
2015-03-30 14:34:47 weka.gui.beans.LogPanel logMessage
INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]
2015-03-30 14:34:48 weka.gui.beans.LogPanel logMessage
INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]
2015-03-30 14:34:49 weka.gui.beans.LogPanel logMessage
INFO: WARN - Could not connect to akka.tcp://sparkMaster@192.168.56.101:7077: akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkMaster@192.168.56.101:7077]
2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
INFO: ERROR - Application has been killed. Reason: All masters are unresponsive! Giving up.
2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
INFO: WARN - Application ID is not initialized yet.
2015-03-30 14:35:05 weka.gui.beans.LogPanel logMessage
INFO: ERROR - Exiting due to error from cluster scheduler: All masters are unresponsive! Giving up.
Hi Amr,
Is 192.168.56.102 the address for the master that is displayed on the Spark cluster web UI? I've found that you need to use exactly the address that is shown there. Beyond this it is almost certainly a networking issue. Spark uses a bunch of ports and web servers for communication between the driver, master and workers. I'm assuming that your Windows machine is not on the same network as the cluster. You'll need to check that all the necessary ports are accessible from your machine.
Another thing to check is that your machine (the client) is accessible from the master on the cluster. Under *nix at least, the output of the 'hostname' command or first non-local host interface of the client machine is picked up and passed to the master. If this is not reachable by the master then you'll have problems. There are a couple of properties (that can be set in the Weka Spark step's property field) that can be used to explicitly set this if necessary: spark.local.ip and spark.driver.host.
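For reference, set programmatically those two properties would look something like this (the address is a placeholder for whatever interface the master can actually reach):

SparkConf conf = new SparkConf()
  .set("spark.driver.host", "192.168.56.1")
  .set("spark.local.ip", "192.168.56.1");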
If possible, I would first suggest running Weka on one of the cluster nodes. If this works, then the problems are definitely network related with respect to your Windows machine.
Cheers,
Mark.
The IP is dynamic, so I checked it beforehand - that's why I have 192.168.56.101 and sometimes 192.168.56.102. And I tried pinging from the master's cmd and it worked.
ReplyDeleteI think as you suggested that I have some issues with Spark configurations on the client.
Thanks for helping
Hi Mark,
I solved the issue; it was something to do with the IPs of the master and client machines. I gave static IPs to all of them and got them connected.
Now, I have in the 'inputFile'=hdfs://192.168.56.105:8020/user/input/hypothyroid.csv
But I get: ArffHeaderSparkJob$189847010|ERROR: Input path does not exist: hdfs://192.168.56.105:8020/user/Amr/user/input/hypothyroid.csv. It adds "/user/Amr/" to the input file.
Excellent. The code allows for both relative (to your home directory in HDFS) and absolute paths. Use either:
hdfs://192.168.56.105:8020/input/hypothyroid.csv
or
hdfs://192.168.56.105:8020//user/Amr/user/input/hypothyroid.csv
Note the // after the port number in the second URL.
Cheers,
Mark.
Thanks a lot Mark!
Appreciate your help and patience.
Hi Mark
Can you please give us an example of how to use this library from a Java program? I know how to use Weka from Java and also Spark, but I can't understand how to use this in Java.
Thank you
Each job is basically self-contained and can be invoked from a Java program with just a few lines of code. Outputs are written to the file system and, in many cases, can be obtained from the job object programmatically. For example, using the classifier building job:
String[] options = ... // an array of command line options for the job
WekaClassifierSparkJob wcsj = new WekaClassifierSparkJob();
wcsj.setOptions(options);
wcsj.runJob();
Classifier finalModel = wcsj.getClassifier();
Instances modelHeader = wcsj.getTrainingHeader();
Cheers,
Mark.
Thank you Mark :)
I get this error when using HDFS:
INFO: ArffHeaderSparkJob$950316213|ERROR: Server IPC version 9 cannot communicate with client version 4
org.apache.hadoop.ipc.RemoteException: Server IPC version 9 cannot communicate with client version 4
any solution ?
Replace all the jar files in ~/wekafiles/packages/distributedWekaSpark/lib with the spark assembly jar that comes with your spark distribution.
Cheers,
Mark.
Thank you again :)
Hi Mark,
This is an interesting implementation. Any Spark wrapper for LinearRegression?
Thanks.
Jason
Hi Jason,
You can use the WekaClassifierSpark job to train (via multiple iterations over the data) weka.classifiers.functions.SGD. Setting the loss function in SGD to squared loss will give you linear regression (albeit without the attribute selection performed by weka.classifiers.LinearRegression).
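For example, something along these lines (the SGD flags are illustrative - check SGD's own option listing for your version; -F 2 should select the squared loss):

WekaClassifierSparkJob job = new WekaClassifierSparkJob();
job.setClassifierMapTaskOptions("-W weka.classifiers.functions.SGD -- -F 2 -L 0.01");
job.setNumIterations(10); // multiple passes over the data for SGD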
Cheers,
Mark.
Hi Mark,
I would like to implement distributedWekaSpark jobs on yarn-cluster, but I'm really stuck on even the basics, like how to set up the data, because I'm just starting out with Spark as well. Can you kindly share how I can use the Dataset class, or which class I should use to set up the data?
It's not really necessary to dig that deep into the supporting classes and API if you're wanting to use the distributedWeka jobs programmatically. Each job class has a fully functional command line option parsing mechanism, so you just need to construct an instance of a particular job (e.g. WekaClassifierSparkJob) and then call setOptions() to fully configure it. Then you just call the runJob() method to execute it. If you want to get any outputs produced, then all the jobs write to the specified output directory. Alternatively, most of them have methods to retrieve outputs programmatically once the job completes (e.g. the WekaClassifierSparkJob has a getClassifier() method and a getTrainingHeader() method to get the trained classifier and header-only Instances object of the training data respectively).
Cheers,
Mark.
Hi Mark,
It's actually much easier than I thought to set up a job - thanks for all the work. However, I'm still having a little problem; see my code below:
//set sparkContext
JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count"));
//Load Header as Instances object
String toyDataHeader = "/Users/dmatekenya/Documents/BestDrSc/LocationPrediction2.0/data/toyData/toyDataHeader.arff";
DataSource source_l = new DataSource(toyDataHeader);
Instances header = source_l.getDataSet();
//File containing csv data
String toyDataCSV = "/Users/dmatekenya/Documents/BestDrSc/LocationPrediction2.0/data/toyData/toyDataCSV.csv";
//Now create WekaClassifierSparkJob and configure it
WekaClassifierSparkJob cls = new WekaClassifierSparkJob();
JavaRDD data_RDD = cls.loadCSVFile(toyDataCSV,header,"", sc,"",-1,false);
Dataset distData = new Dataset("toyData",data_RDD);
cls.setDataset("toyData",distData);
cls.setNumIterations(50);
cls.setClassAttribute(4);
cls.runJob();
The problem is on the line where I try to load CSV file. It gives the compilation error shown below which I'm not sure how to resolve:
"The type weka.gui.beans.ClassifierProducer cannot be resolved. It is indirectly referenced from required .class files"
Thanks in advance for your help.
Dunstan
Can you provide your example to my mail id vsdrshn@gmail.com? It would be very helpful, thanks in advance. Also, I wanted to know if we can use a voting mechanism in Spark programming.
Mark,
Thanks for the quick response. I guess I'm thinking I have to follow the same pipeline as in regular Weka. Let me have a go at it again. I will post if I need further help.
Best,
Dunstan
I installed Weka 3.7.13 x64 for Windows and all the distributed packages. I then loaded the "Spark:train and save two classifiers" knowledge flow and ran it. The ArffHeaderSparkJob "Executing" takes forever (close to two hours already) and there are no error messages. What is going on? It is said to "run out of the box", so is it not an example? How do I know Spark works properly in Weka?
Thanks, Chen
I would recommend that you just install distributedWekaSpark (which will also install distributedWekaBase automatically). Weka's package system uses a shared classpath. This makes having package dependencies easy to implement but can lead to library clashes. Installing all the distributed Weka for Hadoop packages as well as for Spark will probably lead to issues.
To see if there are exceptions/problems you should use the Start menu option for launching Weka with a console. You can also check the weka.log file - ${user.home}/wekafiles/weka.log.
I have run the template examples for distributedWekaSpark under x64 Windows 7 and x64 Windows 8 without issues.
Cheers,
Mark.
Hi, Mark:
Thanks for your prompt reply! That was the problem - when the Hadoop packages are removed, the problem is gone.
Then I simplified the Knowledge Flow to just WekaClassifierSparkJob (Naive Bayes) and tried to scale up the data size. The data I used is KDD99. I know my data files are OK since I ran PySpark classifiers on them successfully. I wish to see Weka running it. The classifier ran OK up to 10% of KDD99 but raised an error on the full set. The ERROR lines in the console (twice, after the screen top overflowed) are below:
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
16/01/26 14:48:39 ERROR scheduler.TaskSchedulerImpl: Exception in statusUpdate
java.util.concurrent.RejectedExecutionException: Task org.apache.spark.scheduler.TaskResultGetter$$anon$3@67d1a07f rejected from java.util.concurrent.ThreadPoolExecutor@c45a52c[Terminated, pool size = 0, active threads = 0, queued tasks = 0, completed tasks = 2]
Hi,
Please, is there any way to handle GRIB files with Weka or Spark?
Thank you
I don't think there is a way to read GRIB files with Weka regardless of whether it's running on your desktop or in Spark :-) You will need to convert your GRIB files to CSV before you can process them using Weka on Spark.
Cheers,
Mark.
Thank you very much Mark. I hope there will be examples of using your Spark package, because I want to do some comparison of clustering algorithms. Anyway, thank you again :)
Hello Mark, is there any memory constraint if I use larger datasets with the Spark templates? Basic Weka has a memory constraint of 1.2GB.
ReplyDeleteThat will depend on the configuration of your Spark cluster, size of the dataset, number of partitions in the RDD that backs the dataset and choice of algorithm. In general, you will want to make sure that there are enough partitions in the data to enable a given partition to be processed in the heap space available to a Spark worker process.
Cheers,
Mark.
Hello Mark, can you please provide an example of how I can run the J48 classifier on my cluster? Also, I wanted to know if I can create and save the model once and then apply it to the test set to generate the class predictions. Thanks in advance.
ReplyDeleteProgrammatic Example
DeleteHi, I have installed distributedwekaspark on my machine (os-windows 8, weka-3.8, spark 2.0). The package works fine when i run it in standalone mode. But when i replace the jar files from the spark distribution (spark standalone mode work on the same machine - windows-8 which is also working) and execute the arfffheader flow, it throws below error. can you please let me know if it can work with spark on the same machine.?
Error logs from weka.log are below:
java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:152)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:152)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:766)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:332)
org.apache.spark.rdd.RDD$$anonfun$8.apply(RDD.scala:330)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:919)
org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:866)
org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:910)
org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:668)
org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:330)
org.apache.spark.rdd.RDD.iterator(RDD.scala:281)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
org.apache.spark.scheduler.Task.run(Task.scala:85)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
java.lang.Thread.run(Unknown Source)
There are API breaking changes in Spark 2.0. Distributed Weka for Spark has not been changed to work against Spark 2.0, and I don't anticipate adding 2.0 support for a while yet.
Cheers,
Mark.
Thanks Mark.
Cheers,
Sudhakar
Hello Mark
I am trying to execute the example, but I get this error:
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.io.IOException: Expected 3887 fields, but got 1 for row
weka.distributed.CSVToARFFHeaderMapTask.processRowValues(CSVToARFFHeaderMapTask.java:1121)
weka.distributed.CSVToARFFHeaderMapTask.processRow(CSVToARFFHeaderMapTask.java:1296)
weka.distributed.spark.ArffHeaderSparkJob$1.call(ArffHeaderSparkJob.java:779)
weka.distributed.spark.ArffHeaderSparkJob$1.call(ArffHeaderSparkJob.java:754)
org.apache.spark.api.java.JavaRDDLike$$anonfun$fn$4$1.apply(JavaRDDLike.scala:138)
I have only the ArffHeader and WekaClassifier steps, with two text viewers.
Hello Mark,
Really nice stuff! I can run it locally and I am now trying to connect it to my Hortonworks 2.5 installation. I found the spark-assembly files as described in the package description and copied them. However, my cluster has Kerberos authentication and I am looking for a way to use properties in the Spark job configuration in Weka to pass on these parameters. Is this at all possible at the moment? And if so, how?
Thanks in advance.
best regards
Andreas
Hello Mark,
I continuously get the following error. I appreciate any help.
...
12:47:50: INFO - Added rdd_1_0 in memory on 10.50.2.203:42650 (size: 10.7 KB, free: 366.3 MB)
12:47:50: WARN - Lost task 0.0 in stage 0.0 (TID 0, 10.50.2.203): java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
...
12:47:50: INFO - Starting task 0.1 in stage 0.0 (TID 1, 10.50.2.203, partition 0, ANY, 6090 bytes)
12:47:50: INFO - Launching task 1 on executor id: 0 hostname: 10.50.2.203.
12:47:50: INFO - Lost task 0.1 in stage 0.0 (TID 1) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 1]
12:47:50: INFO - Starting task 0.2 in stage 0.0 (TID 2, 10.50.2.203, partition 0, ANY, 6090 bytes)
12:47:50: INFO - Launching task 2 on executor id: 0 hostname: 10.50.2.203.
12:47:51: INFO - Lost task 0.2 in stage 0.0 (TID 2) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 2]
12:47:51: INFO - Starting task 0.3 in stage 0.0 (TID 3, 10.50.2.203, partition 0, ANY, 6090 bytes)
12:47:51: INFO - Launching task 3 on executor id: 0 hostname: 10.50.2.203.
12:47:51: INFO - Lost task 0.3 in stage 0.0 (TID 3) on executor 10.50.2.203: java.lang.AbstractMethodError (weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;) [duplicate 3]
12:47:51: ERROR - Task 0 in stage 0.0 failed 4 times; aborting job
12:47:51: INFO - Removed TaskSet 0.0, whose tasks have all completed, from pool
12:47:51: INFO - Cancelling stage 0
12:47:51: INFO - ResultStage 0 (collect at ArffHeaderSparkJob.java:840) failed in 3.034 s
12:47:51: INFO - Job 0 failed: collect at ArffHeaderSparkJob.java:840, took 3.287746 s
12:47:51: [Basic] WekaClassifierSparkJob$2078201622|Last step - shutting down Spark context
12:47:51: INFO - Stopped Spark web UI at http://10.50.2.203:4040
12:47:51: INFO - Shutting down all executors
12:47:51: INFO - Asking each executor to shut down
12:47:51: INFO - MapOutputTrackerMasterEndpoint stopped!
12:47:51: INFO - MemoryStore cleared
12:47:51: INFO - BlockManager stopped
12:47:51: INFO - BlockManagerMaster stopped
12:47:51: INFO - OutputCommitCoordinator stopped!
12:47:51: INFO - Successfully stopped SparkContext
12:47:51: [ERROR] WekaClassifierSparkJob$2078201622|Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.3 in stage 0.0 (TID 3, 10.50.2.203): java.lang.AbstractMethodError: weka.distributed.spark.ArffHeaderSparkJob$1.call(Ljava/lang/Object;)Ljava/util/Iterator;
...
Have you replaced the Spark libraries in ~/wekafiles/packages/distributedWekaSpark/lib with the spark assembly jar that comes with the Spark distribution being used to run your cluster? Also make sure that you are using an Oracle JVM to run both Weka and Spark.
Cheers,
Mark.
Yes, I replaced the libraries and I'm using an Oracle JVM. But I guess it's because I use Spark 2.1 - I recently saw that it works with older versions. Thanks for the reply though. I'll try with an older version.
Hello Mark,
I am trying to use the distributedWekaSpark package. Everything runs fine when using files in the local filesystem. But when I try to use a file from a Kerberos-enabled HDFS, it gives me the following errors. Is there a way to connect to a kerberized cluster?
ArffHeaderSparkJob$266157602|SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
weka.core.WekaException: SIMPLE authentication is not enabled. Available:[TOKEN, KERBEROS]
at weka.knowledgeflow.steps.AbstractSparkJob.runJob(AbstractSparkJob.java:294)
at weka.knowledgeflow.steps.AbstractSparkJob.start(AbstractSparkJob.java:221)
at weka.knowledgeflow.StepManagerImpl.startStep(StepManagerImpl.java:1020)
at weka.knowledgeflow.BaseExecutionEnvironment$3.run(BaseExecutionEnvironment.java:440)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Thanks
Arshaq,
Did you resolve this? I'm getting the same error.
Thanks
Hi Fray,
Packages are installed in ${user.home}/wekafiles/packages. Inside this directory you should find distributedWekaBase and distributedWekaSpark (or distributedWekaSparkDev, depending on which one you installed). Each directory contains a src folder.
Cheers,
Mark.
Thank you very much. Another question, please: I ran distributed k-means in local mode and got a .model file. How can I view the file?
You may open the Weka Explorer and load any dataset file to enable the other tabs. Now go to the Cluster tab; under the result list area, right-click, which will show you 'Load model object file'. There you can locate the model file and see it in the clusterer output window.
Thank you very much
Mark,
Any thoughts on Arshaq's question above?
There is nothing built in to distributed Weka to handle kerberos I'm afraid. However, the general approach (not specific to Pentaho data integration) at:
https://help.pentaho.com/Documentation/5.2/0P0/0W0/030/040
might be usable.
Cheers,
Mark.
Dear Mark,
I am using DistributedWekaSpark via Java.
I am trying to run WekaClassifierSparkJob and a WekaClassifierEvaluationSparkJob for Linear Regression with the code below:
WekaClassifierSparkJob job = new WekaClassifierSparkJob();
job.setClassAttribute("last");
job.setDebug(false);
job.setModelFileName(classifier.getName()+".model");
job.setNumIterations(1);
job.setRandomizeAndStratify(false);
job.setWriteRandomlyShuffledSplitsToOutput(false);
job.setClassifierMapTaskOptions("LinearRegression -S 0 -R 1.0E-8 -num-decimal-places 4");
job.setDataset(previousJob.getDatasets().next().getKey(), previousJob.getDataset(previousJob.getDatasets().next().getKey()));
job.setCachingStrategy(previousJob.getCachingStrategy());
job.getSparkJobConfig().setOutputDir(Constants.SPARK_OUTPUT);
job.runJobWithContext(previousJob.getSparkContext());
WekaClassifierEvaluationSparkJob job2 = new WekaClassifierEvaluationSparkJob();
job2.setDebug(false);
job2.setOutputSubdir(classifier.getName());
job2.setDataset(previousJob.getDatasets().next().getKey(), previousJob.getDataset(previousJob.getDatasets().next().getKey()));
job2.setCachingStrategy(previousJob.getCachingStrategy());
job2.getSparkJobConfig().setOutputDir(Constants.SPARK_OUTPUT);
job2.runJobWithContext(job.getSparkContext());
if(job2.getJobStatus().equals(DistributedJob.JobStatus.FINISHED)){
System.out.println(classifier.getName()+": "+job2.getText());
}
Although this seems to work, if I run the same job for Gaussian Processes I get the same results.
So, I guess I am not configuring the jobs correctly.
How can I configure them to get the same results as when I run the jobs from Weka KnowledgeFlow?
Thanks in advance,
Marios
The key to this is to take a look at the command line options for the job (or the listOptions() method in WekaClassifierMapTask). If you run:
java weka.Run .WekaClassifierSparkJob -h
One of the options is:
-W
The fully qualified base classifier to use. Classifier options
can be supplied after a '--'
So, your setClassifierMapTaskOptions() call needs to actually take the following string:
"-W weka.classifiers.functions.LinearRegression -- -S 0 -R 1.0E-8 -num-decimal-places 4"
A similar string will allow you to specify Gaussian processes.
Cheers,
Mark.
ReplyDeleteI am getting this error on my Windows machine for the 4th workflow.: java.lang.NoSuchMethodError: 'sun.misc.Cleaner sun.nio.ch.DirectBuffer.cleaner()'