spark-user mailing list archives

From Steve Pruitt <bpru...@opentext.com>
Subject [Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]
Date Tue, 02 Apr 2019 15:13:16 GMT
I am still struggling to get fit() to work on my dataset.
The Spark ML exception at issue is:

LAPACK.dppsv returned 6 because A is not positive definite. Is A derived from a singular matrix
(e.g. collinear column values)?

Comparing my standardized Weight values with the tutorial's, I see that some of mine are negative
while the tutorial's are all positive.  The exception message above mentions non-positive values,
so that is probably my issue.

The standardization calculation for my Weight values, (Weight - Weight_Mean) / Weight_StdDev,
produces negative values whenever Weight (which can be anywhere between 1 and 72000) is small.
For example, a Weight of 12 gives (12 - 597.7) / 346.5 ≈ -1.69.
I received a suggestion to try MinMaxScaler, but it operates on a Vector column and I only have
a single value per row, so I'm not sure I see how to make that work.
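
The closest I could picture is something like this (a rough, untested sketch; the weight_vec /
weight_scaled_vec column names are placeholders, and dataset is the DataFrame from my earlier
message below):

from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# MinMaxScaler expects a Vector column, so wrap the single Weight value first.
assembler = VectorAssembler(inputCols=['Weight'], outputCol='weight_vec')
assembled = assembler.transform(dataset)

# Rescale into [0, 1] (the MinMaxScaler defaults), which keeps everything non-negative.
scaler = MinMaxScaler(inputCol='weight_vec', outputCol='weight_scaled_vec')
scaled = scaler.fit(assembled).transform(assembled)

# Unpack the one-element vector back into a plain double column.
first_element = udf(lambda v: float(v[0]), DoubleType())
df = scaled.withColumn('weight_scaled', first_element('weight_scaled_vec'))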

My statistics knowledge is very rusty.  Is there a way to get only positive values when standardizing
something like my Weight values above?

Thanks.

-S



From: Steve Pruitt <bpruitt@opentext.com>
Sent: Monday, April 01, 2019 12:39 PM
To: user <user@spark.apache.org>
Subject: [EXTERNAL] - [Spark ML] [Pyspark] [Scenario Beginner] [Level Beginner]

After following a tutorial on recommender systems using PySpark / Spark ML, I decided to
jump in with my own dataset.  I am specifically trying to predict video suggestions based
on an implicit feature: the time a video was watched.  I wrote a generator to produce my
dataset (a rough sketch of it follows the sample below).  I have a total of five videos, each
1200 seconds in length.  For each user I randomly selected which videos were watched and a
random watch time between 0 and 1200.  I generated 10k records.  Weight is the time-watched
feature.  It looks like this:

UserId,VideoId,Weight
0,1,645
0,2,870
0,3,1075
0,4,486
0,5,900
1,1,353
1,2,988
1,3,152
1,4,953
1,5,641
2,3,12
2,4,444
2,5,87
3,2,658
3,4,270
3,5,530
4,2,722
4,3,255
:
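
For reference, the generator is roughly along these lines (a simplified sketch, not my exact
script; the output filename is just a placeholder):

import csv
import random

# Five videos; each user watches a random subset, and each watch time is uniform in 0-1200 seconds.
with open('ratings.csv', 'w', newline='') as f:
    writer = csv.writer(f)
    writer.writerow(['UserId', 'VideoId', 'Weight'])
    user_id = 0
    rows = 0
    while rows < 10000:  # stop once we have roughly 10k records
        watched = random.sample(range(1, 6), random.randint(1, 5))
        for video_id in sorted(watched):
            writer.writerow([user_id, video_id, random.randint(0, 1200)])
            rows += 1
        user_id += 1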

After reading the dataset, I convert all columns to Integer in place.  Describing Weight
produces:

   summary              Weight
0  count                 30136
1  mean       597.717945314574
2  stddev     346.475684454489
3  min                       0
4  max                    1200

Next, I standardized the Weight column with:

from pyspark.sql.functions import col, mean, stddev

df = (dataset.select(mean('Weight').alias('mean_weight'), stddev('Weight').alias('stddev_weight'))
      .crossJoin(dataset)
      .withColumn('weight_scaled', (col('Weight') - col('mean_weight')) / col('stddev_weight')))

df.toPandas().head() shows:

    mean_weight  stddev_weight  UserId  VideoId  Weight  weight_scaled
 0   597.717945     346.475684       0        1     645       0.136466
 1   597.717945     346.475684       0        2     870       0.785862
 2   597.717945     346.475684       0        3    1075       1.377534
 3   597.717945     346.475684       0        4     486      -0.322441
 4   597.717945     346.475684       0        5     900       0.872448
:
10   597.717945     346.475684       2        3      12      -1.690502
11   597.717945     346.475684       2        4     444      -0.443662
12   597.717945     346.475684       2        5      87      -1.474037
:

I then split df 80/20 to get training and testing data.
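
A minimal sketch of the split (trainingData is what gets passed to fit() below; the test-set name
and seed are arbitrary):

trainingData, testData = df.randomSplit([0.8, 0.2], seed=42)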

I defined the ALS algo with:

from pyspark.ml.recommendation import ALS

als = ALS(maxIter=10, regParam=0.1, userCol='UserId', itemCol='VideoId',
          implicitPrefs=True, ratingCol='weight_scaled', coldStartStrategy='drop')

and then

model = als.fit(trainingData)

Calling fit() is where I get the following error, which I don't understand:

Py4JJavaError                             Traceback (most recent call last)
<ipython-input-12-20463d9db24b> in <module>
----> 1 model = als.fit(trainingData)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\base.py in fit(self, dataset, params)
    130                 return self.copy(params)._fit(dataset)
    131             else:
--> 132                 return self._fit(dataset)
    133         else:
    134             raise ValueError("Params must be either a param map or a list/tuple of
param maps, "

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit(self, dataset)
    286
    287     def _fit(self, dataset):
--> 288         java_model = self._fit_java(dataset)
    289         model = self._create_model(java_model)
    290         return self._copyValues(model)

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\ml\wrapper.py in _fit_java(self, dataset)
    283         """
    284         self._transfer_params_to_java()
--> 285         return self._java_obj.fit(dataset._jdf)
    286
    287     def _fit(self, dataset):

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\java_gateway.py
in __call__(self, *args)
   1158         answer = self.gateway_client.send_command(command)
   1159         return_value = get_return_value(
-> 1160             answer, self.gateway_client, self.target_id, self.name)
   1161
   1162         for temp_arg in temp_args:

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\pyspark\sql\utils.py in deco(*a, **kw)
    61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

C:\Executables\spark-2.3.0-bin-hadoop2.7\python\lib\py4j-0.10.6-src.zip\py4j\protocol.py in
get_return_value(answer, gateway_client, target_id, name)
    318                 raise Py4JJavaError(
    319                     "An error occurred while calling {0}{1}{2}.\n".
--> 320                     format(target_id, ".", name), value)
    321             else:
    322                 raise Py4JError(

Py4JJavaError: An error occurred while calling o211.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 5 in stage 61.0
failed 1 times, most recent failure: Lost task 5.0 in stage 61.0 (TID 179, localhost, executor
driver): org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because
A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
                at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
                at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
                at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
                at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
                at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
                at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
                at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
                at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
                at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
                at org.apache.spark.scheduler.Task.run(Task.scala:109)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
                at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
                at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
                at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
                at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
                at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
                at scala.Option.foreach(Option.scala:257)
                at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
                at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
                at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
                at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
                at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
                at org.apache.spark.rdd.RDD$$anonfun$aggregate$1.apply(RDD.scala:1118)
                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
                at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
                at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
                at org.apache.spark.rdd.RDD.aggregate(RDD.scala:1111)
                at org.apache.spark.ml.recommendation.ALS$.computeYtY(ALS.scala:1711)
                at org.apache.spark.ml.recommendation.ALS$.org$apache$spark$ml$recommendation$ALS$$computeFactors(ALS.scala:1652)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:972)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$train$4.apply(ALS.scala:969)
                at scala.collection.immutable.Range.foreach(Range.scala:160)
                at org.apache.spark.ml.recommendation.ALS$.train(ALS.scala:969)
                at org.apache.spark.ml.recommendation.ALS.fit(ALS.scala:674)
                at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
                at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
                at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
                at java.lang.reflect.Method.invoke(Method.java:498)
                at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
                at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
                at py4j.Gateway.invoke(Gateway.java:282)
                at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
                at py4j.commands.CallCommand.execute(CallCommand.java:79)
                at py4j.GatewayConnection.run(GatewayConnection.java:214)
                at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.ml.optim.SingularMatrixException: LAPACK.dppsv returned 6 because
A is not positive definite. Is A derived from a singular matrix (e.g. collinear column values)?
                at org.apache.spark.mllib.linalg.CholeskyDecomposition$.checkReturnValue(CholeskyDecomposition.scala:65)
                at org.apache.spark.mllib.linalg.CholeskyDecomposition$.solve(CholeskyDecomposition.scala:41)
                at org.apache.spark.ml.recommendation.ALS$CholeskySolver.solve(ALS.scala:747)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1699)
                at org.apache.spark.ml.recommendation.ALS$$anonfun$org$apache$spark$ml$recommendation$ALS$$computeFactors$1.apply(ALS.scala:1660)
                at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at org.apache.spark.rdd.PairRDDFunctions$$anonfun$mapValues$1$$anonfun$apply$37$$anonfun$apply$38.apply(PairRDDFunctions.scala:757)
                at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
                at org.apache.spark.storage.memory.MemoryStore.putIteratorAsValues(MemoryStore.scala:217)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1092)
                at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1083)
                at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1018)
                at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1083)
                at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:809)
                at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:335)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:286)
                at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
                at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
                at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
                at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
                at org.apache.spark.scheduler.Task.run(Task.scala:109)
                at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
                at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                ... 1 more

Thanks in advance.

-S
