Register UDF duration runtime

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

Register UDF duration runtime

杜斌
Hi ,
I am running into an issue when I try to take a string received from a web service, compile it as a UDF, and register it at runtime.

Here is the exception.

java.lang.ClassCastException: cannot assign instance of scala.collection.immutable.List$SerializationProxy to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of type scala.collection.Seq in instance of org.apache.spark.rdd.MapPartitionsRDD

at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)

at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2251)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)

at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)

at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)

at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)

at java.lang.reflect.Method.invoke(Method.java:498)

at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2136)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2245)

at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2169)

at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2027)

at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)

at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)

at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)

at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)

at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)

at org.apache.spark.scheduler.Task.run(Task.scala:108)

at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)

at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)

at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)

at java.lang.Thread.run(Thread.java:748)


Here is some simple code.

package app

import org.apache.spark.sql.SparkSession
import util.{ScalaSourceCodeCompiler}

/**
 * Minimal reproduction: compiles a Scala function literal from a string at runtime,
 * registers it as the Spark SQL UDF `toU`, and applies it to column `t` of a CSV file.
 */
object Test {

  /**
   * Builds a local SparkSession, loads the CSV as the temp view `car`, registers the
   * runtime-compiled UDF, runs `select toU(t) from car`, and prints the result as JSON.
   *
   * @param args command-line arguments (unused)
   */
  def main(args: Array[String]): Unit = {
    // `SparkContext.getConf` returns a copy of the configuration, so calling `setJars`
    // on it after the session exists has no effect. Declare the jar on the builder instead.
    val spark = SparkSession
      .builder
      .appName("Test")
      .master("local")
      .config("spark.jars", "/Users/dubin/code/java/BatchPro/BatchProSpark/target/BatchProSpark-1.0-SNAPSHOT.jar")
      .getOrCreate()

    // Stop the session even when compilation or the query throws.
    try {
      val logDataDF = spark.read
        .format("csv")
        .option("header", "true")
        .load("/Users/dubin/code/java/BatchPro/BatchProSpark/src/test/resources/cars.csv")
      logDataDF.show()
      // `registerTempTable` is deprecated since Spark 2.0.
      logDataDF.createOrReplaceTempView("car")

      val code =
        """
          |(a: String) => { a.toUpperCase }
          |""".stripMargin

      val userUDF = ScalaSourceCodeCompiler.compileCodeTest(code)
      spark.udf.register("toU", userUDF)

      val res = spark.sql("select toU(t) from car")
      val r = res.toJSON.collect()

      res.printSchema()
      res.show()

      r.foreach(println(_))
    } finally {
      spark.stop()
    }
  }

}
and 

/**
 * Evaluates `code` via `Eval` and returns the result typed as a `String => String` function.
 *
 * NOTE(review): `Eval` is not defined in this snippet; presumably a runtime Scala
 * source evaluator. Confirm which library provides it.
 * NOTE(review): the accompanying post says the code string comes from a web service.
 * Evaluating it runs arbitrary code, so confirm the source is trusted.
 *
 * @param code Scala source text expected to evaluate to a `String => String` function
 * @return the evaluated function
 */
def compileCodeTest(code: String): String => String =
  Eval[String => String](code)
I have already tried several approaches. I suspect the exception is related to the Eval runtime, but I am not sure. Can anyone help?

Thanks,
Du Bin
Reply | Threaded
Open this post in threaded view
|

Re: Register UDF duration runtime

amittonge
I am also facing the same issue. If you have found a solution, please share it
with me.
I was able to work around this issue by creating a jar of the runtime-added UDF and
calling addJar on the Spark context.



--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]