Spark Streaming with mapGroupsWithState


Something Something
I am writing a stateful streaming application in which I am using mapGroupsWithState to create aggregates per group, but I need to form the groups from more than one column of the input row. All the examples in 'Spark: The Definitive Guide' use only a single column, such as 'User' or 'Device'. I am using code similar to what's given below. How do I specify more than one field in 'groupByKey'?

There are other challenges as well. The book says we can use 'updateAcrossEvents' as shown below, but I get a compile-time error saying: Error:(43, 65) missing argument list for method updateAcrossEvents in object Main
Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)


Another challenge: the compiler also complains about my MyReport class: Error:(41, 12) Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.
        
Help in resolving these errors would be greatly appreciated. Thanks in advance.

withEventTime
  .as[MyReport]
  .groupByKey(_.getKeys.getKey1) // How do I add _.getKeys.getKey2?
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()
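
For context on the encoder error above: .as[MyReport] needs an implicit Encoder[MyReport], and spark.implicits._ only derives one for primitives and Product types (case classes). A minimal sketch of the two usual fixes; the field names here are illustrative assumptions, not the real schema:

import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().appName("state-demo").getOrCreate()
import spark.implicits._ // derives Encoders for case classes

// Option 1: make MyReport a Scala case class (covered by spark.implicits._);
// the accessors then become r.keys.key1 instead of r.getKeys.getKey1.
case class Keys(key1: String, key2: String)
case class MyReport(keys: Keys, eventTime: java.sql.Timestamp)

// Option 2: if MyReport is a Java bean (the getKeys/getKey1 getters suggest it),
// pass a bean encoder explicitly instead of relying on the implicits:
//   withEventTime.as(Encoders.bean(classOf[MyReport]))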
Re: Spark Streaming with mapGroupsWithState

shicheng31604@gmail.com
Maybe you can combine the fields you want to use into one field.
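
For instance, a sketch of that idea against the snippet above: returning a tuple from groupByKey groups on a composite key (assuming both key fields are Strings; the key parameter of updateAcrossEvents must then be (String, String) as well):

withEventTime
  .as[MyReport]
  // Return a tuple to group on more than one column; the key type
  // becomes (String, String) and flows into updateAcrossEvents.
  .groupByKey(r => (r.getKeys.getKey1, r.getKeys.getKey2))
  .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
  .writeStream
  .queryName("test_query")
  .format("memory")
  .outputMode("update")
  .start()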


Re: Spark Streaming with mapGroupsWithState

Something Something
I changed the grouping key to a Tuple2 and that problem is solved.

Any thoughts on this remaining error message?

Unapplied methods are only converted to functions when a function type is expected.
You can make this conversion explicit by writing `updateAcrossEvents _` or `updateAcrossEvents(_,_,_,_,_)` instead of `updateAcrossEvents`.
      .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)
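
That hint usually means the method's parameter list does not match the function type mapGroupsWithState expects: the API takes a function of exactly three parameters, (K, Iterator[V], GroupState[S]) => U, while the `updateAcrossEvents(_,_,_,_,_)` in the error suggests updateAcrossEvents is declared with five. A sketch of a matching signature; MyState, MyOutput, and their fields are assumptions for illustration only:

import org.apache.spark.sql.streaming.GroupState

// Hypothetical state and output types for illustration.
case class MyState(count: Long)
case class MyOutput(key1: String, key2: String, count: Long)

// Exactly three parameters, matching the (String, String) tuple key above.
def updateAcrossEvents(
    key: (String, String),
    events: Iterator[MyReport],
    state: GroupState[MyState]): MyOutput = {
  val previous = state.getOption.getOrElse(MyState(0L))
  val updated = MyState(previous.count + events.size)
  state.update(updated) // persist the new state for this key
  MyOutput(key._1, key._2, updated.count)
}

// Once the arity matches, the bare name eta-expands as in the book;
// `updateAcrossEvents _` is the explicit form the compiler suggests:
//   .mapGroupsWithState(GroupStateTimeout.EventTimeTimeout())(updateAcrossEvents)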
