[Spark SQL & Core]: RDD to Dataset with 1500 columns via createDataFrame() throws a "grows beyond 64 KB" exception


elevy
Hello all,
I am using the Spark 2.1.0 release, and I am trying to load a BigTable CSV file with more than 1500 columns into our system.

Our flow (a minimal sketch is shown right after this list):

•   First, read the data as an RDD.
•   Generate a continuous record id using zipWithIndex().
    (This operation exists only in the RDD API;
     the Dataset API has a similar option, monotonically_increasing_id(),
     but it works per partition and produces ids that are not sequential, which is not what we need.)
•   Convert the RDD to a Dataset using createDataFrame() on the SparkSession.
•   This last operation generates code that exceeds the JVM method size limit of 64 KB.
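For illustration, here is a minimal sketch of that flow (not our production code). The class name, method name, file path, and the "record_id"/"c0..cN" column names are made up for the example, and it assumes a comma-separated file with no header where every line has fieldCount values:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.stream.IntStream;
import java.util.stream.Stream;

public class SequentialIdFlow {

    public static Dataset<Row> loadWithSequentialId(SparkSession spark, String path, int fieldCount) {
        JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

        // 1. Read the raw CSV lines as an RDD.
        JavaRDD<String> lines = jsc.textFile(path);

        // 2. zipWithIndex() assigns a contiguous, sequential index to every record.
        JavaRDD<Row> rows = lines.zipWithIndex().map(tuple -> {
            String[] fields = tuple._1().split(",", -1);
            Object[] values = new Object[fields.length + 1];
            values[0] = tuple._2();                          // sequential record id
            System.arraycopy(fields, 0, values, 1, fields.length);
            return RowFactory.create(values);
        });

        // 3. Build a schema: the id column plus the original (string) columns.
        StructField[] structFields = Stream.concat(
                Stream.of(new StructField("record_id", DataTypes.LongType, false, Metadata.empty())),
                IntStream.range(0, fieldCount)
                        .mapToObj(i -> new StructField("c" + i, DataTypes.StringType, true, Metadata.empty())))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        // 4. Converting the wide RDD to a Dataset -- this is where the 64 KB codegen error appears.
        return spark.createDataFrame(rows, schema);
    }
}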

I searched the web for a solution, or even a similar use case,
and found a few issues that mention the 64 KB error, but all of those cases dealt with around 100 columns and were solved in Spark 2.1.0 by shrinking the generated code;
none of them reached the JVM limitation.

Any idea from this forum of experts would be very much appreciated.
There are two kinds of solution we are looking for:
1. How can I overcome the size of the generated code?
OR
2. How can I generate a sequential ID directly on the Dataset?

Our temporary solution (sketched below):

•   read the data as an RDD
•   generate the sequential id
•   write the new data as a text file
•   read that data back as a Dataset
This solution costs us about 30% in performance. :(
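A rough sketch of that workaround, with hypothetical paths and variable names, could look like this:

// Hypothetical sketch of the temporary workaround: attach the id at the RDD level,
// persist the data as text, and re-read it through the DataFrame reader.
JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());

// 1. Read the raw lines and prepend a sequential id via zipWithIndex().
JavaRDD<String> withId = jsc.textFile("/data/bigtable.csv")
        .zipWithIndex()
        .map(t -> t._2() + "," + t._1());

// 2. Write the id-enriched data back out as plain text.
withId.saveAsTextFile("/data/bigtable_with_id");

// 3. Re-read it as a Dataset through the CSV reader instead of calling
//    createDataFrame() on the wide RDD (this round trip is where the ~30% degradation comes from).
Dataset<Row> dataset = sparkSession.read().csv("/data/bigtable_with_id");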

Code that reproduces the issue:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession , int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        String[] row = IntStream.range(startColName, fieldNumber).mapToObj(String::valueOf).toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i), DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

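        // This call triggers code generation for the UnsafeProjection; with 1900 string
        // columns the generated method exceeds the JVM's 64 KB limit (see the exception below).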
        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}


The exception we are getting:

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 39 common frames omitted
Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)
Re: [Spark SQL & Core]: RDD to Dataset with 1500 columns via createDataFrame() throws a "grows beyond 64 KB" exception

Mihal M
There are plenty of similar issues - https://issues.apache.org/jira/browse/SPARK-16845

You might try the nightly builds of the 2.0.3 or 2.1.1 versions to see whether it's fixed already.

In my case I'm getting

Code of method "(Lorg/apache/spark/sql/catalyst/InternalRow;)Z" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificPredicate" grows beyond 64 KB

on a dataset with 2000 columns, so it seems it's still not fixed.

The only workaround I've found is to use RDDs instead of DataFrames.

Regards