How to introduce reset logic when aggregating/joining a streaming dataframe with a static dataframe in Spark Structured Streaming

spark-learner

A good feature of Spark Structured Streaming is that it can join a static dataframe with a streaming dataframe. Take the example below: users is a static dataframe read from a database, and transactionStream is read from a stream. By joining the two, we can get each country's spending, accumulated as new batches arrive.

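import org.apache.spark.sql.functions.sum
import spark.implicits._  // enables $"col"; spark is the active SparkSession

// Enrich each transaction with its user's static record, then total cost per country
val spendingByCountry = (transactionStream
    .join(users, users("id") === transactionStream("userid"))
    .groupBy($"country")
    .agg(sum($"cost").as("spending")))  // alias the aggregated column, not the dataframe

// Complete mode re-emits the whole aggregated table after every batch
spendingByCountry.writeStream
    .outputMode("complete")
    .format("console")
    .start()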

The summed cost keeps accumulating as new batches arrive, as shown below.

-------------------------------
Batch: 0
------------------------------- 
Country Spending
EN      90.0
FR      50.0

-------------------------------
Batch: 1
------------------------------- 
Country Spending
EN      190.0
FR      150.0
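To make the accumulation concrete: in complete output mode the entire result table is written out after each trigger, so every batch prints the running total of everything seen so far. The plain-Scala sketch below reproduces the two batches above from the per-batch increments implied by the printed totals (EN 90 and FR 50 in batch 0, then 100 more for each country in batch 1). The object and method names are hypothetical, and no Spark API is involved.

```scala
// Models what complete output mode prints: the running per-country total after each batch.
object CompleteModeTotals {
  type Totals = Map[String, Double]

  // Merge one batch's per-country sums into the running totals.
  def accumulate(totals: Totals, batch: Totals): Totals =
    batch.foldLeft(totals) { case (acc, (country, cost)) =>
      acc.updated(country, acc.getOrElse(country, 0.0) + cost)
    }

  // Per-batch increments implied by the printed totals (not the original raw transactions).
  val batches: Seq[Totals] = Seq(
    Map("EN" -> 90.0, "FR" -> 50.0),  // batch 0
    Map("EN" -> 100.0, "FR" -> 100.0) // batch 1
  )

  // scanLeft keeps every intermediate total; tail drops the empty starting map,
  // leaving one snapshot per batch.
  val snapshots: Seq[Totals] =
    batches.scanLeft(Map.empty[String, Double])(accumulate).tail
}
```

Here snapshots(1) is Map("EN" -> 190.0, "FR" -> 150.0), matching Batch: 1. Nothing in this model ever lowers a total, which is why the built-in sum aggregate alone cannot express a reset.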

If I want to add notification and reset logic to the example above, what is the correct approach? The requirement is: when a country's spending exceeds some threshold, the country and its spending should be stored in a table, and that country's spending should be reset to 0 so that it starts accumulating again.
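One possible approach, offered as a sketch under assumptions rather than a confirmed answer, is to replace the built-in groupBy/agg with arbitrary stateful processing (flatMapGroupsWithState), so each country keeps its own running total that your code is free to reset. The core is a per-key state transition, modeled below in plain Scala so the logic can be checked without a cluster. The names SpendingReset, Alert, step and run, and the threshold value used in the usage note, are hypothetical. The check is assumed to run once per micro-batch, after that batch's costs for a country have been added.

```scala
// Per-country running spending with a notify-and-reset rule (hypothetical names).
object SpendingReset {
  // Hypothetical row to be stored in the notification table.
  final case class Alert(country: String, spending: Double)

  // State transition for one country in one micro-batch:
  // add this batch's costs; if the total exceeds the threshold,
  // emit an alert and reset the state to 0, otherwise keep accumulating.
  def step(country: String, previous: Double, batchCosts: Seq[Double],
           threshold: Double): (Double, Option[Alert]) = {
    val spending = previous + batchCosts.sum
    if (spending > threshold) (0.0, Some(Alert(country, spending)))
    else (spending, None)
  }

  // Drives step over several batches, keeping per-country state in a Map
  // (inside Spark, a GroupState[Double] per key would hold this value instead).
  def run(batches: Seq[Map[String, Seq[Double]]],
          threshold: Double): (Map[String, Double], Vector[Alert]) =
    batches.foldLeft((Map.empty[String, Double], Vector.empty[Alert])) {
      case ((state, alerts), batch) =>
        batch.foldLeft((state, alerts)) {
          case ((s, a), (country, costs)) =>
            val (next, alert) = step(country, s.getOrElse(country, 0.0), costs, threshold)
            (s.updated(country, next), a ++ alert)
        }
    }
}
```

With a hypothetical threshold of 150.0 and the same increments as the example (EN 90 / FR 50, then 100 each), run emits Alert("EN", 190.0) in batch 1 and resets EN to 0.0, while FR stays at 150.0 because the rule is strictly "greater than". In Spark, step would sit inside the function passed to flatMapGroupsWithState (for instance with OutputMode.Append() and GroupStateTimeout.NoTimeout()) on the joined stream after groupByKey on the country. That function would read the previous total with state.getOption.getOrElse(0.0), save the new one with state.update(next), and return the alert as an iterator. The resulting alert stream could then be written to the target table with writeStream.foreachBatch (Spark 2.4+), for example via batchDF.write.mode("append").jdbc(...).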