spark-issues mailing list archives

From "Sean R. Owen (Jira)" <j...@apache.org>
Subject [jira] [Commented] (SPARK-27842) Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()
Date Sun, 27 Oct 2019 01:53:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-27842?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16960498#comment-16960498
] 

Sean R. Owen commented on SPARK-27842:
--------------------------------------

I'd like to look into it. What numbers do you see in the two runs? Are they very different, or
only slightly?
My guess is that you get different orderings of the values in the two cases because you have
a different number of executor slots. In theory that doesn't make a difference; in practice it
probably causes slight differences due to floating-point inaccuracies. I don't know enough
yet to say whether that's the cause, or whether it's resolvable.
I'd also be interested to know if it occurs on the master branch.
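
To illustrate the ordering effect (a standalone sketch, not code from this thread): IEEE-754 addition is not associative, so summing the same values in a different order — as can happen when partial results from partitions are combined in a different order — can change the low-order bits of the result.

```java
public class FpOrderDemo {
    // Left-to-right sum; IEEE-754 addition is not associative,
    // so the result depends on the order of the inputs.
    static double sum(double[] values) {
        double s = 0.0;
        for (double v : values) {
            s += v;
        }
        return s;
    }

    public static void main(String[] args) {
        // 1e16 + 1.0 rounds back to 1e16, so the first small term is absorbed...
        double[] oneOrder = {1e16, 1.0, -1e16, 1.0};
        // ...but in this order both small terms survive.
        double[] another  = {1.0, 1.0, -1e16, 1e16};
        System.out.println(sum(oneOrder)); // 1.0
        System.out.println(sum(another));  // 2.0
    }
}
```

The same set of addends gives 1.0 in one order and 2.0 in the other, which is why a different partitioning of the same data can yield slightly different aggregates.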

> Inconsistent results of Statistics.corr() and PearsonCorrelation.computeCorrelationMatrix()
> -------------------------------------------------------------------------------------------
>
>                 Key: SPARK-27842
>                 URL: https://issues.apache.org/jira/browse/SPARK-27842
>             Project: Spark
>          Issue Type: Bug
>          Components: ML, Spark Core, Windows
>    Affects Versions: 2.3.1
>            Reporter: Peter Nijem
>            Priority: Major
>         Attachments: vectorList.txt
>
>
> Hi,
> I am working with the Spark Java API in local mode (1 node, 8 cores). The Spark version is
declared as follows in my pom.xml:
> *MLLib*
> _<artifactId>spark-mllib_2.11</artifactId>_
>  _<version>2.3.1</version>_
> *Core*
> _<artifactId>spark-core_2.11</artifactId>_
>  _<version>2.3.1</version>_
> I am experiencing inconsistent results of correlation when starting my Spark application
with 8 cores vs 1/2/3 cores.
> I've created a Main class which reads a list of vectors from a file: 240 vectors, each
of length 226.
> As you can see, I am firstly initializing Spark with local[*], running Correlation, saving
the Matrix and stopping Spark. Then, I do the same but for local[3].
> I am expecting to get the same matrices on both runs. But this is not the case. The
input file is attached.
> I tried to compute the correlation using PearsonCorrelation.computeCorrelationMatrix(),
but I faced the same issue there as well.
>  
> In my work I depend on the resulting correlation matrix, so the inconsistent results lead
to bad results in my application. As a workaround, I am now running with local[1].
>  
>  
> {code:java}
> import org.apache.spark.SparkConf;
> import org.apache.spark.api.java.JavaSparkContext;
> import org.apache.spark.mllib.linalg.DenseVector;
> import org.apache.spark.mllib.linalg.Matrix;
> import org.apache.spark.mllib.linalg.Vector;
> import org.apache.spark.mllib.stat.Statistics;
> import org.apache.spark.rdd.RDD;
> import org.junit.Assert;
>
> import java.io.BufferedReader;
> import java.io.FileReader;
> import java.io.IOException;
> import java.util.ArrayList;
> import java.util.Arrays;
> import java.util.List;
> import java.util.stream.Collectors;
>
> public class TestSparkCorr {
>
>     private static JavaSparkContext ctx;
>
>     public static void main(String[] args) {
>         List<List<Double>> doublesLists = readInputFile();
>         List<Vector> resultVectors = getVectorsList(doublesLists);
>         //===========================================================================
>         // First run: all available cores.
>         initSpark("*");
>         RDD<Vector> RDD_vector = ctx.parallelize(resultVectors).rdd();
>         Matrix m = Statistics.corr(RDD_vector, "pearson");
>         stopSpark();
>         //===========================================================================
>         // Second run: three cores.
>         initSpark("3");
>         RDD<Vector> RDD_vector_3 = ctx.parallelize(resultVectors).rdd();
>         Matrix m3 = Statistics.corr(RDD_vector_3, "pearson");
>         stopSpark();
>         //===========================================================================
>         Assert.assertEquals(m3, m);
>     }
>
>     private static List<Vector> getVectorsList(List<List<Double>> doublesLists) {
>         List<Vector> resultVectors = new ArrayList<>(doublesLists.size());
>         for (List<Double> vector : doublesLists) {
>             double[] x = new double[vector.size()];
>             for (int i = 0; i < x.length; i++) {
>                 x[i] = vector.get(i);
>             }
>             resultVectors.add(new DenseVector(x));
>         }
>         return resultVectors;
>     }
>
>     private static List<List<Double>> readInputFile() {
>         List<List<Double>> doublesLists = new ArrayList<>();
>         try (BufferedReader reader = new BufferedReader(new FileReader(
>                 ".//output//vectorList.txt"))) {
>             String line = reader.readLine();
>             while (line != null) {
>                 // Strip the surrounding brackets, then split on commas.
>                 String[] splitLine = line.substring(1, line.length() - 2).split(",");
>                 List<Double> doubles = Arrays.stream(splitLine)
>                         .map(x -> Double.valueOf(x.trim()))
>                         .collect(Collectors.toList());
>                 doublesLists.add(doubles);
>                 line = reader.readLine();
>             }
>         } catch (IOException e) {
>             e.printStackTrace();
>         }
>         return doublesLists;
>     }
>
>     private static void initSpark(String coresNum) {
>         final SparkConf sparkConf = new SparkConf().setAppName("Span");
>         sparkConf.setMaster(String.format("local[%s]", coresNum));
>         sparkConf.set("spark.ui.enabled", "false");
>         ctx = new JavaSparkContext(sparkConf);
>     }
>
>     private static void stopSpark() {
>         ctx.stop();
>         if (ctx.sc().isStopped()) {
>             ctx = null;
>         }
>     }
> }
> {code}
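
If exact equality is too strict for this comparison (the two matrices may differ only in low-order bits), one workaround is an entrywise comparison with a tolerance. A minimal sketch, not from the original report: it operates on the flat double[] of entries such as the column-major values that mllib's Matrix.toArray() returns.

```java
public class MatrixCompare {
    // Entrywise comparison with an absolute tolerance.
    // The inputs are flat arrays of matrix entries, e.g. the
    // column-major values returned by mllib's Matrix.toArray().
    public static boolean approxEquals(double[] a, double[] b, double tol) {
        if (a.length != b.length) {
            return false;
        }
        for (int i = 0; i < a.length; i++) {
            if (Math.abs(a[i] - b[i]) > tol) {
                return false;
            }
        }
        return true;
    }
}
```

In the test above this would replace the exact assertion, e.g. `Assert.assertTrue(MatrixCompare.approxEquals(m.toArray(), m3.toArray(), 1e-9));`.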



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

