How to initialize updateStateByKey operation


How to initialize updateStateByKey operation

Soumitra Kumar
I started with StatefulNetworkWordCount to have a running count of words seen.

I have a file 'stored.count' which contains the word counts.

$ cat stored.count
a 1
b 2

I want to initialize StatefulNetworkWordCount with the values in 'stored.count' file, how do I do that?
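A minimal sketch of loading those counts into an RDD of (word, count) pairs, assuming an existing SparkContext named 'sc' and the whitespace-separated two-column format shown above:

```scala
import org.apache.spark.rdd.RDD

// Parse 'stored.count' (lines of "<word> <count>") into RDD[(String, Int)].
// `sc` is assumed to be an existing SparkContext.
val initialCounts: RDD[(String, Int)] =
  sc.textFile("stored.count").map { line =>
    val Array(word, count) = line.split("\\s+")
    (word, count.toInt)
  }
```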

I looked at the paper 'EECS-2012-259.pdf' by Matei et al. In figure 2, it would be useful to have an initial RDD feeding into 'counts' at 't = 1', as below.

                           initial
                             |
t = 1: pageView -> ones -> counts
                             |
t = 2: pageView -> ones -> counts
...

I have also attached the modified figure 2 of http://www.eecs.berkeley.edu/Pubs/TechRpts/2012/EECS-2012-259.pdf .

I managed to hack the Spark code to achieve this, and am attaching the modified files.

Essentially, I added an argument 'initial: RDD[(K, S)]' to the updateStateByKey method:
    def updateStateByKey[S: ClassTag](
        initial: RDD[(K, S)],
        updateFunc: (Seq[V], Option[S]) => Option[S],
        partitioner: Partitioner
      ): DStream[(K, S)]
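With the modified signature, the stateful count could then be seeded from the stored file roughly as follows. This is only a sketch against my patched build; 'wordCounts' is assumed to be the (word, 1) DStream from StatefulNetworkWordCount, 'initialCounts' the RDD loaded from 'stored.count', and 'ssc' the StreamingContext:

```scala
import org.apache.spark.HashPartitioner

// Add each batch's new counts to the running state.
val updateFunc = (values: Seq[Int], state: Option[Int]) =>
  Some(values.sum + state.getOrElse(0))

// Seed the state so 'a' starts at 1 and 'b' at 2 before the first batch.
val stateDStream = wordCounts.updateStateByKey[Int](
  initialCounts,
  updateFunc,
  new HashPartitioner(ssc.sparkContext.defaultParallelism))
```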

If this sounds interesting to a larger crowd, I would be happy to clean up the code and volunteer to push it upstream. I don't know the procedure for that, though.

-Soumitra.


---------------------------------------------------------------------
To unsubscribe, e-mail: [hidden email]
For additional commands, e-mail: [hidden email]

Attachments:
  sparkstreaming.png (39K)
  PairDStreamFunctions.scala (38K)
  StateDStream.scala (7K)

Re: How to initialize updateStateByKey operation

Soumitra Kumar
I thought I did a good job ;-)

OK, so what is the best way to initialize the updateStateByKey operation? I have counts from a previous spark-submit job and want to load them in the next spark-submit job.
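One workaround that needs no Spark changes is to fall back to a broadcast map of the stored counts whenever a key has no state yet, using the keyed updateStateByKey overload that exposes the key. A sketch, assuming 'sc' is the SparkContext and 'wordCounts' the (word, 1) DStream; note that keys present only in 'stored.count' will not be emitted until they reappear in the stream:

```scala
import org.apache.spark.HashPartitioner

// Load the previously stored counts and ship them to the executors.
val stored: Map[String, Int] =
  sc.textFile("stored.count")
    .map { line => val Array(w, c) = line.split("\\s+"); (w, c.toInt) }
    .collect().toMap
val storedBC = sc.broadcast(stored)

// On a key's first appearance, seed its state from the broadcast map.
val counts = wordCounts.updateStateByKey[Int](
  (iter: Iterator[(String, Seq[Int], Option[Int])]) => iter.map {
    case (word, newValues, state) =>
      val previous = state.orElse(storedBC.value.get(word)).getOrElse(0)
      (word, previous + newValues.sum)
  },
  new HashPartitioner(sc.defaultParallelism),
  rememberPartitioner = true)
```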

----- Original Message -----
From: "Soumitra Kumar" <[hidden email]>
To: "spark users" <[hidden email]>
Sent: Sunday, September 21, 2014 10:43:01 AM
Subject: How to initialize updateStateByKey operation



Re: How to initialize updateStateByKey operation

Tathagata Das
At a high level, the suggestion sounds good to me. However, regarding the code, it's best to submit a Pull Request on the Spark GitHub page for community review. You will find more information here.

On Tue, Sep 23, 2014 at 10:11 PM, Soumitra Kumar <[hidden email]> wrote: