spark-user mailing list archives

From Tathagata Das <t...@databricks.com>
Subject Re: Using Neo4j with Apache Spark
Date Fri, 13 Mar 2015 02:47:33 GMT
Well, that's why I had also suggested using a pool of GraphDatabaseService
objects :)
That is also covered in the programming guide link I gave.

TD
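
A minimal sketch of the pooling idea mentioned above, using only the Java
standard library. ResourcePool, PoolDemo and writePartition are hypothetical
names, and a StringBuilder stands in for a GraphDatabaseService; the Spark and
Neo4j wiring is omitted. It assumes the pool lives in a static field, so it is
initialized once per JVM (once per executor) and is never captured by a task
closure, which means nothing non-serializable has to be shipped from the driver.

```java
import java.util.Arrays;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Minimal object pool: creates resources lazily and reuses released ones. */
final class ResourcePool<T> {
    private final ConcurrentLinkedQueue<T> idle = new ConcurrentLinkedQueue<>();
    private final Supplier<T> factory;
    private final AtomicInteger created = new AtomicInteger();

    ResourcePool(Supplier<T> factory) {
        this.factory = factory;
    }

    /** Returns an idle resource if one exists, otherwise creates a new one. */
    T borrow() {
        T resource = idle.poll();
        if (resource == null) {
            created.incrementAndGet();
            resource = factory.get();
        }
        return resource;
    }

    /** Hands a resource back so that a later task can reuse it. */
    void release(T resource) {
        idle.offer(resource);
    }

    /** Number of resources the factory has been asked to create so far. */
    int createdCount() {
        return created.get();
    }
}

final class PoolDemo {
    // Static field: one pool per JVM. StringBuilder is only a stand-in
    // (assumption) for an expensive, non-serializable connection object.
    static final ResourcePool<StringBuilder> POOL =
            new ResourcePool<>(StringBuilder::new);

    /** Stand-in for the body of rdd.foreachPartition: borrow, use, release. */
    static int writePartition(Iterable<String> partition) {
        StringBuilder conn = POOL.borrow();
        try {
            int written = 0;
            for (String record : partition) {
                conn.append(record).append('\n');
                written++;
            }
            return written;
        } finally {
            POOL.release(conn); // always return the resource, even on failure
        }
    }

    public static void main(String[] args) {
        writePartition(Arrays.asList("a", "b"));
        writePartition(Arrays.asList("c"));
        // The second call reuses the resource released by the first.
        System.out.println("resources created: " + POOL.createdCount());
    }
}
```

In a real job, the call to writePartition would sit inside
rdd.foreachPartition, and the factory would open the database instead of
creating a StringBuilder. This sketch never closes pooled objects; a real
pool would also need a shutdown step.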


On Thu, Mar 12, 2015 at 7:38 PM, Gautam Bajaj <gautam1237@gmail.com> wrote:

> Thanks a ton! That worked.
>
> However, this may cause a performance issue: for each partition, I'd need
> to restart the server. That was the basic reason I was creating the graphDB
> object outside this loop.
>
> On Fri, Mar 13, 2015 at 5:34 AM, Tathagata Das <tdas@databricks.com>
> wrote:
>
>> (Putting user@spark back in the to list)
>>
>> In the gist, you are creating the graphDB object way outside the
>> RDD.foreachPartition. As I said last time, create the graphDB object inside
>> the RDD.foreachPartition. You are creating it outside DStream.foreachRDD
>> and then using it from inside rdd.foreachPartition. That brings the
>> graphDB object into the task closure, and hence the system tries to
>> serialize the graphDB object when it serializes the closure. If you
>> create the graphDB object inside the RDD.foreachPartition, then the closure
>> will not refer to any prior graphDB object and therefore will not try to
>> serialize it.
>>
>> On Thu, Mar 12, 2015 at 3:46 AM, Gautam Bajaj <gautam1237@gmail.com>
>> wrote:
>>
>>> Here: https://gist.github.com/d34th4ck3r/0c99d1e9fa288e0cc8ab
>>>
>>> I'll add the flag and send you the stack trace; I have meetings now.
>>>
>>> On Thu, Mar 12, 2015 at 6:28 PM, Tathagata Das <tdas@databricks.com>
>>> wrote:
>>>
>>>> Could you show us that version of the code?
>>>>
>>>> It also helps to turn on the Java flag for extended debug info
>>>> (-Dsun.io.serialization.extendedDebugInfo=true). That will show the
>>>> lineage of objects leading to the non-serializable one.
>>>> On Mar 12, 2015 1:32 AM, "Gautam Bajaj" <gautam1237@gmail.com> wrote:
>>>>
>>>>> I tried that too. It results in the same serializability issue.
>>>>>
>>>>> The GraphDatabaseService that I'm using comes from GraphDatabaseFactory():
>>>>> http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html
>>>>>
>>>>> On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <tdas@databricks.com>
>>>>> wrote:
>>>>>
>>>>>> What is the GraphDatabaseService object that you are using? Instead of
>>>>>> creating them on the driver (outside foreachRDD), can you create them
>>>>>> inside the RDD.foreach?
>>>>>>
>>>>>> In general, the right pattern for doing this is in the programming guide:
>>>>>>
>>>>>> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>>>>>>
>>>>>> So you should be doing the following (sorry for writing in Scala):
>>>>>>
>>>>>> dstream.foreachRDD((rdd, time) => {
>>>>>>     rdd.foreachPartition(iterator => {
>>>>>>         // Create a GraphDatabaseService object, or fetch one from a
>>>>>>         // pool of GraphDatabaseService objects
>>>>>>         // Use it to send the whole partition to Neo4j
>>>>>>         // Destroy the object or release it to the pool
>>>>>>     })
>>>>>> })
>>>>>>
>>>>>>
>>>>>> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1237@gmail.com>
>>>>>> wrote:
>>>>>>
>>>>>>> Neo4j is running externally. It has nothing to do with Spark
>>>>>>> processes.
>>>>>>>
>>>>>>> Basically, the problem is that I'm unable to figure out a way to
>>>>>>> store the output of Spark in the database, because Spark Streaming
>>>>>>> requires the Neo4j Core Java API objects to be serializable as well.
>>>>>>>
>>>>>>> The answer there suggests using the REST API, but its performance is
>>>>>>> really poor compared to the Core Java API:
>>>>>>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>>>>>>
>>>>>>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <tdas@databricks.com>
>>>>>>> wrote:
>>>>>>>
>>>>>>>> Well, the answers you got there are correct as well.
>>>>>>>> Unfortunately, I am not familiar enough with Neo4j to comment any
>>>>>>>> further. Is the Neo4j graph database running externally (outside the
>>>>>>>> Spark cluster), within the driver process, or on all the executors?
>>>>>>>> Can you clarify that?
>>>>>>>>
>>>>>>>> TD
>>>>>>>>
>>>>>>>>
>>>>>>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <
>>>>>>>> gautam1237@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Alright, I have also asked this question in StackOverflow:
>>>>>>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>>>>>>
>>>>>>>>> The code there is pretty neat.
>>>>>>>>>
>>>>>>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <
>>>>>>>>> tdas@databricks.com> wrote:
>>>>>>>>>
>>>>>>>>>> I am not sure if you realized, but the code snippet is pretty
>>>>>>>>>> mangled in the email we received. It might be a good idea to put
>>>>>>>>>> the code in a pastebin or gist; that is much easier for everyone
>>>>>>>>>> to read.
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <
>>>>>>>>>> gautam1237@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> I'm trying to use Neo4j with Apache Spark Streaming, but I am
>>>>>>>>>>> running into a serializability issue.
>>>>>>>>>>>
>>>>>>>>>>> Basically, I want Apache Spark to parse and bundle my data in
>>>>>>>>>>> real time. After the data has been bundled, it should be stored
>>>>>>>>>>> in the database, Neo4j. However, I am getting this error:
>>>>>>>>>>>
>>>>>>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>>>>>>     at org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>>>>>>     at org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>>>>>>     at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>>>>>>> Caused by: java.io.NotSerializableException: org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>>>>>>     at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>>>>>>     at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>>>>>>     at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>>>>>>     ... 17 more
>>>>>>>>>>> Here is my code:
>>>>>>>>>>>
>>>>>>>>>>> output a stream of type: JavaPairDStream<String, ArrayList<String>>
>>>>>>>>>>>
>>>>>>>>>>> output.foreachRDD(
>>>>>>>>>>>     new Function2<JavaPairRDD<String, ArrayList<String>>, Time, Void>() {
>>>>>>>>>>>
>>>>>>>>>>>         @Override
>>>>>>>>>>>         public Void call(
>>>>>>>>>>>                 JavaPairRDD<String, ArrayList<String>> arg0,
>>>>>>>>>>>                 Time arg1) throws Exception {
>>>>>>>>>>>             // TODO Auto-generated method stub
>>>>>>>>>>>
>>>>>>>>>>>             arg0.foreach(
>>>>>>>>>>>                 new VoidFunction<Tuple2<String, ArrayList<String>>>() {
>>>>>>>>>>>
>>>>>>>>>>>                     @Override
>>>>>>>>>>>                     public void call(
>>>>>>>>>>>                             Tuple2<String, ArrayList<String>> arg0)
>>>>>>>>>>>                             throws Exception {
>>>>>>>>>>>                         // TODO Auto-generated method stub
>>>>>>>>>>>                         try (Transaction tx = graphDB.beginTx()) {
>>>>>>>>>>>                             if (Neo4jOperations.getHMacFromValue(graphDB, arg0._1) != null)
>>>>>>>>>>>                                 System.out.println("Alread in Database:" + arg0._1);
>>>>>>>>>>>                             else {
>>>>>>>>>>>                                 Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>>>>>>                             }
>>>>>>>>>>>                             tx.success();
>>>>>>>>>>>                         }
>>>>>>>>>>>                     }
>>>>>>>>>>>                 });
>>>>>>>>>>>             return null;
>>>>>>>>>>>         }
>>>>>>>>>>>     });
>>>>>>>>>>>
>>>>>>>>>>> Neo4jOperations Class:
>>>>>>>>>>>
>>>>>>>>>>> public class Neo4jOperations {
>>>>>>>>>>>
>>>>>>>>>>>     public static Node getHMacFromValue(GraphDatabaseService graphDB, String value) {
>>>>>>>>>>>         try (ResourceIterator<Node> HMacs = graphDB
>>>>>>>>>>>                 .findNodesByLabelAndProperty(DynamicLabel.label("HMac"), "value", value)
>>>>>>>>>>>                 .iterator()) {
>>>>>>>>>>>             return HMacs.next();
>>>>>>>>>>>         }
>>>>>>>>>>>     }
>>>>>>>>>>>
>>>>>>>>>>>     public static void createHMac(GraphDatabaseService graphDB, String value) {
>>>>>>>>>>>         Node HMac = graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>>>>>>         HMac.setProperty("value", value);
>>>>>>>>>>>         HMac.setProperty("time",
>>>>>>>>>>>             new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
>>>>>>>>>>>     }
>>>>>>>>>>> }
>>>>>>>>>>>
>>>>>>>>>>> I know that I have to serialize the class Neo4jOperations, but
>>>>>>>>>>> I'm unable to figure out how. Or is there any other way to
>>>>>>>>>>> achieve this?
>>>>>>>>>>>
>>>>>>>>>>> Also, how can I store the output of Spark Streaming in a database?
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> --
>>>>>>>>>>> View this message in context:
>>>>>>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>>>>>>> Nabble.com.
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------
>>>>>>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> --
>>>>>>>>> Gautam
>>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> --
>>>>>>> Gautam
>>>>>>>
>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Gautam
>>>>>
>>>>
>>>
>>>
>>> --
>>> Gautam
>>>
>>
>>
>
>
> --
> Gautam
>
