spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Daniel Williams (JIRA)" <j...@apache.org>
Subject [jira] [Created] (SPARK-17670) Spark DataFrame/Dataset no longer supports Option[Map] in case classes
Date Mon, 26 Sep 2016 20:37:20 GMT
Daniel Williams created SPARK-17670:
---------------------------------------

             Summary: Spark DataFrame/Dataset no longer supports Option[Map] in case classes
                 Key: SPARK-17670
                 URL: https://issues.apache.org/jira/browse/SPARK-17670
             Project: Spark
          Issue Type: Bug
          Components: SQL
    Affects Versions: 2.0.0
            Reporter: Daniel Williams


Upon upgrading to Spark 2.0 I discovered that previously supported case classes containing
members of the type Option[Map] of any key/value binding, mutable or immutable, were no longer
supported and produced an exception similar to the following.  Upon further testing I also
noticed that Option was still supported for Seq, case classes, and primitives.  Unit tests
validating this behavior, written using spark-testing-base, are included below.

{code}
org.apache.spark.sql.AnalysisException: cannot resolve 'wrapoption(staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$,
ObjectType(interface scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32,
MapObjects_loopIsNull33, StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33,
StringType).toString, cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31,
StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true),
StructField(input,MapType(StringType,StringType,true),true)).input as map<string,string>).keyArray).array,
mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType, lambdavariable(MapObjects_loopValue34,
MapObjects_loopIsNull35, StringType).toString, cast(lambdavariable(MapObjects_loopValue30,
MapObjects_loopIsNull31, StructField(uuid,StringType,true), StructField(timestamp,TimestampType,true),
StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input
as map<string,string>).valueArray).array, true), ObjectType(interface scala.collection.immutable.Map))'
due to data type mismatch: argument 1 requires scala.collection.immutable.Map type, however,
'staticinvoke(class org.apache.spark.sql.catalyst.util.ArrayBasedMapData$, ObjectType(interface
scala.collection.Map), toScalaMap, mapobjects(MapObjects_loopValue32, MapObjects_loopIsNull33,
StringType, lambdavariable(MapObjects_loopValue32, MapObjects_loopIsNull33, StringType).toString,
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true),
StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input
as map<string,string>).keyArray).array, mapobjects(MapObjects_loopValue34, MapObjects_loopIsNull35,
StringType, lambdavariable(MapObjects_loopValue34, MapObjects_loopIsNull35, StringType).toString,
cast(lambdavariable(MapObjects_loopValue30, MapObjects_loopIsNull31, StructField(uuid,StringType,true),
StructField(timestamp,TimestampType,true), StructField(sourceSystem,StringType,true), StructField(input,MapType(StringType,StringType,true),true)).input
as map<string,string>).valueArray).array, true)' is of scala.collection.Map type.;
at org.apache.spark.sql.catalyst.analysis.package$AnalysisErrorAt.failAnalysis(package.scala:42)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:82)
at org.apache.spark.sql.catalyst.analysis.CheckAnalysis$$anonfun$checkAnalysis$1$$anonfun$apply$2.applyOrElse(CheckAnalysis.scala:74)
at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:301)
{code}

Unit tests:

{code}
import com.holdenkarau.spark.testing.{DataFrameSuiteBase, SharedSparkContext}
import org.scalatest.{Matchers, FunSuite}
import org.slf4j.LoggerFactory

import scala.util.{Failure, Try, Success}

/** Fixture whose single field is a plain `Map[String, String]` (no `Option` wrapper). */
case class ImmutableMapTest(
  data: Map[String, String]
)

/** Fixture whose single field is a `scala.collection.mutable.Map` (no `Option` wrapper). */
case class MapTest(
  data: scala.collection.mutable.Map[String, String]
)

/** Fixture whose single field is an `Option` wrapping a `Map[String, String]`. */
case class ImmtableWithOption(
  data: Option[Map[String, String]]
)

/** Fixture whose single field is an `Option` wrapping a `scala.collection.mutable.Map`. */
case class MutableWithOption(
  data: Option[scala.collection.mutable.Map[String, String]]
)

/** Fixture whose single field is an `Option[String]`. */
case class PrimWithOption(
  data: Option[String]
)

/** Fixture whose single field is an `Option[Seq[String]]`. */
case class ArrayWithOption(
  data: Option[Seq[String]]
)

/**
 * Reproduction suite for SPARK-17670.
 *
 * Each test builds a one-element RDD of a case class whose single field has a
 * particular type (a map, an `Option` of a map, an `Option` of a string, an
 * `Option` of a sequence), converts it with `toDF()`, and checks that the
 * conversion succeeds and yields exactly one row.
 */
class TestOptionWithDataTypes
  extends FunSuite
    with Matchers
    with SharedSparkContext
    with DataFrameSuiteBase {

  val logger = LoggerFactory.getLogger(classOf[TestOptionWithDataTypes])

  /**
   * Runs `convert` inside a `Try`, logs any failure, and asserts that the
   * conversion produced a DataFrame containing exactly one row.
   *
   * The argument is by-name so that an exception thrown while evaluating
   * `toDF()` is captured by the `Try` (and logged) instead of aborting the
   * test before the assertions run.
   *
   * @param convert the DataFrame conversion under test
   */
  private def assertConvertsToSingleRow(convert: => org.apache.spark.sql.DataFrame): Unit = {
    val result = Try(convert) match {
      case Success(df) => Option(df)
      case Failure(e) =>
        logger.error(e.getMessage, e)
        None
    }

    // A failed conversion surfaces here as a test failure; the cause was logged above.
    result should not be(None)
    result.foreach { df =>
      df.count() should be(1)
      df.show()
    }
  }

  test("test immutable map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ImmutableMapTest(Map("1"->"2"))))

    assertConvertsToSingleRow(rdd.toDF())
  }

  test("test mutable Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(MapTest(scala.collection.mutable.Map("1"->"2"))))

    assertConvertsToSingleRow(rdd.toDF())
  }

  test("test immutable option Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ImmtableWithOption(Option(Map("1"->"2")))))

    assertConvertsToSingleRow(rdd.toDF())
  }

  test("test mutable option Map") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(MutableWithOption(Option(scala.collection.mutable.Map("1"->"2")))))

    assertConvertsToSingleRow(rdd.toDF())
  }

  test("test option with prim") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(PrimWithOption(Option("foo"))))

    assertConvertsToSingleRow(rdd.toDF())
  }

  test("test option with array") {
    import sqlContext.implicits._
    val rdd = sc.parallelize(Seq(ArrayWithOption(Option(Seq("foo")))))

    assertConvertsToSingleRow(rdd.toDF())
  }
}
{code}





--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message