Well, the answers you got there are correct as well.
Unfortunately I am not familiar enough with Neo4j to comment further. Is the Neo4j graph database running externally (outside the Spark cluster), within the driver process, or on all the executors? Can you clarify that?
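In general, the common way around "Task not serializable" with a resource like this is not to serialize the database handle at all, but to create or look it up lazily inside the task, on each executor JVM, so it is never captured in the closure shipped from the driver. A minimal sketch of that holder pattern follows; the GraphDBHolder name and the plain Object handle are illustrative stand-ins, not Neo4j or Spark API:

```java
// Sketch only: keep the non-serializable resource (e.g. an embedded Neo4j
// database) in a per-JVM static holder, initialized lazily on first use.
// Spark tasks call GraphDBHolder.get() inside foreachRDD/foreachPartition,
// so only serializable data ever crosses the wire.
public class GraphDBHolder {

    // Lives only in the JVM where it is first requested; never serialized
    // as part of a task closure.
    private static volatile Object instance;

    public static Object get() {
        if (instance == null) {
            synchronized (GraphDBHolder.class) {
                if (instance == null) {
                    // Real code would open the database here, e.g. an
                    // embedded Neo4j instance pointed at a local store path.
                    instance = new Object();
                }
            }
        }
        return instance;
    }

    public static void main(String[] args) {
        // Repeated calls in the same JVM reuse one handle.
        System.out.println(GraphDBHolder.get() == GraphDBHolder.get());
    }
}
```

With this shape, the anonymous function passed to foreach would call GraphDBHolder.get() in its call() body instead of referencing a graphDB field from the enclosing class, which is what drags the EmbeddedGraphDatabase into serialization.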

TD


On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1237@gmail.com> wrote:
Alright, I have also asked this question in StackOverflow: http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark

The code there is pretty neat.

On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <tdas@databricks.com> wrote:
I am not sure if you realized, but the code snippet is pretty mangled in the email we received. It might be a good idea to put the code in a pastebin or gist; that is much easier for everyone to read.


On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1237@gmail.com> wrote:
I'm trying to use Neo4j with Apache Spark Streaming, but serializability is proving to be an issue.

Basically, I want Apache Spark to parse and bundle my data in real time. After the data has been bundled, it should be stored in the database, Neo4j. However, I am getting this error:

org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
    at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
    at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
    at twoGrams.Main$4.call(Main.java:102)
    at twoGrams.Main$4.call(Main.java:1)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
    at scala.util.Try$.apply(Try.scala:161)
    at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
    at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
    ... 17 more
Here is my code:

output is a stream of type: JavaPairDStream<String, ArrayList<String>>

output.foreachRDD(
        new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {

            @Override
            public Void call(JavaPairRDD<String, ArrayList<String>> arg0, Time arg1)
                    throws Exception {
                arg0.foreach(
                        new VoidFunction<Tuple2<String, ArrayList<String>>>() {

                            @Override
                            public void call(Tuple2<String, ArrayList<String>> arg0)
                                    throws Exception {
                                try (Transaction tx = graphDB.beginTx()) {
                                    if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null) {
                                        System.out.println("Already in database: " + arg0._1);
                                    } else {
                                        Neo4jOperations.createHMac(graphDB, arg0._1);
                                    }
                                    tx.success();
                                }
                            }
                        });
                return null;
            }
        });
The Neo4jOperations class:

public class Neo4jOperations {

    public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
        try (ResourceIterator<Node> HMacs = graphDB
                .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
                .iterator()) {
            // Return null when there is no match, so callers can test for absence
            // instead of hitting a NoSuchElementException from next().
            return HMacs.hasNext() ? HMacs.next() : null;
        }
    }

    public static void createHMac(GraphDatabaseService graphDB, String value) {
        Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
        HMac.setProperty("value", value);
        HMac.setProperty("time",
                new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
    }
}
I know that I have to serialize the class Neo4jOperations, but I'm unable to figure out how. Or is there any other way to achieve this?

Also, how can I store the output of Spark Streaming in a database?




--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
For additional commands, e-mail: user-help@spark.apache.org





--
Gautam