Spark structured streaming - Fallback to earliest offset

Spark structured streaming - Fallback to earliest offset

Ruijing Li
Hi all,

I have a Spark structured streaming app consuming from a Kafka topic that has a retention policy set up. Sometimes my query has not finished processing a message before retention kicks in and deletes those offsets, which, since I use the default setting of “failOnDataLoss=true”, causes my query to fail. My current workaround is manual: delete the checkpoint offsets directory and rerun.

I'd instead like Spark to automatically fall back to the earliest offset still available. The solutions I found recommend setting auto.offset.reset=earliest, but the structured streaming Kafka source does not let you set that consumer property. How do I do this in structured streaming?

Thanks!
--
Cheers,
Ruijing Li
Re: Spark structured streaming - Fallback to earliest offset

Burak Yavuz-2
Just set `failOnDataLoss=false` as an option in readStream?
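For reference, a minimal sketch of what that option looks like on the Kafka source, using the PySpark API. The option names come from the Structured Streaming + Kafka integration guide; the broker address and topic are placeholders:

```python
def read_kafka_stream(spark, topic):
    """Build a streaming DataFrame from Kafka with failOnDataLoss disabled.

    `spark` is an existing SparkSession; the broker address and topic
    name are placeholders for illustration.
    """
    return (
        spark.readStream
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker:9092")  # placeholder address
        .option("subscribe", topic)
        .option("startingOffsets", "earliest")  # applies only when the query starts fresh
        .option("failOnDataLoss", "false")      # skip offsets aged out by retention instead of failing
        .load()
    )
```

Note that `option()` takes string values, so the flag is passed as `"false"`, and on restart the offsets recorded in the checkpoint take precedence over `startingOffsets`.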

On Tue, Apr 14, 2020 at 4:33 PM Ruijing Li <[hidden email]> wrote:
Re: Spark structured streaming - Fallback to earliest offset

Ruijing Li
I see, I wasn’t sure that would work as expected. The docs seem to suggest being careful before turning that option off, and I’m not sure why failOnDataLoss is true by default.

On Tue, Apr 14, 2020 at 5:16 PM Burak Yavuz <[hidden email]> wrote:
--
Cheers,
Ruijing Li
Re: Spark structured streaming - Fallback to earliest offset

Jungtaek Lim-2
I think Spark is trying to ensure that it reads the input "continuously" without missing anything. Technically it is valid to call this situation a kind of "data loss": the query couldn't process the offsets that were thrown out, and the owner of the query needs to be aware of that because it affects the result.

If your streaming query keeps up with the input rate, it's pretty rare for it to fall behind retention. Even if it lags a bit, it's safe as long as the retention period is long enough. The ideal state is to ensure your query processes all offsets before retention throws them out (don't leave the query lagging behind: either increase processing power or increase the retention duration, though most likely you'll need the former). But if you can't guarantee that and you understand the risk, then yes, you can turn the option off and accept the risk.
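The race described above can be sketched with plain offset arithmetic: the query is safe as long as its committed offset stays at or ahead of the broker's earliest retained offset. A toy model (function names are made up for illustration):

```python
def offsets_lost(committed_offset, earliest_available):
    """Records in [committed_offset, earliest_available) were deleted by
    retention before the query could read them."""
    return max(0, earliest_available - committed_offset)

def resume_offset(committed_offset, earliest_available):
    """With failOnDataLoss=false, the source effectively skips forward to
    the earliest offset still available."""
    return max(committed_offset, earliest_available)

# Query kept up: nothing lost, resume where it left off.
assert offsets_lost(100, 50) == 0
assert resume_offset(100, 50) == 100

# Query lagged past retention: 20 records gone, resume at earliest available.
assert offsets_lost(100, 120) == 20
assert resume_offset(100, 120) == 120
```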


On Wed, Apr 15, 2020 at 9:24 AM Ruijing Li <[hidden email]> wrote:
Re: Spark structured streaming - Fallback to earliest offset

Ruijing Li
Thanks Jungtaek, that makes sense.

I tried Burak’s solution of turning failOnDataLoss to false, but instead of failing, the job gets stuck. I’m guessing the offsets are being deleted faster than the job can process them, so it will stay stuck unless I increase resources? Or does Spark hang once the exception happens?

On Tue, Apr 14, 2020 at 10:48 PM Jungtaek Lim <[hidden email]> wrote:
--
Cheers,
Ruijing Li
Re: Spark structured streaming - Fallback to earliest offset

Jungtaek Lim-2
You may want to check "where" the job is stuck by taking a thread dump (for example with `jstack <pid>` on the driver or executor JVM): it could be in the Kafka consumer, in the Spark codebase, etc. Without that information it's hard to say.
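As a hypothetical illustration of what to look for, one can scan a jstack-style dump for threads whose stacks sit in Kafka consumer code. The dump fragment below is made up; only the `org.apache.kafka.clients.consumer` package name is real:

```python
def kafka_threads(thread_dump):
    """Return names of threads whose stack mentions the Kafka consumer.
    jstack separates threads with blank lines and puts the thread name
    in quotes on the first line of each block."""
    hits = []
    for block in thread_dump.split("\n\n"):
        if "org.apache.kafka.clients.consumer" in block:
            name = block.splitlines()[0].split('"')[1]
            hits.append(name)
    return hits

# Made-up fragment in jstack's usual layout, for illustration only.
sample = (
    '"stream execution thread" #42 ...\n'
    '   java.lang.Thread.State: RUNNABLE\n'
    '\tat org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java)\n'
    '\n'
    '"main" #1 ...\n'
    '   java.lang.Thread.State: WAITING\n'
    '\tat java.lang.Object.wait(Native Method)\n'
)

assert kafka_threads(sample) == ["stream execution thread"]
```

If the blocked thread turns out to be inside the consumer's poll loop rather than Spark's own code, that points at the broker side (retention, connectivity) rather than a Spark bug.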

On Thu, Apr 16, 2020 at 4:22 PM Ruijing Li <[hidden email]> wrote: