spark-user mailing list archives

From d34th4ck3r <gautam1...@gmail.com>
Subject Using Neo4j with Apache Spark
Date Thu, 12 Mar 2015 07:48:12 GMT
I'm trying to use Neo4j with Apache Spark Streaming, but I'm running into a
serialization issue.

Basically, I want Apache Spark to parse and bundle my data in real time.
After the data has been bundled, it should be stored in the database, Neo4j.
However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
    at twoGrams.Main$4.call(Main.java:102)
    at twoGrams.Main$4.call(Main.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 17 more
Here is my code:

output is a stream of type JavaPairDStream<String, ArrayList<String>>

output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> rdd,
                             Time time) throws Exception {
                rdd.foreach(
                        new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                            @Override
                            public void call(Tuple2<String, ArrayList<String>> tuple)
                                    throws Exception {
                                try (Transaction tx = graphDB.beginTx()) {
                                    if (Neo4jOperations.getHMacFromValue(graphDB, tuple._1) != null)
                                        System.out.println("Already in database: " + tuple._1);
                                    else
                                        Neo4jOperations.createHMac(graphDB, tuple._1);
                                    tx.success();
                                }
                            }
                        });
                return null;
            }
        });
The Neo4jOperations class:

public class Neo4jOperations {

    public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
        try (ResourceIterator<Node> hMacs = graphDB.findNodesByLabelAndProperty(
                DynamicLabel.label("HMac"), "value", value).iterator()) {
            // Return null when no node matches, rather than throwing NoSuchElementException
            return hMacs.hasNext() ? hMacs.next() : null;
        }
    }

    public static void createHMac(GraphDatabaseService graphDB, String value) {
        Node hMac = graphDB.createNode(DynamicLabel.label("HMac"));
        hMac.setProperty("value", value);
        hMac.setProperty("time",
                new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
}
I know that I have to serialize the class Neo4jOperations, but I'm not able
to figure out how. Or is there any other way to achieve this?

Also, how can I store the output of Spark Streaming in a database?
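For context on the error itself: Spark serializes the closure passed to foreach and ships it to the workers, so any object the closure captures (here, the EmbeddedGraphDatabase behind graphDB) must be serializable. A common way around this is to not capture the database handle at all, but to keep it in a static per-JVM holder so each worker creates its own handle lazily. The sketch below illustrates only the serialization mechanics in plain Java, with a hypothetical FakeGraphDb standing in for the non-serializable database; none of these class names are Neo4j or Spark API.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class SerializationDemo {

    // Stand-in for org.neo4j.kernel.EmbeddedGraphDatabase: not serializable.
    static class FakeGraphDb {
        void write(String value) { /* pretend to store value */ }
    }

    // Anti-pattern: the task captures the database handle as a field,
    // so Java serialization fails with NotSerializableException.
    static class BadTask implements Serializable {
        final FakeGraphDb db = new FakeGraphDb(); // non-serializable field
        void run(String value) { db.write(value); }
    }

    // Fix: the task itself carries no database state; the handle lives in
    // a static holder, created lazily once per JVM (i.e. once per worker).
    static class GoodTask implements Serializable {
        void run(String value) { Holder.db().write(value); }
    }

    static class Holder {
        private static FakeGraphDb instance;
        static synchronized FakeGraphDb db() {
            if (instance == null) instance = new FakeGraphDb();
            return instance;
        }
    }

    // Returns true if the object survives Java serialization.
    static boolean serializes(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(serializes(new BadTask()));  // false
        System.out.println(serializes(new GoodTask())); // true
    }
}
```

Applied to the code above, the same idea would mean obtaining graphDB from such a holder inside the VoidFunction (or batching writes with foreachPartition so one transaction covers a whole partition), instead of capturing an externally created graphDB in the closure.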




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

