spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Using Neo4j with Apache Spark
Date Thu, 12 Mar 2015 08:21:22 GMT
What is the GraphDatabaseService object that you are using? Instead of creating
it on the driver (outside foreachRDD), can you create it inside
RDD.foreach?
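
By the way, if creating one per partition is expensive, another option is to
hold the handle in a static field so that each executor JVM creates its own
instance lazily and nothing is serialized from the driver. A rough Java sketch
(GraphDBHolder and the store path are made-up names; note also that separate
JVMs cannot safely open the same embedded store, so on a real cluster this
would have to wrap a client to your externally running server instead):

import org.neo4j.graphdb.GraphDatabaseService;
import org.neo4j.graphdb.factory.GraphDatabaseFactory;

public class GraphDBHolder {
    private static GraphDatabaseService graphDB;

    // Lazily create one instance per JVM; the handle lives on the executor
    // and is never shipped inside a closure from the driver.
    public static synchronized GraphDatabaseService get() {
        if (graphDB == null) {
            graphDB = new GraphDatabaseFactory()
                    .newEmbeddedDatabase("/path/to/graph.db"); // assumed path
        }
        return graphDB;
    }
}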

In general, the right pattern for doing this is described in the programming guide:
http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd

So you should be doing something like this (sorry for writing in Scala):

dstream.foreachRDD { (rdd: RDD[_], time: Time) =>
    rdd.foreachPartition { iterator =>
        // Create a GraphDatabaseService object, or fetch one from a pool
        // of GraphDatabaseService objects
        // Use it to send the whole partition to Neo4j
        // Destroy the object, or release it back to the pool
    }
}
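
Translated to the Java streaming API, the same pattern looks roughly like
this (a sketch only; Neo4jClient and Neo4jClientPool are hypothetical
stand-ins for whatever non-serializable connection object you use, not a
real Neo4j API):

import java.util.ArrayList;
import java.util.Iterator;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.VoidFunction;
import org.apache.spark.streaming.Time;

import scala.Tuple2;

// `output` is the JavaPairDStream<String, ArrayList<String>> from your job
output.foreachRDD(new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
    @Override
    public Void call(JavaPairRDD<String, ArrayList<String>> rdd, Time time) {
        rdd.foreachPartition(new VoidFunction<Iterator<Tuple2<String, ArrayList<String>>>>() {
            @Override
            public void call(Iterator<Tuple2<String, ArrayList<String>>> partition) {
                // Created/borrowed here, on the executor, so nothing
                // non-serializable is captured by the task closure
                Neo4jClient client = Neo4jClientPool.borrow();
                while (partition.hasNext()) {
                    client.write(partition.next()._1());
                }
                Neo4jClientPool.release(client);
            }
        });
        return null;
    }
});

The important part is that the connection object is created or borrowed
inside foreachPartition, so it only ever exists on the executors.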


On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1237@gmail.com> wrote:

> Neo4j is running externally. It has nothing to do with Spark processes.
>
> Basically, the problem is that I'm unable to figure out a way to store the
> output of Spark in the database, since Spark Streaming requires the Neo4j
> Core Java API objects to be serializable as well.
>
> The answer there points to using the REST API, but its performance is really
> poor compared to the Core Java API:
> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>
> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <tdas@databricks.com>
> wrote:
>
>> Well, the answers you got there are correct as well.
>> Unfortunately, I am not familiar enough with Neo4j to comment any further. Is
>> the Neo4j graph database running externally (outside the Spark cluster), or
>> within the driver process, or on all the executors? Can you clarify that?
>>
>> TD
>>
>>
>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1237@gmail.com>
>> wrote:
>>
>>> Alright, I have also asked this question on StackOverflow:
>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>
>>> The code there is formatted neatly.
>>>
>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> I am not sure if you realized, but the code snippet is pretty mangled
>>>> in the email we received. It might be a good idea to put the code in a
>>>> pastebin or gist; that would be much, much easier for everyone to read.
>>>>
>>>>
>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1237@gmail.com>
>>>> wrote:
>>>>
>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am finding
>>>>> serializability to be an issue.
>>>>>
>>>>> Basically, I want Apache Spark to parse and bundle my data in real time.
>>>>> After the data has been bundled, it should be stored in the database,
>>>>> Neo4j. However, I am getting this error:
>>>>>
>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>     ... 17 more
>>>>> Here is my code:
>>>>>
>>>>> output is a stream of type JavaPairDStream<String, ArrayList<String>>:
>>>>>
>>>>> output.foreachRDD(
>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>
>>>>>         @Override
>>>>>         public Void call(JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>                 Time arg1) throws Exception {
>>>>>             arg0.foreach(
>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>
>>>>>                     @Override
>>>>>                     public void call(Tuple2<String, ArrayList<String>> arg0)
>>>>>                             throws Exception {
>>>>>                         // graphDB is captured from the enclosing scope here;
>>>>>                         // serializing this closure is what throws the
>>>>>                         // NotSerializableException above
>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>>>>>                                 System.out.println("Already in Database: " + arg0._1);
>>>>>                             else {
>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>                             }
>>>>>                             tx.success();
>>>>>                         }
>>>>>                     }
>>>>>                 });
>>>>>             return null;
>>>>>         }
>>>>>     });
>>>>> The Neo4jOperations class:
>>>>>
>>>>> public class Neo4jOperations {
>>>>>
>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB,
>>>>>             String value) {
>>>>>         try (ResourceIterator<Node> HMacs = graphDB
>>>>>                 .findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>>>>                         "value", value).iterator()) {
>>>>>             // Guard against an empty result; calling next() on an empty
>>>>>             // iterator would throw NoSuchElementException instead of
>>>>>             // returning the null the caller checks for
>>>>>             return HMacs.hasNext() ? HMacs.next() : null;
>>>>>         }
>>>>>     }
>>>>>
>>>>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>         HMac.setProperty("value", value);
>>>>>         HMac.setProperty("time", new SimpleDateFormat("yyyyMMdd_HHmmss")
>>>>>                 .format(Calendar.getInstance().getTime()));
>>>>>     }
>>>>> }
>>>>> I know that I have to serialize the class Neo4jOperations, but I'm not
>>>>> able to figure out how. Or is there any other way to achieve this?
>>>>>
>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>
>>>>
>>>
>>>
>>> --
>>> Gautam
>>>
>>
>>
>
>
> --
> Gautam
>
