[Structured Streaming] User-Defined Aggregation Function

subramgr
Hi

I am trying to work out how I can use a UDAF (user-defined aggregate function) for my use case.

I have something like this in my Structured Streaming job:

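        import org.apache.spark.sql.Dataset
        import org.apache.spark.sql.expressions.scalalang.typed
        import org.apache.spark.sql.streaming.{GroupStateTimeout, OutputMode}
        import spark.implicits._ // encoders; assumes a SparkSession named `spark`

        val counts: Dataset[(String, Double)] = events
            .withWatermark("timestamp", "30 minutes")
            // one group per site + session
            .groupByKey(e => e._2.siteIdentifier + "|" + e._2.sessionId)
            .flatMapGroupsWithState(OutputMode.Append(),
                GroupStateTimeout.EventTimeTimeout())(updateSessionState)
            // key each emitted metric as "<name>.<timestamp>"
            .map(m => (s"${m.name}.${m.timestamp}", m.count))
            .groupByKey(_._1)
            .agg(typed.sum(_._2))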

Here I always use *sum* as my *agg* function.

I would like to choose the *agg* function depending on the name of the
*metric*, which is a *string*.

For example: if *name* starts with *count*, use *sum*; if it starts with
*time*, use *avg*.
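One way this could be done without splitting the stream is a custom typed `Aggregator`, the Dataset-API form of a UDAF. The sketch below assumes Spark 2.x/3.x with spark-sql on the classpath; `MetricBuffer` and `SumOrAvg` are hypothetical names, and falling back to *sum* for names matching neither prefix is an assumption. The buffer keeps the key, a running sum and a running count, so `finish` can return either the sum or the average from the same state.

```scala
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.expressions.Aggregator

// Aggregation buffer: the metric key seen so far plus a running sum and count.
case class MetricBuffer(key: String, sum: Double, count: Long)

// Hypothetical aggregator: averages "time*" metrics, sums everything else
// (the "sum for unknown prefixes" fallback is an assumption).
object SumOrAvg extends Aggregator[(String, Double), MetricBuffer, Double] {
  // Empty buffer; the key is filled in by the first input row.
  def zero: MetricBuffer = MetricBuffer("", 0.0, 0L)

  // Fold one (metricKey, value) row into the buffer.
  def reduce(b: MetricBuffer, in: (String, Double)): MetricBuffer =
    MetricBuffer(in._1, b.sum + in._2, b.count + 1)

  // Combine partial buffers; within one group every non-empty key is identical.
  def merge(b1: MetricBuffer, b2: MetricBuffer): MetricBuffer =
    MetricBuffer(if (b1.key.nonEmpty) b1.key else b2.key,
                 b1.sum + b2.sum, b1.count + b2.count)

  // Choose the final function from the metric name prefix.
  def finish(b: MetricBuffer): Double =
    if (b.key.startsWith("time") && b.count > 0) b.sum / b.count
    else b.sum

  def bufferEncoder: Encoder[MetricBuffer] = Encoders.product[MetricBuffer]
  def outputEncoder: Encoder[Double] = Encoders.scalaDouble
}
```

In the pipeline above it would replace the last step, e.g. `.groupByKey(_._1).agg(SumOrAvg.toColumn)`. Since the key is built as `s"${m.name}.${m.timestamp}"`, checking the key prefix is equivalent to checking the metric name.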

Can I do that, or do I need to split the stream into separate streams and then apply a different agg function to each?
If I need to split the stream, any idea how to do that?
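If the stream is split instead, one possible sketch is to filter the `(metricKey, value)` Dataset by name prefix and aggregate each branch with its own typed function. It assumes spark-sql on the classpath; `MetricSplit` and `splitAndAggregate` are hypothetical names. Each returned Dataset would need its own `writeStream ... start()`, so each query would also run (and keep its own state for) the upstream `flatMapGroupsWithState` step.

```scala
import org.apache.spark.sql.{Dataset, Encoders}
import org.apache.spark.sql.expressions.scalalang.typed

object MetricSplit {
  // Hypothetical helper: splits (metricKey, value) rows by name prefix
  // and applies a different typed aggregate to each branch.
  def splitAndAggregate(metrics: Dataset[(String, Double)])
      : (Dataset[(String, Double)], Dataset[(String, Double)]) = {
    // "count*" metrics are summed per key.
    val sums = metrics
      .filter(_._1.startsWith("count"))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(typed.sum[(String, Double)](_._2))
    // "time*" metrics are averaged per key.
    val avgs = metrics
      .filter(_._1.startsWith("time"))
      .groupByKey(_._1)(Encoders.STRING)
      .agg(typed.avg[(String, Double)](_._2))
    (sums, avgs)
  }
}
```

Compared with a single name-aware aggregator, this keeps each aggregation a built-in function, at the cost of running two streaming queries.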



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/
