spark-user mailing list archives

From elevy <elev...@gmail.com>
Subject [Spark SQL & Core]: RDD to Dataset 1500 columns data with createDataFrame() throw exception of grows beyond 64 KB
Date Sat, 18 Mar 2017 08:13:27 GMT
Hello all,
I am using the Spark 2.1.0 release.
I am trying to load a BigTable CSV file with more than 1500 columns into our
system.

Our flow for doing that is:

•   First, read the data as an RDD.
•   Generate a consecutive record ID using zipWithIndex()
    (this operation exists only in the RDD API;
     the Dataset API has a similar option, monotonically_increasing_id(),
     but that method works per partition and creates IDs that are not
     sequential, which is not what we need ☹).
•   Convert the RDD to a Dataset using createDataFrame() of the
    SparkSession.
•   This last operation generates code that exceeds the JVM's 64 KB
    limit on the size of a single method.
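
To illustrate why monotonically_increasing_id() does not give consecutive IDs: the Spark documentation describes the current implementation as putting the partition index in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below only mimics that documented layout in plain Java and does not call Spark; the class and method names are made up for the illustration.

```java
// Illustrative only: mimics the documented bit layout of
// monotonically_increasing_id(); it does not call Spark.
// MonotonicIdSketch and monotonicId are hypothetical names.
final class MonotonicIdSketch {

    // Upper 31 bits: partition index. Lower 33 bits: record number
    // within that partition.
    static long monotonicId(int partitionIndex, long recordNumber) {
        return ((long) partitionIndex << 33) + recordNumber;
    }

    public static void main(String[] args) {
        // Two partitions with three records each.
        for (int p = 0; p < 2; p++) {
            for (long r = 0; r < 3; r++) {
                System.out.println("partition " + p + ", record " + r
                        + " -> id " + monotonicId(p, r));
            }
        }
    }
}
```

With two partitions of three records each, the first partition gets IDs 0, 1, 2 and the second starts at 8589934592 (1L << 33), so the IDs are unique and increasing but leave a large gap at every partition boundary.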

I searched the web for a solution or even a similar use case.
I found a few issues that discussed the 64 KB error, but all of those cases
dealt with about 100 columns and were solved in Spark 2.1.0 by shrinking the
generated code;
none of them reached the JVM limit itself.
 
*Any ideas from the experts on this forum would be much appreciated.*
There are two types of solution we are looking for:
*1.* How can I work around the size of the generated code?
*OR*
*2.* How can I generate a sequential ID directly on the Dataset?
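
For option 2, the idea behind zipWithIndex() can be sketched without Spark: count the rows in each partition, turn the counts into start offsets with a running sum, and give each row its partition offset plus its position inside the partition. In the sketch below, plain Java lists stand in for partitions, and all class and method names are hypothetical. Whether the same idea can be applied to a 1500-column Dataset without hitting the code generation limit is an open assumption, not something verified here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Illustrative only: plain lists stand in for Spark partitions.
// SequentialIdSketch and its methods are hypothetical names.
final class SequentialIdSketch {

    // Start offset of each partition = number of rows in all earlier partitions.
    static long[] partitionOffsets(List<List<String>> partitions) {
        long[] offsets = new long[partitions.size()];
        long running = 0;
        for (int p = 0; p < partitions.size(); p++) {
            offsets[p] = running;
            running += partitions.get(p).size();
        }
        return offsets;
    }

    // Sequential ID = partition offset + position of the row in its partition.
    static List<Long> assignIds(List<List<String>> partitions) {
        long[] offsets = partitionOffsets(partitions);
        List<Long> ids = new ArrayList<>();
        for (int p = 0; p < partitions.size(); p++) {
            for (int i = 0; i < partitions.get(p).size(); i++) {
                ids.add(offsets[p] + i);
            }
        }
        return ids;
    }

    public static void main(String[] args) {
        List<List<String>> partitions = Arrays.asList(
                Arrays.asList("a", "b", "c"),
                Arrays.asList("d"),
                Arrays.asList("e", "f"));
        System.out.println(assignIds(partitions)); // prints [0, 1, 2, 3, 4, 5]
    }
}
```

The cost of this approach is an extra pass over the data to obtain the per-partition counts before any ID can be assigned.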

Our temporary solution:

•   Read the DS as an RDD.
•   Generate the sequential ID.
•   Write the new data as a text file.
•   Read the data back as a Dataset.
This solution costs us a 30% performance degradation :(

*Code that reproduces the issue*

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.*;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.Metadata;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;
import poc.commons.SparkSessionInitializer;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.stream.IntStream;

public class RDDConverter {
    private static final int FIELD_COUNT = 1900;

    private Dataset<Row> createBigSchema(SparkSession sparkSession,
                                         int startColName, int fieldNumber) {
        JavaSparkContext jsc = new JavaSparkContext(sparkSession.sparkContext());
        SQLContext sqlContext = new SQLContext(sparkSession.sparkContext());

        // A single row whose values are the column indices as strings.
        String[] row = IntStream.range(startColName, fieldNumber)
                .mapToObj(String::valueOf)
                .toArray(String[]::new);
        List<String[]> data = Collections.singletonList(row);
        JavaRDD<Row> rdd = jsc.parallelize(data).map(RowFactory::create);

        // One nullable string column per index, named after the index.
        StructField[] structFields = IntStream.range(startColName, fieldNumber)
                .mapToObj(i -> new StructField(String.valueOf(i),
                        DataTypes.StringType, true, Metadata.empty()))
                .toArray(StructField[]::new);
        StructType schema = DataTypes.createStructType(structFields);

        Dataset<Row> dataSet = sqlContext.createDataFrame(rdd, schema);
        dataSet.show();
        return dataSet;
    }

    public static void main(String[] args) {
        SparkSessionInitializer sparkSessionInitializer = new SparkSessionInitializer();
        SparkSession sparkSession = sparkSessionInitializer.init();

        RDDConverter rddConverter = new RDDConverter();
        rddConverter.createBigSchema(sparkSession, 0, FIELD_COUNT);
    }
}

The exception we are getting:

org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$.org$apache$spark$sql$catalyst$expressions$codegen$CodeGenerator$$doCompile(CodeGenerator.scala:893)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:950)
at org.apache.spark.sql.catalyst.expressions.codegen.CodeGenerator$$anon$1.load(CodeGenerator.scala:947)
at org.spark_project.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
at org.spark_project.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
... 39 common frames omitted
*Caused by: org.codehaus.janino.JaninoRuntimeException: Code of method "(Lorg/apache/spark/sql/catalyst/expressions/GeneratedClass;[Ljava/lang/Object;)V" of class "org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection" grows beyond 64 KB*
at org.codehaus.janino.CodeContext.makeSpace(CodeContext.java:941)




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Spark-SQL-Core-RDD-to-Dataset-1500-columns-data-with-createDataFrame-throw-exception-of-grows-beyondB-tp28509.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
