spark-user mailing list archives

From Warfish <sebastian.ka...@gmail.com>
Subject Enum values in custom objects mess up RDD operations
Date Thu, 06 Aug 2015 08:41:19 GMT
Hi everyone,

I have been working with Spark for a little while now and have encountered
some very strange behaviour that has caused me a lot of headaches:

I have written my own POJOs to encapsulate my data, and this data is held in
some JavaRDDs. One member variable of these POJOs is of a custom enum type.
Whenever I run operations such as subtract, groupByKey or reduce on these
RDDs, the results are inconsistent and nonsensical. However, this happens
only when the application runs in standalone cluster mode (10 nodes). When
running locally on my developer machine, the code executes just fine. If you
want to reproduce this behaviour, the complete Maven project is available
here and runs out of the box:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n24149/SparkTest.zip>

I am running Spark 1.4.0 and submitting the application using

/usr/local/spark-1.4.0-bin-hadoop2.4/bin/spark-submit --class de.spark.test.Main ./SparkTest-1.0-SNAPSHOT.jar



Consider the following code for my custom object:


package de.spark.test;

import java.io.Serializable;
import java.util.Objects;

public class MyObject implements Serializable {

    private MyEnum myEnum;

    public MyObject(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    public MyEnum getMyEnum() {
        return myEnum;
    }

    public void setMyEnum(MyEnum myEnum) {
        this.myEnum = myEnum;
    }

    @Override
    public int hashCode() {
        int hash = 5;
        hash = 41 * hash + Objects.hashCode(this.myEnum);
        return hash;
    }

    @Override
    public boolean equals(Object obj) {
        if (obj == null) {
            return false;
        }
        if (getClass() != obj.getClass()) {
            return false;
        }
        final MyObject other = (MyObject) obj;
        if (this.myEnum != other.myEnum) {
            return false;
        }
        return true;
    }
    
    @Override
    public String toString() {
        return "MyObject{" + "myEnum=" + myEnum + '}';
    }

}


As you can see, I have overridden equals() and hashCode() (both are
auto-generated). The enum is given as follows:


package de.spark.test;

import java.io.Serializable;

public enum MyEnum implements Serializable {
  VALUE1, VALUE2
}


The main() method is defined by:


package de.spark.test;

import java.util.ArrayList;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class Main {

  public static void main(String[] args) {
    SparkConf conf = new SparkConf().setAppName("Spark Test")
                                    .setMaster("myMaster");

    JavaSparkContext jsc = new JavaSparkContext(conf);

    System.out.println("/////////////////////////////////////////////////// Object generation");

    List<MyObject> l1 = new ArrayList<>();
    
    for(int i = 0; i < 1000; i++) {
        l1.add(new MyObject(MyEnum.VALUE1));
    }
    
    JavaRDD<MyObject> myObjectRDD1 = jsc.parallelize(l1);
    JavaRDD<MyObject> myObjectRDD2 = jsc.parallelize(l1);

    System.out.println("myObjectRDD1 count          = " +
myObjectRDD1.count());
    System.out.println("myObjectRDD2 count          = " +
myObjectRDD2.count());
    
    System.out.println("/////////////////////////////////////////////////// Distinct");
    
    JavaRDD<MyObject> myObjectRDD1Distinct = myObjectRDD1.distinct();
    JavaRDD<MyObject> myObjectRDD2Distinct = myObjectRDD2.distinct();
    
    System.out.println("myObjectRDD1Distinct count  = " +
myObjectRDD1Distinct.count());
    System.out.println("myObjectRDD2Distinct count  = " +
myObjectRDD2Distinct.count());
    
    System.out.println("/////////////////////////////////////////////////// Subtract");

    JavaRDD<MyObject> myObjectRDD1Minus1 =
myObjectRDD1.subtract(myObjectRDD1);
    JavaRDD<MyObject> myObjectRDD1Minus2 =
myObjectRDD1.subtract(myObjectRDD2);
    JavaRDD<MyObject> myObjectRDD2Minus1 =
myObjectRDD2.subtract(myObjectRDD1);

    System.out.println("myObjectRDD1Minus1 count    = " +
myObjectRDD1Minus1.count());
    System.out.println("myObjectRDD1Minus2 count    = " +
myObjectRDD1Minus2.count());
    System.out.println("myObjectRDD2Minus1 count    = " +
myObjectRDD2Minus1.count());
    
    System.out.println("/////////////////////////////////////////////////// End");
  }
  
}


Both RDDs contain 1000 exactly equal objects, so one would expect each call
of distinct() to return a single element and each call of
subtract(JavaRDD<MyObject>) to return an empty RDD. However, here is some
sample output:


/////////////////////////////////////////////////// Object generation
myObjectRDD1 count          = 1000
myObjectRDD2 count          = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count  = 1
myObjectRDD2Distinct count  = 2
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count    = 500
myObjectRDD1Minus2 count    = 0
myObjectRDD2Minus1 count    = 0
/////////////////////////////////////////////////// End


And this is a new run, directly following the previous one:

/////////////////////////////////////////////////// Object generation
myObjectRDD1 count          = 1000
myObjectRDD2 count          = 1000
/////////////////////////////////////////////////// Distinct
myObjectRDD1Distinct count  = 2
myObjectRDD2Distinct count  = 1
/////////////////////////////////////////////////// Subtract
myObjectRDD1Minus1 count    = 500
myObjectRDD1Minus2 count    = 500
myObjectRDD2Minus1 count    = 0
/////////////////////////////////////////////////// End
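
For comparison, the expected numbers follow from the equals()/hashCode()
contract alone and can be checked on a single JVM with plain collections, no
Spark involved. The sketch below is only a baseline under that assumption:
LocalBaselineSketch, distinctCount and subtractCount are hypothetical names,
and HashSet / removeAll stand in for distinct() and subtract().

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

// Hypothetical single-JVM baseline; not part of the SparkTest project.
class LocalBaselineSketch {

    enum MyEnum { VALUE1, VALUE2 }

    // Trimmed-down copy of MyObject with the same hashCode() arithmetic.
    static final class MyObject {
        final MyEnum myEnum;

        MyObject(MyEnum myEnum) { this.myEnum = myEnum; }

        @Override
        public int hashCode() { return 41 * 5 + Objects.hashCode(myEnum); }

        @Override
        public boolean equals(Object obj) {
            return obj instanceof MyObject && ((MyObject) obj).myEnum == myEnum;
        }
    }

    static List<MyObject> generate(int n) {
        List<MyObject> list = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            list.add(new MyObject(MyEnum.VALUE1));
        }
        return list;
    }

    // Stand-in for distinct().count(): number of unique elements.
    static int distinctCount(List<MyObject> list) {
        Set<MyObject> unique = new HashSet<>(list);
        return unique.size();
    }

    // Stand-in for left.subtract(right).count(): elements of left not in right.
    static int subtractCount(List<MyObject> left, List<MyObject> right) {
        Set<MyObject> toRemove = new HashSet<>(right);
        List<MyObject> remaining = new ArrayList<>(left);
        remaining.removeAll(toRemove);
        return remaining.size();
    }

    public static void main(String[] args) {
        List<MyObject> l1 = generate(1000);
        System.out.println("distinct count = " + distinctCount(l1));
        System.out.println("subtract count = " + subtractCount(l1, l1));
    }
}
```

Within one JVM every VALUE1 reference has the same hash code, which is
consistent with the local runs behaving as expected.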


Some thoughts/observations: As soon as I take the enum value out of the
hashCode() function of MyObject, the code works just fine, i.e. the new
hashCode() function becomes

    @Override
    public int hashCode() {
        int hash = 5;
//        hash = 41 * hash + Objects.hashCode(this.myEnum);
        return hash;
    }
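
An alternative to dropping the enum from hashCode() altogether would be to
hash something that is defined by value rather than by object identity. The
sketch below assumes the inconsistency comes from Enum.hashCode(), which is
final and inherits the identity-based Object.hashCode(), so the same constant
can hash differently in each JVM. StableHashSketch is a hypothetical wrapper
used only to keep the example self-contained, and the approach is untested
against the cluster setup described above.

```java
import java.io.Serializable;

// Hypothetical stand-ins for the MyEnum / MyObject classes in this post.
class StableHashSketch {

    enum MyEnum { VALUE1, VALUE2 }

    static final class MyObject implements Serializable {
        private final MyEnum myEnum;

        MyObject(MyEnum myEnum) { this.myEnum = myEnum; }

        @Override
        public int hashCode() {
            // String.hashCode() is computed from the characters of the name,
            // so it is the same in every JVM, unlike the enum's identity hash.
            int hash = 5;
            hash = 41 * hash + (myEnum == null ? 0 : myEnum.name().hashCode());
            return hash;
        }

        @Override
        public boolean equals(Object obj) {
            if (this == obj) {
                return true;
            }
            if (obj == null || getClass() != obj.getClass()) {
                return false;
            }
            return myEnum == ((MyObject) obj).myEnum;
        }
    }

    public static void main(String[] args) {
        MyObject a = new MyObject(MyEnum.VALUE1);
        MyObject b = new MyObject(MyEnum.VALUE1);
        System.out.println("equal objects:    " + a.equals(b));
        System.out.println("equal hash codes: " + (a.hashCode() == b.hashCode()));
    }
}
```

Hashing ordinal() instead of name() would work on the same principle, as long
as the order of the constants never changes.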

Additionally, the code executes fine on a local machine and only behaves
strangely on a cluster. These two observations make me believe that Spark
uses the hashCode of each object to distribute the objects between worker
nodes and somehow the enum value results in inconsistent hash codes.
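
To make this suspicion concrete, here is a small sketch in plain Java. It
assumes that hash partitioning picks a partition as the key's hashCode()
modulo the number of partitions, adjusted to be non-negative. PartitionSketch
and partitionFor are hypothetical names, and the two enum hash values are made
up to represent what two different JVMs might report for the same constant.

```java
// Hypothetical illustration; it does not call into Spark.
class PartitionSketch {

    // Modulo that always lands in [0, numPartitions), even for negative hashes.
    static int nonNegativeMod(int hash, int numPartitions) {
        int raw = hash % numPartitions;
        return raw < 0 ? raw + numPartitions : raw;
    }

    // Same arithmetic as MyObject.hashCode(), with the enum's hash passed in.
    static int partitionFor(int enumHash, int numPartitions) {
        int keyHash = 41 * 5 + enumHash;
        return nonNegativeMod(keyHash, numPartitions);
    }

    public static void main(String[] args) {
        int numPartitions = 10;

        // Made-up identity hashes for MyEnum.VALUE1 in two separate JVMs.
        int enumHashOnJvmA = 0x1b6d3586;
        int enumHashOnJvmB = 0x4554617c;

        System.out.println("partition chosen on JVM A: "
                + partitionFor(enumHashOnJvmA, numPartitions));
        System.out.println("partition chosen on JVM B: "
                + partitionFor(enumHashOnJvmB, numPartitions));
    }
}
```

If equal objects end up in different partitions like this, per-partition
operations such as distinct() or subtract() would never compare them with each
other, which would fit the counts shown above.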

Can someone help me out here?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Enum-values-in-custom-objects-mess-up-RDD-operations-tp24149.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.
