Implementing a custom Spark shell


Implementing a custom Spark shell

Sampo Niskanen
Hi,

I'd like to create a custom version of the Spark shell that automatically defines some additional variables/RDDs (in addition to 'sc') specific to our application. Is this possible?

I took a look at the code that spark-shell invokes, and it seems quite complex. Can it be reused from my own code?


I'm implementing a standalone application that uses the Spark libraries (managed by SBT). Ideally, I'd like to be able to launch the shell from that application instead of using the default Spark distribution. Alternatively, can some utility code be injected into the standard spark-shell?


Thanks.

    Sampo Niskanen
    Lead developer / Wellmo



Re: Implementing a custom Spark shell

Matei Zaharia
Administrator
In Spark 0.9 and master, you can pass the -i argument to spark-shell to load a script containing commands before opening the prompt. This is also a feature of the Scala shell as a whole (try scala -help for details).

Also, once you’re in the shell, you can use :load file.scala to execute the content of file.scala as if you’d typed it into the shell.
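
For example, a small init script along these lines (the file name and paths are purely illustrative) could predefine your application-specific RDDs, and be loaded either with ./bin/spark-shell -i init.scala at startup or with :load init.scala from the prompt:

    // init.scala -- hypothetical init script; spark-shell already provides 'sc'
    val profiles = sc.textFile("/data/profiles")   // illustrative path
    val events   = sc.textFile("/data/events")     // illustrative path
    println("Defined RDDs: profiles, events")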

Matei



Re: Implementing a custom Spark shell

Sampo Niskanen
Hi,

Thanks for the pointers. I did get my code working within the normal spark-shell. However, since I'm building a separate analysis service that pulls in the Spark libraries via SBT, I'd much rather have the custom shell incorporated into that service instead of having to use the default downloadable distribution.


I figured out how to create a custom Scala REPL using the instructions at http://stackoverflow.com/questions/18628516/embedded-scala-repl-interpreter-example-for-2-10  (The latter answer is essentially the helper class I use.)

I injected the SparkContext and my RDDs, and, for example, rdd.count works fine. However, when I try to perform a filter operation, I get a ClassNotFoundException [1]. My guess is that the anonymous function I define exists only within the REPL and does not get shipped to the executors (even though I'm using a local cluster).
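
For reference, my embedded REPL setup is roughly the following (a minimal sketch of the pattern from that Stack Overflow answer; the method name and bindings are illustrative):

    import scala.tools.nsc.Settings
    import scala.tools.nsc.interpreter.ILoop
    import org.apache.spark.SparkContext

    // Minimal sketch: start an embedded Scala 2.10 REPL with 'sc' pre-bound.
    // Application RDDs can be bound the same way before the prompt appears.
    def startRepl(sc: SparkContext): Unit = {
      val settings = new Settings
      settings.usejavacp.value = true        // run the REPL on the application's classpath
      val repl = new ILoop {
        override def loop(): Unit = {
          intp.beQuietDuring {
            intp.bind("sc", "org.apache.spark.SparkContext", sc)
          }
          super.loop()
        }
      }
      repl.process(settings)                 // blocks until the user exits the shell
    }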

I found out that there's a separate spark-repl library, which contains the SparkILoop class. When I replace ILoop with SparkILoop, I get the Spark logo and version number, a NullPointerException [2], and then the Scala prompt. Still, I get exactly the same ClassNotFoundException when trying to perform a filter operation.

Can anyone give any pointers on how to get this working?


Best regards,
   Sampo N.



ClassNotFoundException [1]:

scala> data.profile.filter(p => p.email == "[hidden email]").count
14/02/28 08:49:16 ERROR Executor: Exception in task ID 1
java.lang.ClassNotFoundException: $line9.$read$$iw$$iw$$anonfun$1
at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
at java.security.AccessController.doPrivileged(Native Method)
at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
at java.lang.Class.forName0(Native Method)
at java.lang.Class.forName(Class.java:270)
at org.apache.spark.serializer.JavaDeserializationStream$$anon$1.resolveClass(JavaSerializer.scala:37)
at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1612)
at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.scheduler.ResultTask$.deserializeInfo(ResultTask.scala:63)
at org.apache.spark.scheduler.ResultTask.readExternal(ResultTask.scala:139)
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1837)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1796)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:40)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:62)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$run$1.apply$mcV$sp(Executor.scala:195)
at org.apache.spark.deploy.SparkHadoopUtil.runAsUser(SparkHadoopUtil.scala:49)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:178)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:744)
14/02/28 08:49:16 ERROR TaskSetManager: Task 1.0:0 failed 1 times; aborting job
org.apache.spark.SparkException: Job aborted: Task 1.0:0 failed 1 times (most recent failure: Exception failure: java.lang.ClassNotFoundException: $anonfun$1)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
at scala.Option.foreach(Option.scala:236)
at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
at akka.actor.ActorCell.invoke(ActorCell.scala:456)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
at akka.dispatch.Mailbox.run(Mailbox.scala:219)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)


NullPointerException [2]:

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 0.9.0
      /_/

Using Scala version 2.10.3 (OpenJDK 64-Bit Server VM, Java 1.7.0_51)
Type in expressions to have them evaluated.
Type :help for more information.
java.lang.NullPointerException
at $iwC$$iwC.<init>(<console>:8)
at $iwC.<init>(<console>:14)
at <init>(<console>:16)
at .<init>(<console>:20)
at .<clinit>(<console>)
at .<init>(<console>:7)
at .<clinit>(<console>)
at $print(<console>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:772)
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1040)
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:609)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:640)
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:604)
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:788)
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:833)
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:745)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:119)
at org.apache.spark.repl.SparkILoopInit$$anonfun$initializeSpark$1.apply(SparkILoopInit.scala:118)
at org.apache.spark.repl.SparkIMain.beQuietDuring(SparkIMain.scala:258)
at org.apache.spark.repl.SparkILoopInit$class.initializeSpark(SparkILoopInit.scala:118)
at org.apache.spark.repl.SparkILoop.initializeSpark(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1$$anonfun$apply$mcZ$sp$5.apply$mcV$sp(SparkILoop.scala:903)
at org.apache.spark.repl.SparkILoopInit$class.runThunks(SparkILoopInit.scala:140)
at org.apache.spark.repl.SparkILoop.runThunks(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoopInit$class.postInitialization(SparkILoopInit.scala:102)
at org.apache.spark.repl.SparkILoop.postInitialization(SparkILoop.scala:53)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply$mcZ$sp(SparkILoop.scala:920)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at org.apache.spark.repl.SparkILoop$$anonfun$process$1.apply(SparkILoop.scala:876)
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135)
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:876)
at com.wellmo.reporting.Repl$.run(Repl.scala:30)
at com.wellmo.reporting.WellmoReportingScala.run(WellmoReportingScala.scala:60)
at com.wellmo.reporting.WellmoReportingJava.run(WellmoReportingJava.java:44)
at com.wellmo.reporting.WellmoReportingJava.main(WellmoReportingJava.java:33)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:606)
at sbt.Run.invokeMain(Run.scala:68)
at sbt.Run.run0(Run.scala:61)
at sbt.Run.execute$1(Run.scala:50)
at sbt.Run$$anonfun$run$1.apply$mcV$sp(Run.scala:54)
at sbt.TrapExit$.executeMain$1(TrapExit.scala:33)
at sbt.TrapExit$$anon$1.run(TrapExit.scala:42)

Spark context available as sc.



    Sampo Niskanen
    Lead developer / Wellmo

    [hidden email]
    +358 40 820 5291
 




Re: Implementing a custom Spark shell

Prashant Sharma
You can enable debug logging for the REPL; thankfully it uses Spark's logging framework. The trouble is most likely with the wrappers.
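
For example, assuming Spark's bundled log4j setup is in use, adding a line like this to conf/log4j.properties should turn on REPL debug output:

    # assumption: the default log4j configuration shipped with Spark is in use
    log4j.logger.org.apache.spark.repl=DEBUG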

Prashant Sharma



Re: Implementing a custom Spark shell

Sampo Niskanen
Hi,

I've tried to enable debug logging, but I can't figure out what might be going wrong. Can anyone assist in deciphering the log?

The log of the startup and run attempts is at http://pastebin.com/XyeY92VF
This uses SparkILoop, DEBUG-level logging, and the settings.debug.value = true option.

Line 323:  Spark welcome message
Line 746:  The NullPointerException that occurs during startup whenever I use SparkILoop instead of ILoop
Lines 1973-2252:  Running an RDD count, which works correctly
Lines 2254-2890:  Running an RDD filter + count, which fails due to a ClassNotFoundException (line 2528)


Thanks.


    Sampo Niskanen
    Lead developer / Wellmo

    [hidden email]
    +358 40 820 5291
 


