java spark udf error


崔苗
Hi,
I defined a UDF in Java that replaces empty strings with a fill value, like this:

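import org.apache.spark.sql.api.java.UDF2;

// Returns fillContent when the input is null or blank, otherwise the input itself.
public class MarkUnknown implements UDF2<String, String, String> {
    @Override
    public String call(String processor, String fillContent) {
        // Null check first, so trim() cannot throw a NullPointerException.
        if (processor == null || processor.trim().isEmpty()) {
            logger.info("find empty string"); // logger is a field declared elsewhere (not shown)
            return fillContent;
        }
        return processor;
    }
}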
and registered it through the SparkSession:
spark.udf().register("markUnknown", markUnknown, StringType);

But when I use the UDF in SQL, "select markUnknown(useId,'unknown') FROM table", I get an exception:

Exception in thread "main" java.lang.ClassCastException: org.apache.spark.sql.UDFRegistration$$anonfun$27 cannot be cast to scala.Function2
        at org.apache.spark.sql.catalyst.expressions.ScalaUDF.<init>(ScalaUDF.scala:97)
        at org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
        at org.apache.spark.sql.UDFRegistration$$anonfun$register$26.apply(UDFRegistration.scala:515)
        at org.apache.spark.sql.catalyst.analysis.SimpleFunctionRegistry.lookupFunction(FunctionRegistry.scala:91)
        at org.apache.spark.sql.catalyst.catalog.SessionCatalog.lookupFunction(SessionCatalog.scala:1165)
        at org.apache.spark.sql.hive.HiveSessionCatalog.org$apache$spark$sql$hive$HiveSessionCatalog$$super$lookupFunction(HiveSessionCatalog.scala:129)
        at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
        at org.apache.spark.sql.hive.HiveSessionCatalog$$anonfun$3.apply(HiveSessionCatalog.scala:129)
        at scala.util.Try$.apply(Try.scala:192)
        at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction0(HiveSessionCatalog.scala:129)
        at org.apache.spark.sql.hive.HiveSessionCatalog.lookupFunction(HiveSessionCatalog.scala:122)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)
        at org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveFunctions$$anonfun$apply$16$$anonfun$applyOrElse$7$$anonfun$applyOrElse$53.apply(Analyzer.scala:1189)

I replaced the string 'unknown' with another column, "select markUnknown(useId,companyId) FROM table", but still got the same exception.
So how should I define the UDF in Java?
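
To rule out the function body itself: the trace above fails inside ScalaUDF.<init>, reached from lookupFunction, and call does not appear in it, so the logic can be checked separately from Spark. The following is only a minimal sketch under stated assumptions. TwoArgFunction is a hypothetical stand-in for Spark's UDF2 so that the snippet compiles with the JDK alone, MarkUnknownLogic and MarkUnknownDemo are illustrative names, and the null guard is an assumption about how NULL column values should be handled.

```java
// Hypothetical stand-in for org.apache.spark.sql.api.java.UDF2, so this
// sketch compiles with the JDK alone. It is not part of Spark.
interface TwoArgFunction<A, B, R> {
    R call(A a, B b) throws Exception;
}

// Mirrors the logic of MarkUnknown above. The null guard is an assumption
// about how NULL column values should be treated.
class MarkUnknownLogic implements TwoArgFunction<String, String, String> {
    @Override
    public String call(String processor, String fillContent) {
        if (processor == null || processor.trim().isEmpty()) {
            return fillContent;
        }
        return processor;
    }
}

class MarkUnknownDemo {
    public static void main(String[] args) {
        MarkUnknownLogic f = new MarkUnknownLogic();
        System.out.println(f.call("   ", "unknown")); // prints "unknown"
        System.out.println(f.call("u42", "unknown")); // prints "u42"
        System.out.println(f.call(null, "unknown"));  // prints "unknown"
    }
}
```

If this behaves as expected, the function body is probably not the cause, and the remaining suspects are the registration call or the Spark and Scala versions on the classpath.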


Thanks for any reply.