Current way of using functions.window with Java

Anton Puzanov
The code looks like this:

Column slidingWindow = functions.window(myDF.col("timestamp"), "24 hours", "1 seconds");
Dataset<Row> finalRes = myDF.groupBy(slidingWindow, myDF.col("user")).agg(functions.collect_set("purchase").as("purchases"));

As you can see, in this use case I have a small slide interval and a large window.
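To quantify that, here is a back-of-the-envelope calculation (my own, outside Spark) of how many overlapping windows each row falls into, assuming the assignment is roughly ceil(windowDuration / slideDuration):

```java
public class WindowOverlap {
    // Estimated number of overlapping sliding windows each row is
    // assigned to: ceil(windowDuration / slideDuration).
    static long windowsPerRow(long windowMs, long slideMs) {
        return (windowMs + slideMs - 1) / slideMs;
    }

    public static void main(String[] args) {
        long w24h = 24L * 60 * 60 * 1000; // "24 hours"
        long s1s  = 1000;                 // "1 seconds"
        System.out.println(windowsPerRow(w24h, s1s)); // 86400 windows per row

        long w3m = 3L * 60 * 1000;        // "3 minutes" (the small test below)
        System.out.println(windowsPerRow(w3m, s1s));  // 180 windows per row
    }
}
```

So with a 24-hour window and a 1-second slide, every input row would be expanded 86,400 times, which (if my understanding is right) is what puts pressure on code generation and memory.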
Code of the same flavor caused the following error (which, as I understand it, is related to Spark's Java code generation):
Caused by: org.spark_project.guava.util.concurrent.ExecutionError: java.lang.OutOfMemoryError: Java heap space
    at org.spark_project.guava.cache.LocalCache$Segment.get(LocalCache.java:2261)
    at org.spark_project.guava.cache.LocalCache.get(LocalCache.java:4000)
    at org.spark_project.guava.cache.LocalCache.getOrLoad(LocalCache.java:4004)
    at org.spark_project.guava.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:4874)
    at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.compile(CodeGenerator.scala:890)
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:357)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange.prepareShuffleDependency(ShuffleExchange.scala:85)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:121)
    at org.apache.spark.sql.execution.exchange.ShuffleExchange$$anonfun$doExecute$1.apply(ShuffleExchange.scala:112)
    at org.apache.spark.sql.catalyst.errors.package$.attachTree(package.scala:52)
    ... 77 more
Caused by: java.lang.OutOfMemoryError: Java heap space
    at java.util.HashMap.resize(HashMap.java:703)
    at java.util.HashMap.putVal(HashMap.java:628)
    at java.util.HashMap.putMapEntries(HashMap.java:514)
    at java.util.HashMap.putAll(HashMap.java:784)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3073)
    at org.codehaus.janino.UnitCompiler.access$4900(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2958)
    at org.codehaus.janino.UnitCompiler$8.visitLocalVariableDeclarationStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$LocalVariableDeclarationStatement.accept(Java.java:2974)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:3033)
    at org.codehaus.janino.UnitCompiler.access$4400(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2950)
    at org.codehaus.janino.UnitCompiler$8.visitSwitchStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$SwitchStatement.accept(Java.java:2866)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
    at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$Block.accept(Java.java:2471)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2999)
    at org.codehaus.janino.UnitCompiler.access$4000(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2946)
    at org.codehaus.janino.UnitCompiler$8.visitForStatement(UnitCompiler.java:2926)
    at org.codehaus.janino.Java$ForStatement.accept(Java.java:2660)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2925)
    at org.codehaus.janino.UnitCompiler.buildLocalVariableMap(UnitCompiler.java:2982)
    at org.codehaus.janino.UnitCompiler.access$3800(UnitCompiler.java:206)
    at org.codehaus.janino.UnitCompiler$8.visitBlock(UnitCompiler.java:2944)

When I run the code with really small data and window = 3 minutes, step = 1 second, I get a different error. The generated code contains tens of thousands of lines like:
/* 30761 */ if (!expand_isNull6254) {
/* 30762 */ expand_isNull6253 = false; // resultCode could change nullability.
/* 30763 */ expand_value6253 = expand_value6254 * 1000000L;
followed by the error:
org.codehaus.janino.JaninoRuntimeException: Code of method "processNext()V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator" grows beyond 64 KB
I also see hundreds of "Code generated in 4.979587 ms" messages, and the computation takes quite a long time even though the dataset has only 10 rows.
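To convince myself where the blow-up comes from, I sketched the per-row window assignment that I believe the generated code is unrolling (this is my own approximation, not Spark source): one output copy per window whose half-open interval [start, start + window) contains the timestamp.

```java
import java.util.ArrayList;
import java.util.List;

public class WindowAssign {
    // My approximation of sliding-window assignment: return the start of
    // every window [start, start + windowMs) that contains ts, stepping
    // back from the latest slide boundary at or before ts.
    static List<Long> windowStarts(long ts, long windowMs, long slideMs) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - (ts % slideMs); // latest window containing ts
        for (long start = lastStart; start > ts - windowMs; start -= slideMs) {
            starts.add(start);
        }
        return starts;
    }

    public static void main(String[] args) {
        // Toy case: window = 10 s, slide = 2 s -> each row lands in 5 windows.
        System.out.println(windowStarts(10_000, 10_000, 2_000)); // [10000, 8000, 6000, 4000, 2000]
        // My case: window = 3 min, slide = 1 s -> 180 windows per row,
        // which would explain the tens of thousands of generated lines.
        System.out.println(windowStarts(0, 3L * 60 * 1000, 1000).size()); // 180
    }
}
```

If each of those window memberships becomes inlined statements in processNext(), that would match both the 64 KB method-size error and the huge expand_* listings above, but I may be misreading the generated code.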


Am I doing something wrong? Is it a bug? What is the right way to use this function?
Please relate your answers to Java.