spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gautam Bajaj <gautam1...@gmail.com>
Subject Re: Using Neo4j with Apache Spark
Date Thu, 12 Mar 2015 08:32:32 GMT
I tried that too. It result in same serializability issue.

GraphDatabaseSerive that I'm using is : GraphDatabaseFactory() :
http://neo4j.com/api_docs/2.0.0/org/neo4j/graphdb/factory/GraphDatabaseFactory.html

On Thu, Mar 12, 2015 at 5:21 PM, Tathagata Das <tdas@databricks.com> wrote:

> What is GraphDatabaseService object that you are using? Instead of
> creating them on the driver (outside foreachRDD), can you create them
> inside the RDD.foreach?
>
> In general, the right pattern for doing this in the programming guide
>
> http://spark.apache.org/docs/latest/streaming-programming-guide.html#design-patterns-for-using-foreachrdd
>
> So you should be doing (sorry for writing in scala)
>
> dstream.foreachRDD ((rdd: RDD, time: Time) => {
>     rdd.foreachPartition(iterator =>
>         // Create GraphDatabaseService object, or fetch it from a pool of GraphDatabaseService
> objects
>         // Use it to send the whole partition to Neo4j
>         // Destroy the object or release it to the pool
> })
>
>
> On Thu, Mar 12, 2015 at 1:15 AM, Gautam Bajaj <gautam1237@gmail.com>
> wrote:
>
>> Neo4j is running externally. It has nothing to do with Spark processes.
>>
>> Basically, the problem is, I'm unable to figure out a way to store output
>> of Spark on the database. As Spark Streaming requires Neo4j Core Java API
>> to be serializable as well.
>>
>> The answer points out to using REST API but their performance is really
>> poor when compared to Core Java API :
>> http://www.rene-pickhardt.de/get-the-full-neo4j-power-by-using-the-core-java-api-for-traversing-your-graph-data-base-instead-of-cypher-query-language/
>>
>> On Thu, Mar 12, 2015 at 5:09 PM, Tathagata Das <tdas@databricks.com>
>> wrote:
>>
>>> Well the answers you got there are correct as well.
>>> Unfortunately I am not familiar with Neo4j enough to comment any more.
>>> Is the Neo4j graph database running externally (outside Spark cluster), or
>>> within the driver process, or on all the executors? Can you clarify that?
>>>
>>> TD
>>>
>>>
>>> On Thu, Mar 12, 2015 at 12:58 AM, Gautam Bajaj <gautam1237@gmail.com>
>>> wrote:
>>>
>>>> Alright, I have also asked this question in StackOverflow:
>>>> http://stackoverflow.com/questions/28896898/using-neo4j-with-apache-spark
>>>>
>>>> The code there is pretty neat.
>>>>
>>>> On Thu, Mar 12, 2015 at 4:55 PM, Tathagata Das <tdas@databricks.com>
>>>> wrote:
>>>>
>>>>> I am not sure if you realized but the code snipper it pretty mangled
>>>>> up in the email we received. It might be a good idea to put the code
in
>>>>> pastebin or gist, much much easier for everyone to read.
>>>>>
>>>>>
>>>>> On Thu, Mar 12, 2015 at 12:48 AM, d34th4ck3r <gautam1237@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> I'm trying to use Neo4j with Apache Spark Streaming but I am finding
>>>>>> serializability as an issue.
>>>>>>
>>>>>> Basically, I want Apache Spark to parse and bundle my data in real
>>>>>> time.
>>>>>> After, the data has been bundled it should be stored in the database,
>>>>>> Neo4j.
>>>>>> However, I am getting this error:
>>>>>>
>>>>>> org.apache.spark.SparkException: Task not serializable
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166)
>>>>>>     at
>>>>>> org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158)
>>>>>>     at org.apache.spark.SparkContext.clean(SparkContext.scala:1264)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.api.java.JavaRDDLike$class.foreach(JavaRDDLike.scala:297)
>>>>>>     at
>>>>>> org.apache.spark.api.java.JavaPairRDD.foreach(JavaPairRDD.scala:45)
>>>>>>     at twoGrams.Main$4.call(Main.java:102)
>>>>>>     at twoGrams.Main$4.call(Main.java:1)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:282)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:41)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:40)
>>>>>>     at scala.util.Try$.apply(Try.scala:161)
>>>>>>     at org.apache.spark.streaming.scheduler.Job.run(Job.scala:32)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:172)
>>>>>>     at
>>>>>>
>>>>>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>>>>>>     at
>>>>>>
>>>>>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>> Caused by: java.io.NotSerializableException:
>>>>>> org.neo4j.kernel.EmbeddedGraphDatabase
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1183)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
>>>>>>     at
>>>>>>
>>>>>> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
>>>>>>     at
>>>>>> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73)
>>>>>>     at
>>>>>>
>>>>>> org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164)
>>>>>>     ... 17 more
>>>>>> Here is my code:
>>>>>>
>>>>>> output a stream of type: JavaPairDStream<String, ArrayList&lt;String>>
>>>>>>
>>>>>> output.foreachRDD(
>>>>>>                 new
>>>>>> Function2<JavaPairRDD&lt;String,ArrayList&lt;String>>,Time,Void>(){
>>>>>>
>>>>>>                     @Override
>>>>>>                     public Void call(
>>>>>>                             JavaPairRDD<String, ArrayList&lt;String>>
>>>>>> arg0,
>>>>>>                             Time arg1) throws Exception {
>>>>>>                         // TODO Auto-generated method stub
>>>>>>
>>>>>>                         arg0.foreach(
>>>>>>                                 new
>>>>>> VoidFunction<Tuple2&lt;String,ArrayList&lt;String>>>(){
>>>>>>
>>>>>>                                     @Override
>>>>>>                                     public void call(
>>>>>>                                             Tuple2<String,
>>>>>> ArrayList&lt;String>> arg0)
>>>>>>                                             throws Exception {
>>>>>>                                         // TODO Auto-generated method
>>>>>> stub
>>>>>>                                         try( Transaction tx =
>>>>>> graphDB.beginTx()){
>>>>>>
>>>>>> if(Neo4jOperations.getHMacFromValue(graphDB, arg0._1)!=null)
>>>>>>
>>>>>> System.out.println("Alread
>>>>>> in Database:" + arg0._1);
>>>>>>                                             else{
>>>>>>
>>>>>> Neo4jOperations.createHMac(graphDB, arg0._1);
>>>>>>                                             }
>>>>>>                                             tx.success();
>>>>>>                                         }
>>>>>>                                     }
>>>>>>
>>>>>>                         });
>>>>>>                         return null;
>>>>>>                     }
>>>>>>
>>>>>>
>>>>>>
>>>>>>                 });
>>>>>> Neo4jOperations Class:
>>>>>>
>>>>>> public class Neo4jOperations{
>>>>>>
>>>>>> public static Node getHMacFromValue(GraphDatabaseService
>>>>>> graphDB,String
>>>>>> value){
>>>>>>         try(ResourceIterator<Node>
>>>>>> HMacs=graphDB.findNodesByLabelAndProperty(DynamicLabel.label("HMac"),
>>>>>> "value", value).iterator()){
>>>>>>             return HMacs.next();
>>>>>>         }
>>>>>>     }
>>>>>>
>>>>>>     public static void createHMac(GraphDatabaseService graphDB,String
>>>>>> value){
>>>>>>         Node HMac=graphDB.createNode(DynamicLabel.label("HMac"));
>>>>>>         HMac.setProperty("value", value);
>>>>>>         HMac.setProperty("time", new
>>>>>>
>>>>>> SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime()));
>>>>>>     }
>>>>>> }
>>>>>> I know that I have to Serialize the class Neo4jOperations, but I'm
>>>>>> able to
>>>>>> figure out how. Or is there any other way to achieve this?
>>>>>>
>>>>>> Also, how can I store output of Spark Streaming in a database ?
>>>>>>
>>>>>>
>>>>>>
>>>>>>
>>>>>> --
>>>>>> View this message in context:
>>>>>> http://apache-spark-user-list.1001560.n3.nabble.com/Using-Neo4j-with-Apache-Spark-tp22012.html
>>>>>> Sent from the Apache Spark User List mailing list archive at
>>>>>> Nabble.com.
>>>>>>
>>>>>> ---------------------------------------------------------------------
>>>>>> To unsubscribe, e-mail: user-unsubscribe@spark.apache.org
>>>>>> For additional commands, e-mail: user-help@spark.apache.org
>>>>>>
>>>>>>
>>>>>
>>>>
>>>>
>>>> --
>>>> Gautam
>>>>
>>>
>>>
>>
>>
>> --
>> Gautam
>>
>
>


-- 
Gautam

Mime
View raw message