StackOverflowError for simple map

Chris Olivier
Hi, I have been stuck on this for a week.

I have a relatively simple DataFrame like this:

+---------+---------+--------------------+-------------------+
|     item|  item_id|              target|              start|
+---------+---------+--------------------+-------------------+
|sensor123|sensor123|[0.005683, 0.0070...|2008-01-01 00:00:00|
|sensor249|sensor249|[0.009783, 0.0068...|2008-01-01 00:00:00|
|sensor379|sensor379|[0.001917, 0.0016...|2008-01-01 00:00:00|
| sensor52| sensor52|[0.016267, 0.0121...|2008-01-01 00:00:00|
+---------+---------+--------------------+-------------------+

`target` is a `WrappedArray[Double]`.

This simple code runs fine on local Spark but throws a StackOverflowError on EMR Spark. I've tried playing with partitioning (sketched below), with no effect.
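
For reference, the partitioning experiments were along these lines; the partition counts here are illustrative, not the exact values from my runs:

```scala
// Hypothetical partitioning attempts -- the exact counts varied between runs.
val fewer = dataset.coalesce(8)       // shrink the partition count without a shuffle
val more  = dataset.repartition(200)  // force a full shuffle into more partitions
```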

```scala
import java.sql.Timestamp

import scala.collection.mutable

import org.apache.spark.sql.{DataFrame, Dataset}
import org.apache.spark.sql.types._

// SparkJavaUtils and DataSources are project-local helpers.
def transform(dataset: Dataset[_]): DataFrame = {
  val df: DataFrame = dataset.toDF

  val sparkSession = dataset.sparkSession
  import sparkSession.implicits._

  // Resolve the positions of the columns we need.
  val itemIndex: Int = SparkJavaUtils.getColumnIndex(DataSources.FIELD_ITEM, df)
  val startIndex: Int = SparkJavaUtils.getColumnIndex(DataSources.FIELD_START, df)
  val targetIndex: Int = SparkJavaUtils.getColumnIndex(DataSources.FIELD_TARGET, df)

  // Pull each row apart into an (item, start, target) tuple.
  val result = df.map { r =>
    val itemName = r.getAs[String](itemIndex)
    val start = r.getAs[Timestamp](startIndex)
    val targetArray = r.getAs[mutable.WrappedArray[Double]](targetIndex)
    (itemName, start, targetArray)
  }
  result.show()

  // Rebuild the frame under the original column names.
  val schema = StructType(Seq(
    StructField(DataSources.FIELD_ITEM, StringType, true, Metadata.empty),
    StructField(DataSources.FIELD_START, TimestampType, true, Metadata.empty),
    StructField(DataSources.FIELD_TARGET, DataTypes.createArrayType(DoubleType), true, Metadata.empty)
  ))

  val nn = dataset.sqlContext.createDataFrame(result.toDF.rdd, schema)
  nn.show()
  nn
}
```
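
For what it's worth, I expected the map-plus-createDataFrame round trip above to behave like a plain select with casts, roughly like this (an untested sketch reusing the same field constants):

```scala
import org.apache.spark.sql.functions.col

// Sketch: the same projection without a detour through an RDD.
val direct = dataset.toDF.select(
  col(DataSources.FIELD_ITEM).cast(StringType),
  col(DataSources.FIELD_START).cast(TimestampType),
  col(DataSources.FIELD_TARGET).cast(DataTypes.createArrayType(DoubleType))
)
```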

The error looks like this:

```
Exception in thread "main" java.lang.StackOverflowError
	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:65)
	at scala.StringContext.standardInterpolator(StringContext.scala:123)
	at scala.StringContext.s(StringContext.scala:95)
	at org.apache.spark.sql.catalyst.expressions.codegen.CodegenContext.freshName(CodeGenerator.scala:565)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:105)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:153)
	at org.apache.spark.sql.execution.CodegenSupport$$anonfun$4.apply(WholeStageCodegenExec.scala:152)
	at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
	at scala.collection.immutable.Stream$$anonfun$map$1.apply(Stream.scala:418)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1233)
	at scala.collection.immutable.Stream$Cons.tail(Stream.scala:1223)
	at scala.collection.immutable.Stream.drop(Stream.scala:858)
	at scala.collection.immutable.Stream.drop(Stream.scala:202)
	at scala.collection.LinearSeqOptimized$class.apply(LinearSeqOptimized.scala:64)
	at scala.collection.immutable.Stream.apply(Stream.scala:202)
	at org.apache.spark.sql.catalyst.expressions.BoundReference.doGenCode(BoundAttribute.scala:62)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:107)
	at org.apache.spark.sql.catalyst.expressions.Expression$$anonfun$genCode$2.apply(Expression.scala:104)
	at scala.Option.getOrElse(Option.scala:121)
	at org.apache.spark.sql.catalyst.expressions.Expression.genCode(Expression.scala:104)
	... (the genCode -> doGenCode cycle above repeats until the stack overflows)
```
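
The recursion is clearly inside whole-stage code generation (CodegenContext / WholeStageCodegenExec), so one experiment I'm considering is disabling whole-stage codegen for this job, just to see whether the generated code is the problem (assuming a `sparkSession` handle in scope):

```scala
// Diagnostic only, not a fix: turn off whole-stage codegen and rerun.
sparkSession.conf.set("spark.sql.codegen.wholeStage", "false")
```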

-Chris