Spark Streaming and database access (e.g. MySQL)

Spark Streaming and database access (e.g. MySQL)

jchen
Hi,

Has anyone tried using Spark Streaming with MySQL (or any other database/data store)? I can write to MySQL at the beginning of the driver application. However, when I try to write the result of each streaming processing window to MySQL, it fails with the following error:

org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: com.mysql.jdbc.JDBC4PreparedStatement

I think it is because the statement object would have to be serializable in order to be shipped to and executed on the worker nodes. Has anyone tried a similar case? Example code would be very helpful. My intention is to execute INSERT/UPDATE/DELETE/SELECT statements for each sliding window.
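For reference, here is a minimal sketch of the kind of code I am running that triggers it (table, column names and value types are just illustrative):

    import java.sql.DriverManager

    val conn = DriverManager.getConnection("jdbc:mysql://db-host/mydb", "user", "pass")
    val stmt = conn.prepareStatement("INSERT INTO results (k, v) VALUES (?, ?)")

    stream.foreachRDD { rdd =>
        rdd.foreach { case (k, v) =>
            // stmt was created on the driver; Spark tries to serialize this
            // closure (and stmt with it) to ship it to the executors, and fails
            stmt.setString(1, k)
            stmt.setLong(2, v)
            stmt.executeUpdate()
        }
    }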

Thanks,
JC

Re: Spark Streaming and database access (e.g. MySQL)

Mayur Rustagi
The standard pattern is to initialize the MySQL JDBC connection inside your mapPartitions/foreachPartition call, update the database, and then close off the connection, roughly as in the sketch below.
A couple of gotchas:
1. A new connection is initialized for each of your partitions.
2. If the effect (inserts & updates) is not idempotent and your server crashes, Spark will replay updates to MySQL and may cause data corruption.
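A rough sketch of that pattern (connection URL, credentials, table and value types are just placeholders):

    stream.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
            // The connection is created on the worker, inside the task,
            // so nothing JDBC-related has to be serialized
            val conn = java.sql.DriverManager.getConnection(
                "jdbc:mysql://db-host/mydb", "user", "pass")
            // An upsert keeps the write idempotent if Spark replays a batch
            val stmt = conn.prepareStatement(
                "INSERT INTO results (k, v) VALUES (?, ?) ON DUPLICATE KEY UPDATE v = VALUES(v)")
            records.foreach { case (k, v) =>
                stmt.setString(1, k)
                stmt.setLong(2, v)
                stmt.executeUpdate()
            }
            stmt.close()
            conn.close()
        }
    }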


Regards
Mayur

Mayur Rustagi
Ph: +1 (760) 203 3257

Re: Spark Streaming and database access (e.g. MySQL)

sowen
... I'd call out that last bit as actually tricky: "close off the driver"

See this message for the most robust way to do that, along with the
right way to open DB connections on the remote workers instead of trying to
serialize them:

http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dRNAg@...%3E
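
For reference, one common shape of that approach is a lazily created, per-executor connection held in a singleton object, roughly like this sketch (placeholder URL, credentials and table; see the linked message for the authoritative version):

    object ConnectionHolder {
        // Created at most once per executor JVM; referenced by name from
        // the closure, so the connection itself is never serialized
        lazy val conn = java.sql.DriverManager.getConnection(
            "jdbc:mysql://db-host/mydb", "user", "pass")
    }

    stream.foreachRDD { rdd =>
        rdd.foreachPartition { records =>
            val stmt = ConnectionHolder.conn.prepareStatement(
                "INSERT INTO results (k, v) VALUES (?, ?)")
            records.foreach { case (k, v) =>
                stmt.setString(1, k)
                stmt.setLong(2, v)
                stmt.executeUpdate()
            }
            stmt.close()
        }
    }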

Re: Spark Streaming and database access (e.g. MySQL)

Soumitra Kumar
I have the following code:

stream foreachRDD { rdd =>
    if (rdd.take (1).size == 1) {
        rdd foreachPartition { iterator =>
            initDbConnection ()
            iterator foreach { record =>
                // write record to db
            }
            closeDbConnection ()
        }
    }
}

Re: Spark Streaming and database access (e.g. MySQL)

sowen
That should be OK, since the iterator is definitely consumed, and
therefore the connection is actually done with, at the end of the 'foreach'
call. You might put the close in a finally block.
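Something along these lines, reusing the same placeholder helpers from the code above:

    rdd foreachPartition { iterator =>
        initDbConnection ()
        try {
            iterator foreach { record =>
                // write record to db
            }
        } finally {
            // runs even if a write throws, so the connection is not leaked
            closeDbConnection ()
        }
    }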

Re: Spark Streaming and database access (e.g. MySQL)

Tobias Pfeiffer
Hi,

On Mon, Sep 8, 2014 at 4:39 PM, Sean Owen <[hidden email]> wrote:
>                 if (rdd.take (1).size == 1) {
>                     rdd foreachPartition { iterator =>

I was wondering: since take() is an action, isn't the RDD computed twice (once for the take(1), once during the foreachPartition iteration)? Or will only a single element be computed for take(1)?

Thanks
Tobias


Re: Spark Streaming and database access (e.g. MySQL)

Mayur Rustagi
I think she is checking for empty RDDs?
But if the RDD is empty then nothing will happen anyway, no db connections etc.
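If the extra job from take(1) is a concern, the emptiness check can also be moved inside each partition, roughly like this sketch (same placeholder helpers as above):

    rdd foreachPartition { iterator =>
        if (iterator.hasNext) {    // open a connection only if this partition has data
            initDbConnection ()
            iterator foreach { record =>
                // write record to db
            }
            closeDbConnection ()
        }
    }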

Mayur Rustagi
Ph: +1 (760) 203 3257
