Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)


maasg
Hi Chris,

Could you show the code you are using? When you say "I like to use a static datasource (JDBC) in the state function", do you mean a DataFrame read from a JDBC source or an independent JDBC connection?

The key point to consider is that the function passed to flatMapGroupsWithState must be serializable, because it executes on the workers (executors) of the Spark job.
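To make the serialization point concrete, here is a minimal, Spark-free sketch. `FakeConnection`, `isSerializable`, `capturing` and `selfContained` are hypothetical names invented for illustration; the check uses plain Java serialization, which is an approximation of what Spark does when it ships a closure to the executors.

```scala
import java.io.{ByteArrayOutputStream, NotSerializableException, ObjectOutputStream}

object SerializationCheck {
  // Stand-in for a resource such as a JDBC connection; deliberately NOT Serializable.
  class FakeConnection {
    def lookup(key: String): String = key.toUpperCase
  }

  // Tries plain Java serialization and reports whether it succeeded.
  def isSerializable(obj: AnyRef): Boolean =
    try {
      val out = new ObjectOutputStream(new ByteArrayOutputStream())
      out.writeObject(obj)
      out.close()
      true
    } catch {
      case _: NotSerializableException => false
    }

  // The resource is created outside the function and captured by it,
  // so serializing the function also tries to serialize the resource.
  def capturing: String => String = {
    val conn = new FakeConnection
    (key: String) => conn.lookup(key)
  }

  // The resource is created inside the function body, so nothing
  // non-serializable is captured.
  def selfContained: String => String =
    (key: String) => {
      val conn = new FakeConnection
      conn.lookup(key)
    }
}
```

Calling `SerializationCheck.isSerializable` on `capturing` and on `selfContained` shows the difference between holding a resource in the closure and creating it inside the function.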

If you are using a JDBC connection, you need to make sure the connection is created inside the function, because JDBC connections are not serializable.
Likewise, Datasets/DataFrames can only be used on the driver, where they are defined. They are bound to the SparkSession in the driver, and it does not make sense to access them from a remote executor.
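A rough sketch of what "creating the connection inside the function" could look like follows. All case classes, the `lookup` table, its `label` column and the `DbConfig` settings are assumptions made up for illustration, not details from this thread. Opening a connection per group is simple but can be expensive, so a real job might prefer a per-executor connection pool.

```scala
import java.sql.DriverManager
import org.apache.spark.sql.Dataset
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object StatefulEnrichment {
  // Hypothetical types, for illustration only.
  case class Event(key: String, value: Long)
  case class Enriched(key: String, value: Long, label: String)
  case class RunningTotal(total: Long)
  // Plain strings serialize without trouble, unlike a live connection.
  case class DbConfig(url: String, user: String, password: String)

  // Runs on the executor. The connection is opened and closed inside the call,
  // so no java.sql.Connection ends up in the serialized closure.
  def labelFor(cfg: DbConfig, key: String): String = {
    val conn = DriverManager.getConnection(cfg.url, cfg.user, cfg.password)
    try {
      // Assumed table and column names.
      val stmt = conn.prepareStatement("SELECT label FROM lookup WHERE key = ?")
      stmt.setString(1, key)
      val rs = stmt.executeQuery()
      if (rs.next()) rs.getString(1) else "unknown"
    } finally conn.close()
  }

  def stateFunc(cfg: DbConfig)(
      key: String,
      events: Iterator[Event],
      state: GroupState[RunningTotal]): Iterator[Enriched] = {
    val label = labelFor(cfg, key)
    val batch = events.toList
    val total = state.getOption.map(_.total).getOrElse(0L) + batch.map(_.value).sum
    state.update(RunningTotal(total))
    batch.iterator.map(e => Enriched(e.key, e.value, label))
  }

  // Wiring: only the small DbConfig value travels with the function.
  def enrich(events: Dataset[Event], cfg: DbConfig): Dataset[Enriched] = {
    val spark = events.sparkSession
    import spark.implicits._
    events
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(stateFunc(cfg) _)
  }
}
```

Note that nothing in `stateFunc` touches a SparkSession, Dataset or DataFrame; it only uses plain JDBC and the values handed to it.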

Make sure you check the executor logs as well. There might be a NullPointerException lurking somewhere in your logs.

Kind regards, Gerard.

PS: spark-dev ([hidden email]) is for discussions about open source development of the Spark project.
For general questions like this, use the user mailing list ([hidden email]) (note that I changed the address in the To: field).

On Thu, Jul 19, 2018 at 12:51 PM Christiaan Ras <[hidden email]> wrote:

I use the state function flatMapGroupsWithState to track the state of a Kafka stream. To further customize the state function, I would like to use a static datasource (JDBC) inside it. This datasource contains data I would like to join with the stream (available as an Iterator) within flatMapGroupsWithState.

 

When I try to access the JDBC source within flatMapGroupsWithState, Spark execution freezes without any exceptions or logging.

To verify that the JDBC connection works, I also tried to access the source outside the state function, and that works. So now I join the static source with the streaming source before feeding it to flatMapGroupsWithState. It seems to work so far…

 

Any ideas why accessing the JDBC source within flatMapGroupsWithState could fail (it freezes Spark execution)? Is it wise to use external datasources within flatMapGroupsWithState?

 

Thanks,

Chris

 

 


chris-sw

Hi Gerard,

 

First, I would like to thank you for your fast reply and for directing my question to the proper mailing list!

I established the JDBC connection in the context of the state function (flatMapGroupsWithState). The connection is made by calling read on the SparkSession, as shown below:

spark.read
  .format("jdbc")
  .option("url", s"jdbc:postgresql://${connection.host}/${connection.db}")
  .option("dbtable", "table")
  .option("user", connection.user)
  .option("password", connection.pwd)
  .option("driver", "org.postgresql.Driver")
  .option("numPartitions", "5")
  .load()
  .as(Encoders.product[Class])

 

I use a ‘shared’ SparkSession, initialized in main (run by the driver, I guess) and made accessible to other classes through a singleton. The class that handles the state function fetches the shared session from this singleton.

My test ran locally with a single thread, so all logging should be visible on the console.

 

BTW: I have now implemented the approach of joining this datasource with the streaming source before feeding it to the state function. That works! But I am still curious how to do this inside flatMapGroupsWithState. Or have the state functions not been designed to do such things?
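For readers following along, the join-before-state approach might look roughly like the sketch below. The case classes, the `key` join column and the running-total logic are hypothetical placeholders, not the actual code from this thread; `events` stands for the streaming Dataset and `reference` for the static one read via JDBC on the driver.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.streaming.{GroupState, GroupStateTimeout, OutputMode}

object JoinBeforeState {
  // Hypothetical types, for illustration only.
  case class Event(key: String, value: Long)
  case class Reference(key: String, label: String)
  case class Joined(key: String, value: Long, label: String)
  case class RunningTotal(total: Long)
  case class Output(key: String, label: String, total: Long)

  // The state function only sees plain rows that already carry the static
  // data, so it needs neither a SparkSession nor a JDBC connection.
  def updateTotals(
      key: String,
      rows: Iterator[Joined],
      state: GroupState[RunningTotal]): Iterator[Output] = {
    val batch = rows.toList
    val total = state.getOption.map(_.total).getOrElse(0L) + batch.map(_.value).sum
    state.update(RunningTotal(total))
    batch.headOption.map(r => Output(key, r.label, total)).iterator
  }

  def pipeline(
      spark: SparkSession,
      events: Dataset[Event],          // streaming side
      reference: Dataset[Reference]    // static side, defined on the driver
  ): Dataset[Output] = {
    import spark.implicits._
    // Stream-static inner join on the shared "key" column, done before grouping.
    val joined = events.join(reference, Seq("key")).as[Joined]
    joined
      .groupByKey(_.key)
      .flatMapGroupsWithState(OutputMode.Append(), GroupStateTimeout.NoTimeout())(updateTotals _)
  }
}
```

With this layout the Dataset API is used only on the driver to build the query plan, while the state function itself stays a plain, serializable function over rows.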

 

Regards,

 

Chris

 

From: Gerard Maas <[hidden email]>
Date: Thursday, 19 July 2018 at 15:20
To: Christiaan Ras <[hidden email]>, spark users <[hidden email]>
Subject: Re: [STRUCTURED STREAM] Join static dataset in state function (flatMapGroupsWithState)

 
