Spark clustering question


goi.cto@gmail.com
Hi,

I have the following input file:

Tx ID, Dest Node ID, Original Tx ID, Amount

For every line with an Original Tx ID there is another line whose Tx ID matches it: if each Tx is an edge going into a node, then every edge going out of a node has a previous edge going in.
 
Sample Data:
Tx1, node A, null, 100
Tx2, node B, Tx1, 50
Tx3, node C, Tx1, 50
Tx4, node B, null, 100
Tx5, node C, Tx4, 75
Tx6, node B, Tx4, 25

I want to build a Spark program that produces a file with the following structure:

Source Node, Tx ID (edge), Dest Node, Amount

Sample Data:
ROOT,  Tx1,  A,  100
A,     Tx2,  B,  50
A,     Tx3,  C,  50
ROOT,  Tx4,  B,  100
B,     Tx5,  C,  75
B,     Tx6,  B,  25

The logic that needs to be implemented is:

for each node N:
    for each row whose Original Tx ID points to a transaction ending at N:
        write: N, Row.TxID, Row.DestNode, Row.Amount

Any idea how to do it using Spark?

--
Eran | CTO 

Re: Spark clustering question

sowen
If I understand you correctly, this is basically a join, but with some
additional result rows thrown in.

// Split each line into fields, trimming the whitespace around the
// commas in the sample data.
val input = sc.textFile("data.csv").map(_.split(",").map(_.trim))

// Key each record by its own Tx ID, and again by its Original Tx ID.
val byTxID = input.map(record => (record(0), record))
val byOriginalTxID = input.map(record => (record(2), record))
// Pair each record with its originating record; rows whose Original
// Tx ID is "null" match nothing and simply drop out of the join.
val joined = byOriginalTxID.join(byTxID)

// (parent's dest node, Tx ID, dest node, amount)
val outputA = joined.map(j => Array(j._2._2(1), j._2._1(0), j._2._1(1), j._2._1(3)))
// Rows with no Original Tx ID become the ROOT rows.
val outputB = input.filter(_(2) == "null").map(r => Array("ROOT", r(0), r(1), r(3)))
val output = outputA.union(outputB)

output.foreach(r => println(r.mkString(",")))
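
If you want the result in a file rather than printed to the console (foreach runs on the executors, so on a real cluster those printlns land in the worker logs), you can save the RDD as text instead. A minimal sketch, assuming the same output RDD as above and a hypothetical output directory name:

// Join the fields back into CSV lines and write them out as part files.
output.map(_.mkString(",")).saveAsTextFile("tx-edges-out")

Spark writes one part-NNNNN file per partition under that directory; coalesce(1) before saving if you need a single file.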
--
Sean Owen | Director, Data Science | London


On Thu, Feb 13, 2014 at 12:42 PM, goi cto <[hidden email]> wrote:

> [...]

Re: Spark clustering question

goi.cto@gmail.com
Thanks, that works just fine...
Looks so simple once you get it :-)

Eran


On Fri, Feb 14, 2014 at 6:24 PM, Sean Owen <[hidden email]> wrote:
[...]



--
Eran | CTO