dropping unused data from a stream

Paul Tremblay
I will be streaming data and am trying to understand how to drop old data from a stream so that it does not become too large. I will stream in one large table of buying data and join it to a second table of different data. I only need the last 14 days of data from the second table; anything older than 14 days can be discarded.

Here is my practice code:


# Read each source as a file stream, one file per trigger
streaming1 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv1")
# Allow data on the first stream to arrive up to 2 minutes late
streaming1_with_impressions = streaming1.withWatermark("creation_time", "2 minutes")
streaming2 = spark.readStream.schema(dataSchema).option("maxFilesPerTrigger", 1)\
    .csv("input_stream_csv2")
# Register the watermarked stream; registering the raw stream would leave
# the watermark with no effect
streaming1_with_impressions.createOrReplaceTempView("my_table1")
streaming2.createOrReplaceTempView("my_table2")
spark.sql("""select t1.* from my_table1 t1
    inner join my_table2 t2 on t1.key = t2.key
    where t1.creation_time < current_timestamp() - interval 15 minutes""")\
    .writeStream.trigger(processingTime='10 seconds')\
    .format("parquet")\
    .option("checkpointLocation", "checkpoint_dir")\
    .outputMode("append")\
    .option("path", "stream_dir5")\
    .start()

The important part of the code is the WHERE clause in the SQL statement: "where t1.creation_time < current_timestamp() - interval 15 minutes".
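To sanity-check what that predicate actually keeps, here is a small stand-alone sketch (plain Python, no Spark; the timestamps are made up) that applies the same cutoff condition to a couple of sample rows:

from datetime import datetime, timedelta

def passes_filter(creation_time, now, cutoff_minutes=15):
    # Same condition as the SQL:
    # creation_time < current_timestamp() - interval 15 minutes
    return creation_time < now - timedelta(minutes=cutoff_minutes)

now = datetime(2018, 1, 1, 12, 0, 0)
print(passes_filter(now - timedelta(minutes=5), now))   # 5 min old  -> False
print(passes_filter(now - timedelta(minutes=20), now))  # 20 min old -> True

Note that, as written, the comparison selects rows *older* than 15 minutes and drops the recent ones; flipping the comparison to > would keep only the recent rows instead.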

For this example, I am hoping that the stream will not retain any rows older than 15 minutes. Is this assumption correct? I am not sure how to test it. In addition, I have set a watermark of 2 minutes on the first stream. My understanding is that this watermark makes Spark wait an additional 2 minutes for any data that arrives late.
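For intuition on the watermark part, here is a minimal stand-alone simulation (plain Python, not Spark's actual implementation) of the rule described in the Structured Streaming guide: the engine tracks the maximum event time seen so far, and rows whose event time falls more than the watermark delay behind that maximum are considered too late and dropped:

from datetime import datetime, timedelta

def filter_late_rows(event_times, delay=timedelta(minutes=2)):
    """Drop rows arriving later than (max event time seen) - delay."""
    max_event_time = datetime.min
    kept = []
    for event_time in event_times:  # rows arrive in this order
        max_event_time = max(max_event_time, event_time)
        if event_time >= max_event_time - delay:
            kept.append(event_time)
    return kept

t0 = datetime(2018, 1, 1, 12, 0, 0)
arrivals = [t0,
            t0 + timedelta(minutes=5),   # advances the watermark to 12:03
            t0 + timedelta(minutes=1)]   # 12:01 is behind 12:03, so dropped
print(filter_late_rows(arrivals))        # keeps 12:00 and 12:05 only

So a 2-minute watermark does not mean "wait 2 minutes of wall-clock time"; it bounds how far behind the observed maximum event time a late row may be and still be accepted.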

Thanks!
--
Henry Tremblay
Data Engineer, Best Buy