Mapping between two RDDs?

goi.cto@gmail.com
Hi,

I have the following code:

val data = List(
List("Tx1", "node A", "ROOT", 100),
List("Tx2", "node B", "Tx1", 50),
List("Tx3", "node C", "Tx1", 50),
List("Tx4", "node B", "ROOT", 100),
List("Tx5", "node C", "Tx4", 75),
List("Tx6", "node B", "Tx4", 25));
val distData = sc.parallelize(data);
val txData = distData.map(x=>(x(1),(x(0),x(2),x(3))));
val destNode = txData.groupByKey();
/*
res60: Array[(Any, Seq[(Any, Any, Any)])] = Array((node A,ArrayBuffer((Tx1,ROOT,100))), (node B,ArrayBuffer((Tx2,Tx1,50), (Tx4,ROOT,100), (Tx6,Tx4,25))), (node C,ArrayBuffer((Tx3,Tx1,50), (Tx5,Tx4,75))))
*/
val txData = distData.map(x=>(x(0),x(1)));
val txgData = txData.groupByKey();
val txfData = txgData.map(p => (p._1,p._2(0)))
val root = List("ROOT","rootNode");
val rootNode = sc.parallelize(root);

/*
res61: Array[(Any, Any)] = Array((Tx1,node A), (Tx6,node B), (Tx4,node B), (Tx2,node B), (Tx5,node C), (Tx3,node C))
*/

How do I union the rootNode RDD with the txfData RDD?
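
On the union question, one possible direction is sketched below. It assumes the spark-shell SparkContext `sc` from the code above. `txNodes`, `rootPair` and `txWithRoot` are hypothetical names, and `txNodes` is a String-typed stand-in for txfData. The idea is that the root entry probably needs to be a single (key, value) pair rather than a list of two strings, because union expects both RDDs to have the same element type.

```scala
// Sketch only: assumes the spark-shell SparkContext `sc` used in the post.
// txNodes is a hypothetical, String-typed stand-in for txfData.
val txNodes = sc.parallelize(Seq(
  ("Tx1", "node A"), ("Tx2", "node B"), ("Tx3", "node C"),
  ("Tx4", "node B"), ("Tx5", "node C"), ("Tx6", "node B")))

// The root entry as one (key, value) pair, not a list of two separate strings.
val rootPair = sc.parallelize(Seq(("ROOT", "rootNode")))

// union requires both RDDs to share the same element type, here (String, String).
val txWithRoot = txNodes.union(rootPair)
```

With the (Any, Any) element type shown in res61, the root RDD would presumably need the same type, for example by building it from `List[(Any, Any)](("ROOT", "rootNode"))`.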

How do I look up the second element of each tuple in the destNode RDD (for example Tx1 or ROOT) as a key in the txfData RDD?
The outcome should be the destNode RDD with each tuple paired with the matching value from txfData, for example:
(node B,ArrayBuffer(ArrayBuffer((Tx2,Tx1,50),node A), ArrayBuffer((Tx4,ROOT,100),rootNode), ArrayBuffer((Tx6,Tx4,25),node B)))
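
One way this lookup might be approached is with a pair-RDD join, sketched below under a few assumptions: the spark-shell SparkContext `sc` is available, the rows are typed tuples instead of List[Any], and the names `txs`, `nodeOfTx`, `byParent` and `destWithParentNode` are hypothetical. The sketch keys every transaction by the ID it references, joins against the ID-to-node table (including the ROOT entry), then re-keys by node and groups.

```scala
// Pair-RDD operations; already in scope in the spark-shell, needed explicitly on older Spark versions.
import org.apache.spark.SparkContext._

// Sketch only: assumes the spark-shell SparkContext `sc`.
// Typed version of the data in the post: (id, node, referenced id, amount).
val txs = sc.parallelize(Seq(
  ("Tx1", "node A", "ROOT", 100),
  ("Tx2", "node B", "Tx1", 50),
  ("Tx3", "node C", "Tx1", 50),
  ("Tx4", "node B", "ROOT", 100),
  ("Tx5", "node C", "Tx4", 75),
  ("Tx6", "node B", "Tx4", 25)))

// Lookup table: transaction id -> node, plus the ROOT entry.
val nodeOfTx = txs
  .map { case (id, node, _, _) => (id, node) }
  .union(sc.parallelize(Seq(("ROOT", "rootNode"))))

// Key each transaction by the id it references so it can be joined with the lookup table.
val byParent = txs.map { case (id, node, parent, amount) =>
  (parent, (node, (id, parent, amount)))
}

// join yields (parent, ((node, tx), parentNode)); re-key by node and group.
val destWithParentNode = byParent
  .join(nodeOfTx)
  .map { case (_, ((node, tx), parentNode)) => (node, (tx, parentNode)) }
  .groupByKey()
```

Collecting `destWithParentNode` should give one entry per node whose values are (transaction, node) pairs like those in the desired outcome above, although the order within each group is not guaranteed.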

Help appreciated!
--
Eran | CTO