Running actions in loops

Running actions in loops

od
Hello,

What is the general approach people take when trying to do analysis
across multiple large files where the data to be extracted from a
successive file depends on the data extracted from a previous file or
set of files?

For example:
I have a group of HDFS files, each 20+ GB in size. I need to extract event1 on day 1 from the first file, extract event2 from all remaining files in a period of successive dates, and then do a calculation on the two events.
I then need to move on to day 2, extract event1 (with certain properties), take all following days, extract event2, and run the calculation against the previous day for all days in the period. And so on.

I have verified that the following (very naive) approach doesn't work:

def calcSimpleRetention(start:String,end:String,event1:String,event2:String):Map[String,List[Double]] = {
    val epd = new PipelineDate(end)
    val result = for {
      dt1 <- PipelineDate.getPeriod(new PipelineDate(start), epd)
      val f1 = sc.textFile(dt1.toJsonHdfsFileName)
      val e1 = f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),0)).cache
      val c = e1.count.toDouble

      val intres = for {
        dt2 <- PipelineDate.getPeriod(dt1+1,epd)
        val f2 = sc.textFile(dt2.toJsonHdfsFileName)
        val e2 = f2.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1))
        val e1e2 = e1.union(e2)
        val r = e1e2.groupByKey().filter(e => e._2.length > 1 && e._2.filter(_==0).length>0).count.toDouble
      } yield (c/r) // get the retention rate
    } yield (dt1.toString->intres)
    Map(result:_*)
  }

I am getting the following errors:
14/03/07 03:22:25 INFO SparkContext: Starting job: count at CountActor.scala:33
14/03/07 03:22:25 INFO DAGScheduler: Got job 0 (count at CountActor.scala:33) with 140 output partitions (allowLocal=false)
14/03/07 03:22:25 INFO DAGScheduler: Final stage: Stage 0 (count at CountActor.scala:33)
14/03/07 03:22:25 INFO DAGScheduler: Parents of final stage: List()
14/03/07 03:22:25 INFO DAGScheduler: Missing parents: List()
14/03/07 03:22:25 INFO DAGScheduler: Submitting Stage 0 (MappedRDD[3] at map at CountActor.scala:32), which has no missing parents
14/03/07 03:22:25 INFO DAGScheduler: Failed to run count at CountActor.scala:33
14/03/07 03:22:25 ERROR OneForOneStrategy: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
org.apache.spark.SparkException: Job aborted: Task not serializable: java.io.NotSerializableException: com.github.ognenpv.pipeline.CountActor
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:794)
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:737)
    at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:569)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

I should mention that this code is fired off from an Akka actor (which
is controlled by a Scalatra servlet).

Any ideas, recommendations, etc.? I am fairly new to Scala and M/R principles in general; it is fair to say that at this point I am still thinking from the point of view of an imperative programmer trying to fit a square peg through a round hole ;)
Ognen

Re: Running actions in loops

Ognen Duzlevski-2
It looks like the problem is in the filter task - is there anything special about filter()?

I have removed the filter line from the loops just to see if things will work, and they do.

Does anyone have any ideas?

Thanks!
Ognen

--
Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems.
-- Jamie Zawinski


Re: Running actions in loops

Mayur Rustagi
Most likely the job you are executing is not serializable; this typically happens when you use a library that is not serializable. Are you using any library like jodatime, etc.?
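
For illustration only (a hedged sketch, not code from this thread; the class and field names are hypothetical): the exception above names the enclosing CountActor, and a common way a non-serializable enclosing object ends up in a task is when a closure references one of its fields, which implicitly captures the whole instance. Copying the needed value into a local val first keeps the enclosing object out of the serialized closure.

import org.apache.spark.SparkContext

class CountActorLike(sc: SparkContext) {    // imagine this class is not serializable
  val eventName = "signup"                  // an instance field

  def badCount(path: String): Long =
    // eventName is really this.eventName, so the whole instance is pulled into the task
    sc.textFile(path).filter(_.contains(eventName)).count()

  def goodCount(path: String): Long = {
    val ev = eventName                      // copy the field into a local val first
    sc.textFile(path).filter(_.contains(ev)).count()  // the closure now captures only a String
  }
}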

Mayur Rustagi
Ph: +1 (760) 203 3257



Re: Running actions in loops

od
Mayur, I had not thought of that. Yes, I use jodatime. What is the scope that this serialization issue applies to? Only the method making a call into / using such a library? The whole class that such a method belongs to? Sorry if it is a dumb question :)

Ognen


Re: Running actions in loops

Mayur Rustagi
So the whole function closure you want to apply to your RDD needs to be serializable, so that it can be "serialized" & sent to the workers to operate on the RDD. Objects from jodatime cannot be serialized & sent, hence jodatime is out of the picture. Two bad answers:
1. Initialize jodatime for each row, do the work & destroy the objects; that way they are only created while the job is running & never need to be sent across (see the sketch below).
2. Write your own parser & hope the jodatime guys get their act together.
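
To illustrate option 1, a hedged sketch (the input layout and format string are assumptions, not taken from the thread): create the jodatime object inside the closure, on the worker, so that nothing non-serializable has to be shipped from the driver.

import org.joda.time.format.DateTimeFormat

// Hypothetical input: CSV lines whose first field is a timestamp such as "2014-03-07 03:22:25"
val withDates = sc.textFile("hdfs:///some/path").map { line =>
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")  // created per row, on the worker
  val ts = fmt.parseDateTime(line.split(",")(0))
  (ts.toLocalDate.toString, line)
}

The obvious cost is building the formatter once per row, which is what the next reply addresses.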

Regards


Mayur Rustagi
Ph: +1 (760) 203 3257



Re: Running actions in loops

MLnick
There is a #3, which is to use mapPartitions and initialize one jodatime object per partition; that is less overhead for large objects.
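
A minimal sketch of that #3 (again hedged; the input layout and format string are assumptions, not taken from the thread):

import org.joda.time.format.DateTimeFormat

val withDates = sc.textFile("hdfs:///some/path").mapPartitions { lines =>
  // one formatter per partition, created on the worker and never serialized from the driver
  val fmt = DateTimeFormat.forPattern("yyyy-MM-dd HH:mm:ss")
  lines.map { line =>
    val ts = fmt.parseDateTime(line.split(",")(0))
    (ts.toLocalDate.toString, line)
  }
}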

Sent from Mailbox for iPhone

