Custom encoders and UDFs

I am using an org.apache.spark.sql.Encoder to serialize a custom object.

I now want to pass the encoded column to a UDF so it can operate on the object, but this fails with the error:

Caused by: java.lang.ClassCastException: [B cannot be cast to
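
For context on the error: `[B` is the JVM's internal name for `Array[Byte]`. Binary encoders such as `Encoders.kryo` or `Encoders.javaSerialization` store the whole object as a single binary column, so a UDF reading that column is handed raw bytes, and the cast to the custom class fails. The following is a minimal, Spark-free sketch of that situation, not the code from the test below. It assumes plain Java serialization as a stand-in for whatever the encoder writes, and `BinaryColumnSketch`, `toBytes` and `fromBytes` are hypothetical names introduced only for illustration.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import scala.util.Try

class Person(val name: String) extends Serializable

// Hypothetical helper object, for illustration only.
object BinaryColumnSketch {

  // Stand-in for what a binary encoder stores: the object as a byte array.
  // Assumption: plain Java serialization; a Kryo-based encoder writes a different byte format.
  def toBytes(person: Person): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(person) finally out.close()
    buffer.toByteArray
  }

  // Decoding must use the same serializer that produced the bytes.
  def fromBytes(bytes: Array[Byte]): Person = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Person] finally in.close()
  }

  def main(args: Array[String]): Unit = {
    // The value a function would receive from a binary column.
    val stored: Any = toBytes(new Person("jack"))

    println(stored.getClass.getName)                          // [B
    // Casting the raw bytes to Person throws ClassCastException.
    println(Try(stored.asInstanceOf[Person].name).isFailure)  // true
    // Treating the value as bytes and decoding it recovers the object.
    println(fromBytes(stored.asInstanceOf[Array[Byte]]).name) // jack
  }
}
```

Under the same assumption, one possible direction is a UDF declared over `Array[Byte]` that decodes the value itself, using whichever serializer matches the encoder in use.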

The code below demonstrates the issue.

I know I could simply make Person a case class in this example, but the class is only for illustration.

Does anyone know how to solve this problem?

import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
import org.scalatest.Matchers._

class Person(val name: String) extends Serializable

class MySpec extends FunSuite with DatasetSuiteBase {

  test("udf test") {

    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val myUdf = udf { person: Person => }

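    // NOTE: the right-hand side is missing from the post as captured. Encoders.kryo is an
    // assumption; any binary encoder is consistent with the "[B cannot be cast" error.
    implicit val personEncoder: Encoder[Person] = Encoders.kryo[Person]
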
    implicit val partitionAndPersonEncoder: Encoder[(Int, Person)] =
      Encoders.tuple(Encoders.scalaInt, personEncoder)

    val input = sc.parallelize(Seq(
      1 -> new Person("jack"),
      2 -> new Person("jill")
    )).toDF("partition", "value")