Using a UDF compiled with Janino in Spark

smikesh
Hi all,

I am trying to compile my UDF with the Janino compiler, then register
it in Spark and use it in a query. Here is the code:

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;
import org.codehaus.commons.compiler.CompilerFactoryFactory;
import org.codehaus.commons.compiler.ISimpleCompiler;

// Source of the UDF to be compiled at runtime
String s =
        "public class MyUDF implements org.apache.spark.sql.api.java.UDF1<String, String> {\n" +
        "    @Override\n" +
        "    public String call(String s) throws Exception {\n" +
        "        return s + \"dd\";\n" +
        "    }\n" +
        "}";

// Compile the source in-memory with Janino
ISimpleCompiler sc =
        CompilerFactoryFactory.getDefaultCompilerFactory().newSimpleCompiler();
sc.cook(s);

// Instantiate the generated class and register it as a Spark SQL UDF
UDF1 udf1 = (UDF1) sc.getClassLoader().loadClass("MyUDF").newInstance();
sparkSession.udf().register("MyUDF", udf1, DataTypes.StringType);

sparkSession.sql("select MyUDF(id) from deal").show();
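
For comparison, here is a minimal sketch of the same UDF as an ordinary
statically compiled class (the name MyStaticUDF is just for illustration);
registered the same way, it does not involve the Janino classloader at all:

import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.types.DataTypes;

// Same logic as the generated class, but compiled with javac and
// shipped in the application jar, so executors can load it normally.
public class MyStaticUDF implements UDF1<String, String> {
    @Override
    public String call(String s) throws Exception {
        return s + "dd";
    }
}

// Registered and used exactly like the Janino version:
// sparkSession.udf().register("MyUDF", new MyStaticUDF(), DataTypes.StringType);
// sparkSession.sql("select MyUDF(id) from deal").show();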

The problem is that during execution I get the following exception:

15.12.2017 14:32:46 [ERROR]: Exception in task 0.0 in stage 5.0 (TID 5)
java.lang.ClassCastException: cannot assign instance of
scala.collection.immutable.List$SerializationProxy to field
org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_ of
type scala.collection.Seq in instance of
org.apache.spark.rdd.MapPartitionsRDD
at java.io.ObjectStreamClass$FieldReflector.setObjFieldValues(ObjectStreamClass.java:2133)
at java.io.ObjectStreamClass.setObjFieldValues(ObjectStreamClass.java:1305)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2237)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2231)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2155)
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2013)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1535)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:422)
at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:335)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
15.12.2017 14:32:46 [ WARN]: Lost task 0.0 in stage 5.0 (TID 5,
localhost, executor driver): java.lang.ClassCastException: cannot
assign instance of scala.collection.immutable.List$SerializationProxy
to field org.apache.spark.rdd.RDD.org$apache$spark$rdd$RDD$$dependencies_
of type scala.collection.Seq in instance of
org.apache.spark.rdd.MapPartitionsRDD
        (stack trace identical to the one above)

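My suspicion is that the class cooked by Janino lives only in an
in-memory classloader on the driver, so the executor cannot resolve it
while deserializing the task. A quick diagnostic sketch (variable names
are from the snippet above) to see which loader actually serves the class:

// Check which classloader serves the generated class; a Janino
// in-memory loader (e.g. ByteArrayClassLoader) would mean the bytecode
// exists only in driver memory, not on the executor classpath.
Class<?> udfClass = sc.getClassLoader().loadClass("MyUDF");
System.out.println("MyUDF loaded by: " + udfClass.getClassLoader());
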
Any ideas on what could be going wrong?

Best,
Michael
