spark-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Boris Clémençon (JIRA) <j...@apache.org>
Subject [jira] [Commented] (SPARK-19681) save and load pipeline and then use it yield java.lang.RuntimeException
Date Tue, 21 Feb 2017 23:47:44 GMT

    [ https://issues.apache.org/jira/browse/SPARK-19681?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15877033#comment-15877033
] 

Boris Clémençon  commented on SPARK-19681:
------------------------------------------

EDIT:
the problem is in Compare.assertDataFrameEquals, not in Spark. I close the ticket.

> save and load pipeline and then use it yield java.lang.RuntimeException
> -----------------------------------------------------------------------
>
>                 Key: SPARK-19681
>                 URL: https://issues.apache.org/jira/browse/SPARK-19681
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.1.0
>            Reporter: Boris Clémençon 
>              Labels: spark-ml
>
> Here is the unit test that fails:
> import org.apache.spark.SparkConf
> import org.apache.spark.ml.Pipeline
> import org.apache.spark.ml.classification.LogisticRegression
> import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
> import org.apache.spark.ml.feature.{SQLTransformer, VectorAssembler}
> import org.apache.spark.ml.tuning.{CrossValidator, CrossValidatorModel, ParamGridBuilder}
> import org.apache.spark.sql.{DataFrame, SparkSession}
> import org.scalatest.{BeforeAndAfter, FlatSpec, Matchers}
> import scala.util.Random
> /**
>   * Created by borisclemencon on 21/02/2017.
>   */
> class PipelineTest extends FlatSpec with Matchers with BeforeAndAfter {
>   val featuresCol = "features"
>   val responseCol = "response"
>   val weightCol = "weight"
>   val features = Array("X1", "X2")
>   val lambdas = Array(0.01)
>   val alpha = 0.2
>   val maxIter = 50
>   val nfolds = 5
>   var spark: SparkSession = _
>   before {
>     val sparkConf: SparkConf = new SparkConf().
>       set("spark.serializer", "org.apache.spark.serializer.KryoSerializer").
>       set("spark.ui.enabled", "false"). // faster and remove 'spark test java.net.BindException:
Address already in use' warnings!
>       set("spark.driver.host", "127.0.0.1")
>     spark = SparkSession.
>       builder().
>       config(sparkConf).
>       appName("BlendWeightTransformerTest").
>       master("local[*]").
>       getOrCreate()
>   }
>   def makeDataset(n: Int = 100): DataFrame = {
>     val sc = spark
>     import sc.implicits._
>     val n = 1000
>     val data =
>       for (i <- 1 to n) yield {
>         val pn = if (Random.nextDouble() < 0.1) "a" else "b"
>         val x1: Double = Random.nextGaussian() * 5
>         val x2: Double = Random.nextGaussian() * 2
>         val response: Int = if (Random.nextBoolean()) 1 else 0
>         (pn, x1, x2, response)
>       }
>     data.toDF(packageNameCol, "X1", "X2", responseCol)
>   }
>   "load()" should "produce the same pipeline and result before and after save()" in {
>     val lr = new LogisticRegression().
>       setFitIntercept(true).
>       setMaxIter(maxIter).
>       setElasticNetParam(alpha).
>       setStandardization(true).
>       setFamily("binomial").
>       setFeaturesCol(featuresCol).
>       setLabelCol(responseCol)
>     val assembler = new VectorAssembler().setInputCols(features).setOutputCol(featuresCol)
>     val pipeline = new Pipeline().setStages(Array(assembler, lr))
>     val evaluator = new BinaryClassificationEvaluator().
>       setLabelCol(responseCol).
>       setMetricName("areaUnderROC")
>     val paramGrid = new ParamGridBuilder().
>       addGrid(lr.regParam, lambdas).
>       build()
>     // Train with simple grid cross validation
>     val cv = new CrossValidator().
>       setEstimator(pipeline).
>       setEvaluator(evaluator).
>       setEstimatorParamMaps(paramGrid).
>       setNumFolds(nfolds) // Use 3+ in practice
>     val df = makeDataset(100).cache
>     val cvModel = cv.fit(df)
>     val answer = cvModel.transform(df)
>     answer.show(truncate = false)
>     val path = "./PipelineTestcvModel"
>     cvModel.write.overwrite().save(path)
>     val cvModelLoaded = CrossValidatorModel.load(path)
>     val output = cvModelLoaded.transform(df)
>     output.show(truncate = false)
>     Compare.assertDataFrameEquals(answer, output)
>   }
> }
> yield exception
> should produce the same blent pipeline and result before and after save() *** FAILED
***
> [info]   java.lang.RuntimeException: no default for type org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7
> [info]   at org.apache.spark.sql.catalyst.expressions.Literal$.default(literals.scala:179)
> [info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:121)
> [info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$$anonfun$4.apply(patterns.scala:114)
> [info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [info]   at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:241)
> [info]   at scala.collection.immutable.List.foreach(List.scala:381)
> [info]   at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:241)
> [info]   at scala.collection.immutable.List.flatMap(List.scala:344)
> [info]   at org.apache.spark.sql.catalyst.planning.ExtractEquiJoinKeys$.unapply(patterns.scala:114)
> [info]   at org.apache.spark.sql.execution.SparkStrategies$JoinSelection$.apply(SparkStrategies.scala:158)



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org


Mime
View raw message