Exception on Avro Schema Object Serialization

Exception on Avro Schema Object Serialization

Artemis User

We tried to standardize our SQL data source management using Avro schemas, but encountered some serialization exceptions when trying to use the data.  The interesting part is that we didn't have any problems reading the Avro schema JSON file, converting the Avro schema into a SQL StructType, and then using it to create a data frame in the subsequent data source load operation.  The problem occurred later, when using the data frame with some lambda functions.
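
A minimal sketch of that workflow, assuming the spark-avro SchemaConverters API (file paths and variable names here are placeholders, not our actual code):

    import org.apache.avro.Schema
    import org.apache.spark.sql.avro.SchemaConverters
    import org.apache.spark.sql.types.StructType

    // Parse the Avro schema JSON file
    val schemaJson = scala.io.Source.fromFile("/path/to/schema.avsc").mkString
    val schemaObj = new Schema.Parser().parse(schemaJson)

    // Convert the Avro schema into a Spark SQL StructType
    val structType = SchemaConverters.toSqlType(schemaObj).dataType.asInstanceOf[StructType]

    // Use the StructType as the schema for the data source load
    val df = spark.read.schema(structType).option("header", "true").csv("/path/to/data.csv")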

I am a little confused: why does the lambda function still complain about the Avro schema record not being serializable even after the data frame is already created?  After the Avro schema is converted to a StructType, which is what the load function of DataFrameReader uses, there shouldn't be any reference to the Avro schema at all.  Any hints and suggestions are highly appreciated.

-- ND

org.apache.spark.SparkException: Task not serializable
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:416)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:406)
  at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:162)
  at org.apache.spark.SparkContext.clean(SparkContext.scala:2362)
  at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$1(RDD.scala:886)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
  at org.apache.spark.rdd.RDD.withScope(RDD.scala:388)
  at org.apache.spark.rdd.RDD.mapPartitionsWithIndex(RDD.scala:885)
  at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:723)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$execute$1(SparkPlan.scala:175)
  at org.apache.spark.sql.execution.SparkPlan.$anonfun$executeQuery$1(SparkPlan.scala:213)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:210)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:171)
  at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:316)
  at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:382)
  at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3627)
  at org.apache.spark.sql.Dataset.$anonfun$collect$1(Dataset.scala:2940)
  at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3618)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:100)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:160)
  at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:87)
  at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:764)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
  at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3616)
  at org.apache.spark.sql.Dataset.collect(Dataset.scala:2940)
  at determineAnomalies(<console>:138)
  ... 60 elided
Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema
Serialization stack:
  - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: {"type":"record","name":"Swat_physical_Feb_2021","namespace":"Swat_physical_Feb_2021","fields":[{"name":"Timestamp","type":"string"},{"name":"FIT101","type":"double"},{"name":"LIT101","type":"double"},{"name":"MV101","type":"int"},{"name":"P101","type":"int"},{"name":"P102","type":"int"},{"name":"AIT201","type":"double"},{"name":"AIT202","type":"double"},{"name":"AIT203","type":"double"},{"name":"FIT201","type":"double"},{"name":"MV201","type":"int"},{"name":"P201","type":"int"},{"name":"P202","type":"int"},{"name":"P203","type":"int"},{"name":"P204","type":"int"},{"name":"P205","type":"int"},{"name":"P206","type":"int"},{"name":"DPIT301","type":"double"},{"name":"FIT301","type":"double"},{"name":"LIT301","type":"double"},{"name":"MV301","type":"int"},{"name":"MV302","type":"int"},{"name":"MV303","type":"int"},{"name":"MV304","type":"int"},{"name":"P301","type":"int"},{"name":"P302","type":"int"},{"name":"AIT401","type":"double"},{"name":"AIT402","type":"double"},{"name":"FIT401","type":"double"},{"name":"LIT401","type":"double"},{"name":"P401","type":"int"},{"name":"P402","type":"int"},{"name":"P403","type":"int"},{"name":"P404","type":"int"},{"name":"UV401","type":"int"},{"name":"AIT501","type":"double"},{"name":"AIT502","type":"double"},{"name":"AIT503","type":"double"},{"name":"AIT504","type":"double"},{"name":"FIT501","type":"double"},{"name":"FIT502","type":"double"},{"name":"FIT503","type":"double"},{"name":"FIT504","type":"double"},{"name":"P501","type":"int"},{"name":"P502","type":"int"},{"name":"PIT501","type":"double"},{"name":"PIT502","type":"double"},{"name":"PIT503","type":"double"},{"name":"FIT601","type":"double"},{"name":"P601","type":"int"},{"name":"P602","type":"int"},{"name":"P603","type":"int"},{"name":"Normal_Attack","type":"string"}]})
  - field (class: $iw, name: schemaObj, type: class org.apache.avro.Schema)
  - object (class $iw, $iw@5d09dadc)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@102d60fd)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@cec4fce)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@493fbce0)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@18a15a09)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@163ad83a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@7b8c10cc)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@4f51c2ed)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@71c5d6e2)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@411c804f)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@23bbcdc4)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@33733309)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@6332bc47)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@60867362)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@37b75cb5)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@421a5931)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@72cda75a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@4538e79a)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@12be42d3)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@7aabcd55)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@387078b)
  - field (class: $iw, name: $iw, type: class $iw)
  - object (class $iw, $iw@58e180ea)
  - field (class: $line153036384055.$read, name: $iw, type: class $iw)
  - object (class $line153036384055.$read, $line153036384055.$read@448f7b5b)
  - field (class: $iw, name: $line153036384055$read, type: class $line153036384055.$read)
  - object (class $iw, $iw@2ef8d075)
  - field (class: $iw, name: $outer, type: class $iw)
  - object (class $iw, $iw@1a3f2f26)
  - element of array (index: 0)
  - array (class [Ljava.lang.Object;, size 2)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class $iw, functionalInterfaceMethod=scala/Function1.apply:(Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic $anonfun$determineAnomalies$1$adapted:(L$iw;Lorg/apache/spark/ml/clustering/KMeansModel;Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, instantiatedMethodType=(Lorg/apache/spark/sql/Row;)Ljava/lang/Object;, numCaptured=2])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class $Lambda$4615/190536174, $Lambda$4615/190536174@130972)
  - element of array (index: 1)
  - array (class [Ljava.lang.Object;, size 2)
  - element of array (index: 1)
  - array (class [Ljava.lang.Object;, size 3)
  - field (class: java.lang.invoke.SerializedLambda, name: capturedArgs, type: class [Ljava.lang.Object;)
  - object (class java.lang.invoke.SerializedLambda, SerializedLambda[capturingClass=class org.apache.spark.sql.execution.WholeStageCodegenExec, functionalInterfaceMethod=scala/Function2.apply:(Ljava/lang/Object;Ljava/lang/Object;)Ljava/lang/Object;, implementation=invokeStatic org/apache/spark/sql/execution/WholeStageCodegenExec.$anonfun$doExecute$4$adapted:(Lorg/apache/spark/sql/catalyst/expressions/codegen/CodeAndComment;[Ljava/lang/Object;Lorg/apache/spark/sql/execution/metric/SQLMetric;Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, instantiatedMethodType=(Ljava/lang/Object;Lscala/collection/Iterator;)Lscala/collection/Iterator;, numCaptured=3])
  - writeReplace data (class: java.lang.invoke.SerializedLambda)
  - object (class org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129, org.apache.spark.sql.execution.WholeStageCodegenExec$$Lambda$2749/1641575129@79194969)
  at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:41)
  at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:47)
  at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:101)
  at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:413)
  ... 87 more

Re: Exception on Avro Schema Object Serialization

srowen
Your function is somehow capturing the actual Avro schema object, which won't serialize. Try rewriting it to ensure that it isn't used in the function.
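
To sketch that rewrite (schemaObj and df are placeholder names taken from the stack trace, and this assumes only the field names are needed inside the closure):

    import scala.collection.JavaConverters._

    // Copy the serializable pieces out of the Avro schema BEFORE defining
    // the closure, so the closure captures only an Array[String], not the
    // non-serializable org.apache.avro.Schema itself.
    val fieldNames: Array[String] = schemaObj.getFields.asScala.map(_.name).toArray

    // The lambda now references only fieldNames, which serializes fine.
    val rendered = df.rdd.map { row =>
      fieldNames.map(n => s"$n=${row.getAs[Any](n)}").mkString(",")
    }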


Re: Exception on Avro Schema Object Serialization

Artemis User

Thanks Sean.  But the real issue is that, on the surface, my UDF should have no knowledge of the Avro schema at all!  Here are the high-level steps of what happened:

  1. Read the JSON file to create an Avro Schema object -- OK
  2. Convert the Avro schema to a StructType using Spark's SchemaConverters -- OK
  3. Create a data frame by using the StructType as the schema when loading a CSV file -- OK
  4. Do other SQL operations with the data frame -- OK
  5. Use the data frame in a UDF -- exception occurred.

We didn't have any exceptions when we created the StructType manually in step 2.  So the question is: would the data frame, when created using an Avro-converted StructType, still keep a reference to the original Avro schema object?
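
Looking more closely at the serialization stack, it may not be the data frame at all: the stack shows the lambda capturing a chain of $iw wrapper objects (spark-shell wraps every REPL line in these), and one of those wrappers holds the val schemaObj of type org.apache.avro.Schema.  So the closure drags the schema along even though the UDF never references it.  Two possible workarounds in the shell, sketched under that assumption (schemaJson stands in for the schema JSON string):

    // Option 1: mark the REPL-level reference transient so it is skipped
    // when the enclosing wrapper object is serialized (safe here because
    // the closure never actually uses it)
    @transient val schemaObj = new org.apache.avro.Schema.Parser().parse(schemaJson)

    // Option 2: convert to StructType immediately and keep no reference
    // to the Avro Schema in REPL scope at all
    val structType = org.apache.spark.sql.avro.SchemaConverters
      .toSqlType(new org.apache.avro.Schema.Parser().parse(schemaJson))
      .dataType.asInstanceOf[org.apache.spark.sql.types.StructType]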

-- ND

On 2/2/21 3:36 PM, Sean Owen wrote:
Your function is somehow capturing the actual Avro schema object, which won't serialize. Try rewriting it to ensure that it isn't used in the function.
