Exception when using reduceByKeyAndWindow in Spark Streaming.

SRK
This post has NOT been accepted by the mailing list yet.
Hi,

I see the following error when I use reduceByKeyAndWindow in my Spark Streaming app. I pass a reduce function, an inverse reduce function, and a filter function, as shown below. Any idea why I get this error?

 java.lang.Exception: Neither previous window has value for key, nor new values found. Are you sure your key class hashes consistently?
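One way to see when this message can appear is to model the incremental form of reduceByKeyAndWindow (the variant that takes an inverse function) on a single machine. The sketch below is a simplified model based on my reading of how Spark's ReducedWindowedDStream merges values per key, not Spark's actual code. It assumes the slide interval equals one batch and that each batch is already reduced to one value per key. In this model the exception fires when a key appears in the batch leaving the window but has neither a previous window value nor a value in the entering batch. The names `IncrementalWindow`, `step`, `run`, and `repro` are hypothetical, and `repro` restates the question's functions using an assumed immutable `HashSet`.

```scala
import scala.collection.immutable.HashSet

// Simplified single-machine model (NOT Spark's source) of the per-key merge done by an
// incremental reduceByKeyAndWindow on each slide. Assumes slide = one batch and that every
// batch has already been reduced to at most one value per key.
object IncrementalWindow {
  def step[K, V](
      prev: Map[K, V],     // window value per key from the previous slide
      leaving: Map[K, V],  // reduced batch that slides out of the window
      entering: Map[K, V], // reduced batch that slides into the window
      reduce: (V, V) => V,
      invReduce: (V, V) => V,
      keep: ((K, V)) => Boolean): Map[K, V] = {
    val keys = prev.keySet ++ leaving.keySet ++ entering.keySet
    val merged = keys.iterator.map { k =>
      val value = prev.get(k) match {
        case None =>
          // Without a previous value, only the entering batch can supply one.
          entering.getOrElse(k, throw new IllegalStateException(
            s"Neither previous window has value for key $k, nor new values found."))
        case Some(p) =>
          // Subtract the leaving batch first, then add the entering batch.
          val afterInv = leaving.get(k).fold(p)(old => invReduce(p, old))
          entering.get(k).fold(afterInv)(nu => reduce(afterInv, nu))
      }
      k -> value
    }.toMap
    merged.filter(keep) // keys rejected by the filter disappear from the window state
  }

  /** Slides a window of `windowLen` batches over `batches`; returns the state after each batch. */
  def run[K, V](batches: Seq[Map[K, V]], windowLen: Int, reduce: (V, V) => V,
                invReduce: (V, V) => V, keep: ((K, V)) => Boolean): Seq[Map[K, V]] =
    batches.indices.scanLeft(Map.empty[K, V]) { (state, i) =>
      val leaving = if (i >= windowLen) batches(i - windowLen) else Map.empty[K, V]
      step(state, leaving, batches(i), reduce, invReduce, keep)
    }.tail

  // The question's functions, restated with an (assumed) immutable HashSet.
  type Entry = (Long, HashSet[String])
  val union: (Entry, Entry) => Entry = { case ((t1, s1), (t2, s2)) => (math.max(t1, t2), s1 ++ s2) }
  val difference: (Entry, Entry) => Entry = { case ((t1, s1), (t2, s2)) => (math.max(t1, t2), s1.diff(s2)) }
  val nonEmpty: ((String, Entry)) => Boolean = { case (_, (_, s)) => s.nonEmpty }

  /** Key "metric" receives element "a" in batches 1 and 2, then only empty batches follow. */
  def repro(batchCount: Int): Seq[Map[String, Entry]] = {
    val entry: Entry = (1L, HashSet("a"))
    val withA = Map("metric" -> entry)
    val empty = Map.empty[String, Entry]
    val batches = Vector(withA, withA) ++ Vector.fill(batchCount - 2)(empty)
    run(batches, 2, union, difference, nonEmpty)
  }
}
```

Tracing `repro(4)` under these assumptions: after batch 3 the window covers batches 2 and 3, so `metric` should still map to a set containing "a", but `diff` has already removed "a" because batch 1 (now leaving) also contained it. The filter then drops the empty key, and on batch 4 the leaving batch 2 still holds a value for `metric`, which reaches the exception path. If the model is faithful, the message may be caused by an inverse function that does not exactly undo the reduce (combined with the filter), rather than by inconsistent key hashing.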


  import scala.collection.immutable.HashSet // assumed: the post does not show which HashSet is imported

  // Reduce: keep the latest timestamp and take the union of the two sets.
  def reduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
    case ((timeStamp1, set1), (timeStamp2, set2)) => (math.max(timeStamp1, timeStamp2), set1 ++ set2)
  }

  // Inverse reduce: remove the elements of the value leaving the window (set2) from the window value (set1).
  def invReduceWithHashSet: ((Long, HashSet[String]), (Long, HashSet[String])) => (Long, HashSet[String]) = {
    case ((timeStamp1, set1), (timeStamp2, set2)) => (math.max(timeStamp1, timeStamp2), set1.diff(set2))
  }

  // Filter: keep only keys whose window set is non-empty.
  def filterFuncForInvReduce: ((String, (Long, HashSet[String]))) => Boolean = {
    case (_, (_, set)) => set.nonEmpty
  }
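Spark's API documentation for reduceByKeyAndWindow describes the inverse function as one satisfying invReduceFunc(reduceFunc(x, y), x) = y. A quick way to test a candidate pair against that property is sketched below. It compares set union/difference (the question's approach, with timestamps left out) against a hypothetical count-based multiset, Map[String, Int], that could stand in for HashSet[String]. `InverseCheck`, `countReduce`, `countInv`, `toCounts`, and `invertible` are illustrative names, not part of any library, and switching to counts is a suggestion under these assumptions rather than a confirmed fix.

```scala
import scala.collection.immutable.HashSet

// Sketch (assumption, not a confirmed fix): an incremental window needs
// invReduce(reduce(x, y), x) == y. Counting occurrences satisfies this; set union/difference
// does not once x and y share an element. Timestamps are omitted for brevity.
object InverseCheck {
  // Set-based pair from the question, without the timestamp.
  def setReduce(a: HashSet[String], b: HashSet[String]): HashSet[String] = a ++ b
  def setInv(a: HashSet[String], b: HashSet[String]): HashSet[String] = a.diff(b)

  // Hypothetical replacement: element -> number of occurrences within the window.
  type Counts = Map[String, Int]

  // Reduce: add the counts of b into a.
  def countReduce(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (elem, n)) => acc.updated(elem, acc.getOrElse(elem, 0) + n) }

  // Inverse reduce: subtract the counts of b from a, dropping elements that reach zero.
  def countInv(a: Counts, b: Counts): Counts =
    b.foldLeft(a) { case (acc, (elem, n)) =>
      val remaining = acc.getOrElse(elem, 0) - n
      if (remaining > 0) acc.updated(elem, remaining) else acc - elem
    }

  // Converts one batch's set into counts of 1 per element.
  def toCounts(s: Set[String]): Counts = s.iterator.map(elem => elem -> 1).toMap

  /** True when `inv` exactly undoes `reduce` for this pair of values. */
  def invertible[V](reduce: (V, V) => V, inv: (V, V) => V)(x: V, y: V): Boolean =
    inv(reduce(x, y), x) == y
}
```

With counts, the filter would keep a key while its map is non-empty, and the distinct elements are `counts.keySet`. The timestamp half of the value has a similar issue: `math.max` has no inverse, so after old batches leave, the stored maximum can still come from a batch that is no longer in the window. That would not raise this exception, but the value may not mean what it appears to.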