OOM: Structured Streaming aggregation state not cleaned up properly

weand
We implemented a streaming query that aggregates on event time with a
watermark. I'm wondering why the aggregation state is not cleaned up. According
to the documentation, old aggregation state should be cleared when using
watermarks. We also don't see any of the listed conditions [1] that would
prevent the state from being cleaned up.

We do something like this:

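from pyspark.sql import functions as F, types as T

# Assumes an existing SparkSession named `spark`.
event_schema = T.StructType([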
    T.StructField("remote_ip", T.StringType(), True),
    T.StructField("username", T.StringType(), True)
])

stream_writer = (spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "input_topic")
    .option("startingOffsets", "earliest")
    .load()
    .withColumn("key", F.col("key").cast(T.StringType()))
    .withColumn("value", F.col("value").cast(T.StringType()))
    .withColumn("event", F.from_json(F.col("value"), event_schema))
    .select("timestamp", "event.*")
    .where(r"username rlike '[^@]+@[^\.]\..+'")
    # Watermarked, windowed aggregation (the part in question)
    .withWatermark("timestamp", "600 seconds")
    .groupBy(
        F.window("timestamp", "600 seconds", "30 seconds"),
        F.col("remote_ip")
    )
    .agg(
        F.approx_count_distinct("username").alias("distinct_username"),
        F.collect_set("username").alias("all_usernames"),
    )
    .where(F.expr("distinct_username >= 2"))
    .select(F.col("remote_ip").alias("value"))  # the Kafka sink requires a "value" column
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("topic", "output_topic")
    .option("checkpointLocation", "hdfs://...")
    .trigger(processingTime="10 seconds")
    .outputMode("update")
    .start()
)
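
As a rough aid for reasoning about how much state this query can hold, the window arithmetic can be sketched as follows. The function names are hypothetical, and the retention bound is an approximation: it assumes a window becomes eligible for cleanup once the watermark (maximum event time seen minus the delay) passes the window end, and it ignores the extra delay from trigger intervals.

```python
import math


def windows_per_event(window_s: int, slide_s: int) -> int:
    """Number of sliding windows that contain any single event."""
    return math.ceil(window_s / slide_s)


def max_state_retention_s(window_s: int, watermark_delay_s: int) -> int:
    """Approximate upper bound, in event-time seconds, on how far the
    maximum observed event time must advance past an event before the
    last window containing that event can be dropped."""
    # The latest window containing an event at time t ends no later than
    # t + window_s; it is dropped once the watermark passes that end.
    return window_s + watermark_delay_s


# Values from the query above: 600 s window, 30 s slide, 600 s watermark.
print(windows_per_event(600, 30))       # 20
print(max_state_retention_s(600, 600))  # 1200
```

With these settings each event contributes to 20 overlapping windows per remote_ip, and state for an event can live for roughly 20 minutes of event time rather than 10.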

When running this for several hours, we hit a heap OutOfMemoryError in our
driver application:
<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8542/56_001.png>

Analyzing the heap dump reveals:
*One instance of "org.apache.spark.status.ElementTrackingStore"* loaded by
"sun.misc.Launcher$AppClassLoader @ 0x800223d0" occupies *1.793.945.416
(93,59%) bytes*. The memory is accumulated in one instance of
"java.util.concurrent.ConcurrentHashMap$Node[]" loaded by "<system class
loader>".
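
Note that org.apache.spark.status.ElementTrackingStore holds the driver-side status data behind the Spark UI, whereas the aggregation state itself is kept in the state store on the executors. One thing that may be worth trying is lowering the UI retention limits. The sketch below is untested against this workload: the configuration keys are standard Spark settings, but the values, the helper `apply_conf`, and the application name are hypothetical, and it is only an assumption that this reduces the memory held here.

```python
# Assumption: illustrative values only, not tuned recommendations.
UI_RETENTION_CONF = {
    "spark.ui.retainedJobs": "100",
    "spark.ui.retainedStages": "100",
    "spark.ui.retainedTasks": "1000",
    "spark.sql.ui.retainedExecutions": "50",
}


def apply_conf(builder, conf):
    """Apply each key/value pair via builder.config(key, value)."""
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder


if __name__ == "__main__":
    # Requires pyspark; "streaming-agg" is a hypothetical application name.
    from pyspark.sql import SparkSession

    spark = apply_conf(
        SparkSession.builder.appName("streaming-agg"), UI_RETENTION_CONF
    ).getOrCreate()
```

These settings have to be in place before the SparkSession is created; the same keys can also be passed with --conf on spark-submit.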


I would expect memory usage to peak at roughly one window duration's worth of
state (10 minutes in our case), but it seems the state is never cleaned up.
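
One way to check whether the aggregation state really keeps growing is to watch the state-operator metrics in the query's progress reports. The helper below is a minimal sketch: `summarize_state` is a hypothetical name, and it assumes the progress report is a plain dict, as returned by `stream_writer.lastProgress` in PySpark (which is None until the first batch completes).

```python
def summarize_state(progress: dict) -> dict:
    """Extract the watermark and state-store totals from a progress report."""
    operators = progress.get("stateOperators", [])
    return {
        "watermark": progress.get("eventTime", {}).get("watermark"),
        "state_rows_total": sum(op.get("numRowsTotal", 0) for op in operators),
        "state_memory_bytes": sum(op.get("memoryUsedBytes", 0) for op in operators),
    }


# Usage against the running query (the value returned by .start() above):
#   progress = stream_writer.lastProgress
#   if progress is not None:
#       print(summarize_state(progress))
```

If the watermark advances and state_rows_total levels off after a while, the aggregation state is being evicted and the growth has to come from somewhere else.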

Any ideas?

Regards
Andreas

[1]
http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#conditions-for-watermarking-to-clean-aggregation-state



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: OOM: Structured Streaming aggregation state not cleaned up properly

weand
Does nobody have any idea?

Is filtering after aggregation supported in Structured Streaming, but maybe
buggy? See the following line in the example from my earlier mail:
...
.where(F.expr("distinct_username >= 2"))
...



