How to index each map operation?

4 messages

How to index each map operation?

yh18190
Hi,

I want to perform a map operation on an RDD of elements such that the resulting RDD is a key-value pair (counter, value).

For example, given var k: RDD[Int] = 10, 20, 30, 40, 40, 60, ...

k.map(t => (i, t)), where 'i' should act like a counter whose value increments after each map operation.

Please help me. I tried to write it like this, but it didn't work:

var i = 0
k.map(t => {
  (i, t); i += 1
})

Please correct me.

Re: How to index each map operation?

Thierry Herrmann
I'm new to Spark, but isn't this a pure Scala question?

The following seems to work in the Spark shell:

$ spark-shell

scala> val rdd = sc.makeRDD(List(10,20,30))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at makeRDD at <console>:12

scala> var cnt = -1
cnt: Int = -1

scala> val rdd2 = rdd.map(i => {cnt+=1;  (cnt,i)} )
rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = MappedRDD[9] at map at <console>:16

scala> rdd2.collect
res8: Array[(Int, Int)] = Array((0,10), (1,20), (2,30))

Thierry

Re: How to index each map operation?

Shixiong Zhu
Hi Thierry,

Your code does not work if @yh18190 wants a global counter. An RDD may have more than one partition, and each partition works on its own copy of the closure, so cnt is reset to -1 for every partition. You can see the effect with the following code:

scala> val rdd = sc.parallelize( (1, 'a') :: (2, 'b') :: (3, 'c') :: (4, 'd') :: Nil)
rdd: org.apache.spark.rdd.RDD[(Int, Char)] = ParallelCollectionRDD[3] at parallelize at <console>:12

scala> import org.apache.spark.HashPartitioner
import org.apache.spark.HashPartitioner

scala> val rdd2 = rdd.partitionBy(new HashPartitioner(2))
rdd2: org.apache.spark.rdd.RDD[(Int, Char)] = ShuffledRDD[4] at partitionBy at <console>:18

scala> var cnt = -1
cnt: Int = -1

scala> val rdd3 = rdd2.map(i => {cnt+=1;  (cnt,i)} )
rdd3: org.apache.spark.rdd.RDD[(Int, (Int, Char))] = MappedRDD[5] at map at <console>:22

scala> rdd3.collect
res2: Array[(Int, (Int, Char))] = Array((0,(2,b)), (1,(4,d)), (0,(1,a)), (1,(3,c)))

A proper solution is to use "rdd.partitionBy(new HashPartitioner(1))" to make sure there is only one partition, but that is not efficient for a big input.
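
For completeness, here is a minimal sketch of that single-partition workaround (my own illustration, not from the thread; rdd is the pair RDD built above):

import org.apache.spark.HashPartitioner

// Force everything into one partition so that a single task-local counter
// sees every element. Correct, but the map phase no longer runs in parallel.
val single = rdd.partitionBy(new HashPartitioner(1))
var cnt = -1
val indexed = single.map { i => cnt += 1; (cnt, i) }
// indexed.collect should now yield contiguous indices 0, 1, 2, ...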

Best Regards,

Shixiong Zhu



Re: How to index each map operation?

yh18190
Hi Thierry,

Thanks for the responses. I implemented this using a RangePartitioner. We need to use one of the custom partitioners in order to perform this task: normally you can't maintain a counter, because the counting would otherwise be performed separately on each partitioned block of data.
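
For reference, a partition-aware sketch (my own addition, not from the thread) that assigns a contiguous global index without collapsing the RDD to one partition: first count the elements in each partition, then shift each partition's local indices by the total size of the partitions before it. The helper name indexElements is hypothetical; this is essentially what RDD.zipWithIndex (added in Spark 1.0) does internally.

import org.apache.spark.rdd.RDD

// Hypothetical helper: give every element a unique, contiguous index
// without shuffling the whole RDD into a single partition.
def indexElements[T](rdd: RDD[T]): RDD[(Long, T)] = {
  // 1. Count the elements in each partition (one small extra job, so
  //    caching the input RDD beforehand avoids recomputing its lineage).
  val counts = rdd
    .mapPartitionsWithIndex((pid, iter) => Iterator((pid, iter.size.toLong)))
    .collect()
    .sortBy(_._1)
    .map(_._2)

  // 2. Starting offset of each partition = sum of the earlier partition sizes.
  val offsets = counts.scanLeft(0L)(_ + _)

  // 3. Number elements within each partition, shifted by its offset.
  rdd.mapPartitionsWithIndex { (pid, iter) =>
    iter.zipWithIndex.map { case (t, i) => (offsets(pid) + i, t) }
  }
}

For example, indexElements(sc.parallelize(Seq(10, 20, 30))).collect should yield Array((0,10), (1,20), (2,30)) regardless of how many partitions the input has.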