Can anyone offer any insight at all?

od
What is wrong with this code?

A condensed version of this code works in the spark-shell.

It does not work when deployed via a jar.

def calcSimpleRetention(start: String, end: String, event1: String, event2: String): List[Double] = {
    val spd = new PipelineDate(start)
    val epd = new PipelineDate(end)
    // filter for event1 events and return RDDs that are maps of user_ids and 0
    val f = sc.textFile(spd.toJsonHdfsFileName)
    val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache
    val ev1c = ev1rdd.count.toDouble

    // do the same as above for event2 events, only substitute 0s with 1s
    val ev2rdds = for {
       dt <- PipelineDate.getPeriod(spd+1, epd)
       val f1 = sc.textFile(dt.toJsonHdfsFileName)
    } yield (f1.filter(_.split(",")(0).split(":")(1).replace("\"","") == event2).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).distinct)

    // cache all event1 and event2 RDDs
    ev2rdds.foreach(_.cache)
    val cts = for {
      ev2 <- ev2rdds
    } yield ev2.count

    val retent = for {
      ev2rdd <- ev2rdds
      val ret = ev1rdd.union(ev2rdd).groupByKey()
    } yield ret.filter(e => e._2.length > 1 && e._2.filter(_ == 0).length > 0)

    val rcts = retent.map(_.count)
    println("----------------------------------------------------------------------")
    println(s"${rcts}")
    println(s"${cts}")

    for {
      c <- rcts
    } yield (ev1c / c.toDouble)
    //Map(result:_*)
  }
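For reference, here is what the repeated `split(",")(n).split(":")(1).replace("\"","")` chain extracts. The actual HDFS line format is not shown in the post, so the sample line below is a hypothetical stand-in (flattened "key":"value" pairs separated by commas):

```scala
// Hypothetical input line; the real JSON-ish format on HDFS is not shown in the post.
val line = "\"event\":\"signup\",\"ts\":\"2014-03-07\",\"user_id\":\"u123\""

// Field 0, take the value after the colon, strip the quotes -> the event name.
val eventName = line.split(",")(0).split(":")(1).replace("\"", "")

// Field 2, same treatment -> the user id used as the key of the pair RDD.
val userId = line.split(",")(2).split(":")(1).replace("\"", "")
// eventName == "signup", userId == "u123"
```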

This is what this code prints:
List(0, 0)
List(785912, 825254)
List(Infinity, Infinity)

My question is: the union().groupByKey().filter(...) step does not appear to be doing anything (hence the List(0, 0) output). The app is not failing; it finishes just fine.

Any ideas?
Ognen
Re: Can anyone offer any insight at all?

Ognen Duzlevski-2
Strike that. Figured it out. Don't you just hate it when you fire off an
email and you figure it out as it is being sent? ;)
Ognen

On 3/7/14, 12:41 PM, Ognen Duzlevski wrote:
> What is wrong with this code? [...]

--
Some people, when confronted with a problem, think "I know, I'll use regular expressions." Now they have two problems.
-- Jamie Zawinski

Re: Can anyone offer any insight at all?

Mayur Rustagi
Was the issue with print? Printing on the worker?

Mayur Rustagi
Ph: +1 (760) 203 3257


On Fri, Mar 7, 2014 at 10:43 AM, Ognen Duzlevski <[hidden email]> wrote:
> Strike that. Figured it out. Don't you just hate it when you fire off an email and you figure it out as it is being sent? ;) [...]


Re: Can anyone offer any insight at all?

Ognen Duzlevski-2
No.

It was a logical error.

The line

val ev1rdd = f.filter(_.split(",")(0).split(":")(1).replace("\"","") == event1).map(line => (line.split(",")(2).split(":")(1).replace("\"",""),1)).cache

should have mapped each user_id to 0, not 1.
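The effect of the fix can be sketched with plain Scala collections standing in for the RDDs (the user ids below are illustrative): with both sides tagged 1, the `_ == 0` filter in the retention step can never match, so every group is dropped and the counts come out 0.

```scala
// event1 users tagged 0 (the fix), event2 users tagged 1.
// Before the fix both sides were tagged 1 and `contains(0)` was always false.
val ev1 = Seq("a" -> 0, "b" -> 0)   // users who did event1
val ev2 = Seq("b" -> 1, "c" -> 1)   // users who did event2

// Plain-collections stand-in for union().groupByKey(): user -> list of tags.
val grouped = (ev1 ++ ev2).groupBy(_._1).map { case (user, pairs) => user -> pairs.map(_._2) }

// Retained users: seen more than once AND carrying at least one event1 (0) tag.
val retained = grouped.filter { case (_, tags) => tags.length > 1 && tags.contains(0) }
// Only "b" survives: it appears in both event sets.
```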

I have had the most awful time figuring out these "looped" things. It seems next to impossible to run a .filter() operation inside the body of a for loop, yet it works if you apply .filter() in the yield.

Still don't understand why that is...

Ognen

On 3/7/14, 1:05 PM, Mayur Rustagi wrote:
> the issue was with print?
> printing on worker?

Re: Can anyone offer any insight at all?

Alan Burlison
On 07/03/2014 19:08, Ognen Duzlevski wrote:

> I have had the most awful time figuring out these "looped" things. It
> seems like it is next to impossible to run a .filter() operation in a
> for loop, it seems to work if you yield .filter()

The equivalent of a filter in a for statement is an 'if' guard. Scala desugars
for comprehensions into the equivalent sequence of map, flatMap and
withFilter invocations; section 23.4 in "Programming in Scala" has a
very good explanation of how the mapping is done.

$ scala -Xprint:parser -e 'println(for(i <- Seq(1,2,3,4) if (i % 2) == 0; j <- Seq(10,11) if j <= 10) yield i * j)'
[[syntax trees at end of parser]] // scalacmd2816067526771161181.scala
package <empty> {
   object Main extends scala.AnyRef {
     def <init>() = {
       super.<init>();
       ()
     };
     def main(argv: Array[String]): scala.Unit = {
       val args = argv;
       {
         final class $anon extends scala.AnyRef {
           def <init>() = {
             super.<init>();
             ()
           };
           println(Seq(1, 2, 3, 4).withFilter(((i) => i.$percent(2).$eq$eq(0))).flatMap(((i) => Seq(10, 11).withFilter(((j) => j.$less$eq(10))).map(((j) => i.$times(j))))))
         };
         new $anon()
       }
     }
   }
}

List(20, 40)
$
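In other words, the `if` guard in a for comprehension *is* the filter; a minimal side-by-side (plain collections, nothing Spark-specific):

```scala
val xs = Seq(1, 2, 3, 4)

// Guard written in the comprehension...
val a = for (x <- xs if x % 2 == 0) yield x * 10

// ...is the same as the withFilter + map it desugars to,
// exactly as the -Xprint:parser output above shows.
val b = xs.withFilter(x => x % 2 == 0).map(x => x * 10)

// Both produce Seq(20, 40).
```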

--
Alan Burlison