Custom encoders and udf's


jelmer
Hi,

I am using an org.apache.spark.sql.Encoder to serialize a custom object.

I now want to pass this column to a UDF so that it can operate on it, but this gives me the following error:

Caused by: java.lang.ClassCastException: [B cannot be cast to

The code included below demonstrates the issue.

I know I can simply make Person a case class in this example, but it's for illustration purposes.

Does anyone know how to solve this problem?




import com.holdenkarau.spark.testing.DatasetSuiteBase
import org.apache.spark.sql.{Encoder, Encoders}
import org.apache.spark.sql.functions._
import org.scalatest.FunSuite
import org.scalatest.Matchers._

class Person(val name: String) extends Serializable

class MySpec extends FunSuite with DatasetSuiteBase {

  test("udf test") {

    val sqlCtx = sqlContext
    import sqlCtx.implicits._

    val myUdf = udf { person: Person => person.name }

    implicit val personEncoder: Encoder[Person] =
      Encoders.javaSerialization[Person]

    implicit val partitionAndPersonEncoder: Encoder[(Int, Person)] =
      Encoders.tuple(Encoders.scalaInt, personEncoder)

    val input = sc.parallelize(Seq(
      1 -> new Person("jack"),
      2 -> new Person("jill")
    )).toDF("partition", "value")

    input.printSchema()

    input.select(myUdf($"value")).show()
  }

}
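
A note on what the error seems to indicate: `[B` is the JVM name for `Array[Byte]`, and `Encoders.javaSerialization` stores the object as a single binary column, so the UDF is handed raw bytes rather than a `Person`. One possible workaround, sketched below, is to have the UDF accept `Array[Byte]` and deserialize it itself. This assumes the column holds a standard Java serialization stream that a plain `ObjectInputStream` can read; `PersonBytes`, `toBytes`, `fromBytes` and `personName` are hypothetical helper names, and `toBytes` is only there to make the round trip checkable without Spark.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

class Person(val name: String) extends Serializable

object PersonBytes {

  // Plain Java serialization, standing in for what the encoder is assumed to store.
  def toBytes(person: Person): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new ObjectOutputStream(buffer)
    try out.writeObject(person) finally out.close()
    buffer.toByteArray
  }

  // Rebuild the Person from the bytes found in the binary column.
  def fromBytes(bytes: Array[Byte]): Person = {
    val in = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try in.readObject().asInstanceOf[Person] finally in.close()
  }

  // The function to wrap in a udf: it takes the binary value, not a Person.
  def personName(bytes: Array[Byte]): String = fromBytes(bytes).name
}
```

In the test above this would presumably be wired up as `val myUdf = udf(PersonBytes.personName _)`. I have not verified this against a running cluster, and class loading on executors may need extra care.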