Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint


Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

alberskib
This post has NOT been accepted by the mailing list yet.
Hey all,

When my streaming application restarts after a failure (recovering from the checkpoint), I receive a strange error:

java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to com.example.sender.MyClassReporter.

An instance of MyClassReporter is created on the driver side (with the proper config passed as a constructor argument) and broadcast to the executors, so that there is only a single instance on each worker. Everything works fine up to the point where I read the broadcast value and call a method on it, i.e.
broadcastedValue.value.send(...)
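For context, the driver-side setup looks roughly like this. This is a sketch with hypothetical names (`checkpointDir`, `createContext`, the stream, and the metric name are placeholders, not from the original post); the point is only where the failing call sits relative to the broadcast:

```scala
// Hypothetical sketch: the job is recovered from the checkpoint, then the
// reporter is broadcast once from the driver and used inside the output op.
val ssc = StreamingContext.getOrCreate(checkpointDir, () => createContext())
val reporterBc = ssc.sparkContext.broadcast(new MyClassReporter(config, "my-flow"))

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    records.foreach { r =>
      // On restart from checkpoint, this read of the recovered broadcast is
      // where the ClassCastException surfaces: the restored block comes back
      // as a SerializableConfiguration, not a MyClassReporter.
      reporterBc.value.send("records.processed", "1", System.currentTimeMillis() / 1000)
    }
  }
}
```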

Below you can find the definition of MyClassReporter (with its trait):

import java.net.InetSocketAddress
import java.util.concurrent.Executors

import scala.concurrent.ExecutionContext
import scala.util.Try
import scala.util.control.NonFatal

trait Reporter {
  def send(name: String, value: String, timestamp: Long): Unit
  def flush(): Unit
}

class MyClassReporter(config: MyClassConfig, flow: String) extends Reporter with Serializable {

  val prefix = s"${config.senderConfig.prefix}.$flow"

  var counter = 0

  // @transient + lazy: recreated on each executor after deserialization
  @transient
  private lazy val sender: GraphiteSender = initialize()

  @transient
  private lazy val threadPool =
    ExecutionContext.fromExecutorService(Executors.newSingleThreadExecutor())

  private def initialize() = {
    val sender = new Sender(
      new InetSocketAddress(config.senderConfig.hostname, config.senderConfig.port)
    )
    sys.addShutdownHook {
      sender.close()
    }
    sender
  }

  override def send(name: String, value: String, timestamp: Long): Unit = {
    threadPool.submit(new Runnable {
      override def run(): Unit = {
        try {
          counter += 1
          if (!sender.isConnected)
            sender.connect()
          sender.send(s"$prefix.$name", value, timestamp)
          // graphiteConfig comes from surrounding code not shown in this post
          if (counter % graphiteConfig.batchSize == 0)
            sender.flush()
        } catch {
          case NonFatal(e) =>
            println(s"Problem with sending metric to graphite $prefix.$name: $value at $timestamp: ${e.getMessage}")
            Try(sender.close()).recover {
              case NonFatal(e) => println(s"Error closing graphite sender: ${e.getMessage}")
            }
        }
      }
    })
  }

  override def flush(): Unit = sender.flush()
}

Do you have any idea how I can solve this issue? Using a broadcast variable lets me keep a single socket open to the service on each executor.

UPDATE: Spark 1.5.1, Java 8

Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

ganeshyadiyala
Hi,

Were you able to solve this issue? We are facing the same problem and would appreciate it if you could guide us through it.

Re: Spark streaming: java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration ... on restart from checkpoint

dcam
Considering the @transient annotations and the work done in the instance initializer, not much state is actually broadcast to the executors. It might be simpler to just create these instances on the executors, rather than trying to broadcast them.
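Concretely, creating the instance on the executors is the lazily-instantiated singleton pattern that the Spark Streaming programming guide recommends for jobs recovered from checkpoints. A rough sketch, reusing `MyClassReporter`/`MyClassConfig` from the original post (the holder object, stream, and metric name are hypothetical):

```scala
// Hypothetical sketch: build the reporter lazily on each executor instead of
// broadcasting it. The instance is created at most once per executor JVM,
// which still keeps a single socket open per worker.
object ReporterHolder {
  @volatile private var instance: MyClassReporter = _

  def getOrCreate(config: MyClassConfig, flow: String): MyClassReporter = {
    if (instance == null) synchronized {
      if (instance == null) instance = new MyClassReporter(config, flow)
    }
    instance
  }
}

stream.foreachRDD { rdd =>
  rdd.foreachPartition { records =>
    // Only `config` travels in the task closure, so nothing broadcast has
    // to survive checkpoint recovery.
    val reporter = ReporterHolder.getOrCreate(config, "my-flow")
    records.foreach { r =>
      reporter.send("records.processed", "1", System.currentTimeMillis() / 1000)
    }
  }
}
```

The only requirement is that `config` itself is serializable, since it is captured by the closure instead of being broadcast.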