java.io.NotSerializableException: org.apache.spark.sql.TypedColumn

zzcclp
Hi dev:
  I am using spark-shell to run the example from the section
'http://spark.apache.org/docs/2.2.2/sql-programming-guide.html#type-safe-user-defined-aggregate-functions',
and I get the following error:

*Caused by: java.io.NotSerializableException:
org.apache.spark.sql.TypedColumn
Serialization stack:
        - object not serializable (class: org.apache.spark.sql.TypedColumn, value:
myaverage() AS `average_salary`)
        - field (class: $iw, name: averageSalary, type: class
org.apache.spark.sql.TypedColumn)
        - object (class $iw, $iw@4b2f8ae9)
        - field (class: MyAverage$, name: $outer, type: class $iw)
        - object (class MyAverage$, MyAverage$@2be41d90)
        - field (class:
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
name: aggregator, type: class org.apache.spark.sql.expressions.Aggregator)
        - object (class
org.apache.spark.sql.execution.aggregate.ComplexTypedAggregateExpression,
MyAverage(Employee))
        - field (class:
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
name: aggregateFunction, type: class
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateFunction)
        - object (class
org.apache.spark.sql.catalyst.expressions.aggregate.AggregateExpression,
partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class Employee)),
Some(class Employee), Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0))
        - writeObject data (class:
scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@5e92c46f)
        - writeReplace data (class:
scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon,
List(partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
Employee)), Some(class Employee),
Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0)))
        - field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec, name:
aggregateExpressions, type: interface scala.collection.Seq)
        - object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec,
ObjectHashAggregate(keys=[],
functions=[partial_myaverage(MyAverage$@2be41d90, Some(newInstance(class
Employee)), Some(class Employee),
Some(StructType(StructField(name,StringType,true),
StructField(salary,LongType,false))), assertnotnull(assertnotnull(input[0,
Average, true])).sum AS sum#25L, assertnotnull(assertnotnull(input[0,
Average, true])).count AS count#26L, newInstance(class Average), input[0,
double, false] AS value#24, DoubleType, false, 0, 0)], output=[buf#37])
+- *FileScan json [name#8,salary#9L] Batched: false, Format: JSON, Location:
InMemoryFileIndex[file:/opt/spark2/examples/src/main/resources/employees.json],
PartitionFilters: [], PushedFilters: [], ReadSchema:
struct<name:string,salary:bigint>
)
        - field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
name: $outer, type: class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec)
        - object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1,
<function0>)
        - field (class:
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
name: $outer, type: class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1)
        - object (class
org.apache.spark.sql.execution.aggregate.ObjectHashAggregateExec$$anonfun$doExecute$1$$anonfun$2,
<function1>)
        - field (class: org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
name: f$23, type: interface scala.Function1)
        - object (class org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1,
<function0>)
        - field (class:
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
name: $outer, type: class
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1)
        - object (class
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25,
<function3>)
        - field (class: org.apache.spark.rdd.MapPartitionsRDD, name: f, type:
interface scala.Function3)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[9]
at show at <console>:62)
        - field (class: org.apache.spark.NarrowDependency, name: _rdd, type: class
org.apache.spark.rdd.RDD)
        - object (class org.apache.spark.OneToOneDependency,
org.apache.spark.OneToOneDependency@5bb7895)
        - writeObject data (class:
scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.List$SerializationProxy,
scala.collection.immutable.List$SerializationProxy@6e81dca3)
        - writeReplace data (class:
scala.collection.immutable.List$SerializationProxy)
        - object (class scala.collection.immutable.$colon$colon,
List(org.apache.spark.OneToOneDependency@5bb7895))
        - field (class: org.apache.spark.rdd.RDD, name:
org$apache$spark$rdd$RDD$$dependencies_, type: interface
scala.collection.Seq)
        - object (class org.apache.spark.rdd.MapPartitionsRDD, MapPartitionsRDD[10]
at show at <console>:62)
        - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
        - object (class scala.Tuple2, (MapPartitionsRDD[10] at show at
<console>:62,org.apache.spark.ShuffleDependency@421cd28))
  at
org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
*
  But if I run the example directly from IntelliJ IDEA, it works. What is the
difference between the two? How can I run the example successfully in spark-shell?

  Thanks.
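
  A note on reading the trace: the top of the serialization stack shows that
MyAverage$ has a field named $outer of type $iw (the wrapper class the shell
generates around each line), and that $iw has a field averageSalary of type
TypedColumn, which is not serializable. So serializing the aggregator drags in
the shell wrapper and, with it, the column. A top-level object in a compiled
application has no $outer field, which would be consistent with the example
working from the IDE. The sketch below reproduces the same kind of capture with
plain java.io serialization and no Spark at all. Every name in it
(ReplCaptureSketch, FakeTypedColumn, ReplWrapper, CapturingAggregator,
StandaloneAggregator, trySerialize) is hypothetical and only stands in for the
classes named in the trace; it illustrates the mechanism, not Spark's own code.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ReplCaptureSketch {

    // Hypothetical stand-in for TypedColumn: deliberately NOT Serializable.
    static class FakeTypedColumn {}

    // Hypothetical stand-in for the shell's line wrapper ($iw in the trace).
    static class ReplWrapper implements Serializable {
        // Plays the role of the `averageSalary` field seen in the trace.
        Object averageSalary = new FakeTypedColumn();

        // Non-static inner class: each instance keeps a hidden reference to
        // its enclosing ReplWrapper, much like the $outer field in the trace.
        class CapturingAggregator implements Serializable {
            Object column() {
                return averageSalary; // uses the enclosing instance
            }
        }

        CapturingAggregator newAggregator() {
            return new CapturingAggregator();
        }
    }

    // Static nested class: no reference to any enclosing instance, comparable
    // to a top-level object in a compiled application.
    static class StandaloneAggregator implements Serializable {}

    // Attempts Java serialization and reports the outcome as a string.
    public static String trySerialize(Object o) {
        try (ObjectOutputStream out =
                 new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return "ok";
        } catch (NotSerializableException e) {
            return "NotSerializableException: " + e.getMessage();
        } catch (IOException e) {
            return "IOException: " + e;
        }
    }

    public static void main(String[] args) {
        // Fails: the aggregator pulls in the wrapper, which holds the column.
        System.out.println(trySerialize(new ReplWrapper().newAggregator()));
        // Succeeds: nothing non-serializable is reachable.
        System.out.println(trySerialize(new StandaloneAggregator()));
    }
}
```

  If the same mechanism is at work in spark-shell, one thing to try (an
untested assumption, not a confirmed fix) would be to avoid keeping the
TypedColumn in a shell-level val, for example by building the column inline in
the select call.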


