spark-user mailing list archives

From Han-Cheol Cho <>
Subject A question about inconsistency during dataframe creation with RDD/dict in PySpark
Date Wed, 01 Feb 2017 10:25:07 GMT
Dear spark user ml members,

I have quite messy input data, so it is difficult to load it directly as a DataFrame.
What I did is to load it as an RDD of strings first, convert it to an RDD of
pyspark.sql.Row objects, and then use the toDF method as below:
    mydf = rdd.toDF()  # rdd: the RDD of Row objects described above
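To be concrete, the per-line conversion has roughly the following shape (a stand-alone sketch without Spark; `parse_line` is a hypothetical placeholder, not my real parsing logic):

```python
# Stand-alone sketch of the per-line conversion, without Spark.
# parse_line is a hypothetical placeholder for the real parsing logic.
def parse_line(line):
    # e.g. "k1=a;k2=b" -> {"k1": "a", "k2": "b"}
    return dict(pair.split("=", 1) for pair in line.split(";"))

lines = ["k1=a;k2=b", "k1=c"]            # note: the second line has no k2
parsed = [parse_line(l) for l in lines]  # what rdd.map(parse_line) would yield
# parsed == [{'k1': 'a', 'k2': 'b'}, {'k1': 'c'}]
```

In Spark this would continue with something like rdd.map(lambda l: Row(**parse_line(l))).toDF(); the point is that rows built this way can end up with different field sets.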

I didn't expect any problem from this very simple code at first.

However, when I tested it with more data, I found that this approach fails with the
following exception:
    java.lang.IllegalStateException: Input row doesn't have expected number of values required
by the schema. 10 fields are required while 9 values are provided.                       
            at org.apache.spark.sql.execution.python.EvaluatePython$.fromJava(EvaluatePython.scala:147)
            at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
            at org.apache.spark.sql.SparkSession$$anonfun$7.apply(SparkSession.scala:665)
            at scala.collection.Iterator$$anon$   

This exception comes from the fact that some Row objects in the RDD have missing fields.
For example, the following code fails with the same exception:
    d1 = [Row(k1="value1.1", k2="value1.2")]
    d2 = [Row(k1="value2.1")]

    rdd1 = spark.sparkContext.parallelize(d1)
    rdd2 = spark.sparkContext.parallelize(d2)

    urdd = rdd1.union(rdd2)
    urdd.collect()
        [Row(k1='value1.1', k2='value1.2'), Row(k1='value2.1')]

    mydf = urdd.toDF()
    mydf
        DataFrame[k1: string, k2: string]
    mydf.show()
        --> fails with the same exception

While digging into the code, I found that a Row object raises an exception if
it does not have a given key, as follows:
    # spark/python/pyspark/sql/
    def _verify_type(obj, dataType, nullable=True):
        ...
        elif isinstance(dataType, StructType):
            ...
            elif isinstance(obj, Row) and getattr(obj, "__from_dict__", False):
                # the order in obj could be different than dataType.fields
                for f in dataType.fields:
                    _verify_type(obj[f.name], f.dataType, f.nullable)
                    # --> obj[f.name] raises a ValueError(item) exception
                    #     if the key does not exist
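The lookup itself can be reproduced without Spark. Below is a minimal stand-in for Row's item access (not the real class, just the relevant behavior), showing why a missing field name turns into ValueError(item):

```python
# Minimal stand-in for pyspark.sql.Row's item access: values in a tuple,
# field names alongside; looking up an unknown field name raises
# ValueError(item), which is what _verify_type hits above.
class MiniRow:
    def __init__(self, **kwargs):
        self._fields = list(kwargs)
        self._values = tuple(kwargs.values())

    def __getitem__(self, item):
        if item not in self._fields:
            raise ValueError(item)
        return self._values[self._fields.index(item)]

row = MiniRow(k1="value2.1")
print(row["k1"])   # value2.1
try:
    row["k2"]      # this field is missing
except ValueError as e:
    print("missing field:", e)   # missing field: k2
```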

I think that raising an exception in this situation is a reasonable approach.
However, if I use an RDD of dict objects instead of Row objects, the conversion
succeeds, as follows, by filling missing columns with null values:
    dict1 = [{"k1":"v1.1", "k2":"v1.2"}]
    dict2 = [{"k1":"v2.1"}]
    rdd1 = spark.sparkContext.parallelize(dict1)
    rdd2 = spark.sparkContext.parallelize(dict2)
    rdd1.collect()
        [{'k2': 'v1.2', 'k1': 'v1.1'}]
    rdd2.collect()
        [{'k1': 'v2.1'}]

    urdd = rdd1.union(rdd2)
    urdd.collect()
        [{'k2': 'v1.2', 'k1': 'v1.1'}, {'k1': 'v2.1'}]

    mydf = urdd.toDF()
    mydf.show()
        +----+----+
        |  k1|  k2|
        +----+----+
        |v1.1|v1.2|
        |v2.1|null|
        +----+----+
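In effect, the dict path projects each record onto the inferred set of columns with dict.get, so a missing key simply becomes None (null). A sketch of that effective behavior, not Spark's actual inference code:

```python
# Effective behavior of the dict-based conversion: project every record
# onto the union of observed keys, defaulting missing ones to None.
records = [{"k1": "v1.1", "k2": "v1.2"}, {"k1": "v2.1"}]

columns = sorted({k for rec in records for k in rec})   # ['k1', 'k2']
table = [tuple(rec.get(col) for col in columns) for rec in records]
# table == [('v1.1', 'v1.2'), ('v2.1', None)]
```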

I wonder whether this difference is an expected result or not.

Best wishes,

 Han-Cheol Cho
 Data Laboratory / Data Scientist
 Shinjuku Eastside Square 13F, 6-27-30 Shinjuku, Shinjuku-ku, Tokyo 160-0022, Japan
