State size on joining two streams

Alexander Chermenin
Hi all,

I'd like to ask about the following: we have two streams (one with a lot of data, the other with much less) and are trying to join them. I modeled this situation with the following code:

    import org.apache.spark.sql.functions.expr
    import scala.util.Random

    val moreData = spark.readStream.format("rate")
      .option("rowsPerSecond", 1000).load()
      .withWatermark("timestamp", "15 seconds")

    val lessData = spark.readStream.format("rate")
      .option("rowsPerSecond", 1).load()
      .filter(_ => Random.nextInt(100) < 2)
      .withWatermark("timestamp", "15 seconds")

    val joinData = moreData.alias("md")
      .join(lessData.alias("ld"),
        expr(
          s"""
             | md.value = ld.value AND
             | md.timestamp BETWEEN ld.timestamp AND ld.timestamp + interval 15 seconds
          """.stripMargin
        )
      )
      .select(moreData("timestamp"), lessData("value"))

    joinData
      .withWatermark("timestamp", "15 seconds")
      .writeStream.format("console").start().awaitTermination()

So, after several minutes I checked the state, and there were about a million messages in it. The question is: why aren't the entries deleted from the state? Both streams have watermarks, and the join expression contains a condition on the timestamps.

As a result, when the job runs for several hours, we can get an OutOfMemory error.
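
For anyone trying to reproduce the numbers, here is a minimal sketch of how the state size can be read programmatically instead of from the UI. It assumes the query is started without blocking on `awaitTermination()`; the helper name `logStateSize` and the 10-second polling interval are illustrative and not part of the job above.

```scala
import org.apache.spark.sql.streaming.StreamingQuery

// Hypothetical helper: prints the state size of each stateful operator
// as reported in the most recent progress update.
def logStateSize(query: StreamingQuery): Unit = {
  // lastProgress is null until the first micro-batch has completed
  Option(query.lastProgress).foreach { progress =>
    progress.stateOperators.zipWithIndex.foreach { case (op, i) =>
      println(s"batch=${progress.batchId} operator=$i " +
        s"rows=${op.numRowsTotal} memoryBytes=${op.memoryUsedBytes}")
    }
    // eventTime is a java.util.Map; the "watermark" entry is null
    // when no watermark has been reported yet
    println(s"watermark=${progress.eventTime.get("watermark")}")
  }
}

// Usage, assuming `joinData` from the snippet above
val query = joinData.writeStream.format("console").start()
while (query.isActive) {
  logStateSize(query)
  Thread.sleep(10000) // polling interval chosen arbitrarily
}
```

Printing the reported watermark next to the row counts should show whether the watermark is advancing at all while the state keeps growing.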

Here is the pipeline (and some info about the state):

image.png


There is also a condition for cleaning up the state, but it looks rather strange to me, for example `timestamp - T15000ms <= -1000`:

image.png


Could anybody explain what it means?