spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: about an exception when receiving data from kafka topic using Direct mode of Spark Streaming
Date Thu, 26 May 2016 15:00:59 GMT
Honestly, given this thread and the Stack Overflow thread, I'd say you need
to back up, start very simply, and learn Spark.  If for some reason the
official docs aren't doing it for you, Learning Spark from O'Reilly is a
good book.

Given your specific question, why not just

messages.foreachRDD { rdd =>
  rdd.foreachPartition { iterator =>
    someWorkOnAnIterator(iterator)
  }
}

All of the other extraneous stuff you're doing doesn't make any sense to
me.
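
For what it's worth, here is a minimal, self-contained sketch of that per-batch pattern, using plain Scala collections in place of the RDD so it runs without a cluster. `Rating` and `parseRating` are hypothetical stand-ins for the `AmazonRating` case class and the Play JSON parsing in the original code:

```scala
// Hypothetical stand-in for AmazonRating, for illustration only.
case class Rating(userId: String, productId: String, rating: Double)

object ForeachPartitionSketch {

  // Tiny ad-hoc parser for the one-line JSON messages shown in this thread,
  // e.g. {"userId":"someUserId","productId":"0981531679","rating":9.0}.
  // Real code would use Json.parse plus a Reads[AmazonRating] instead.
  def parseRating(json: String): Option[Rating] = {
    val field = "\"(\\w+)\"\\s*:\\s*\"?([^\",}]+)\"?".r
    val kv = field.findAllMatchIn(json).map(m => m.group(1) -> m.group(2)).toMap
    for {
      u <- kv.get("userId")
      p <- kv.get("productId")
      r <- kv.get("rating").flatMap(s => scala.util.Try(s.toDouble).toOption)
    } yield Rating(u, p, r)
  }

  // The work done once per partition: parse, drop malformed records, act.
  // Inside Spark this body would run on the executors, driven by
  // rdd.foreachPartition { iterator => processPartition(iterator) }.
  def processPartition(iterator: Iterator[String]): Seq[Rating] =
    iterator.flatMap(parseRating).toList

  def main(args: Array[String]): Unit = {
    val batch = Seq(
      """{"userId":"someUserId","productId":"0981531679","rating":9.0}""",
      """not valid json"""
    )
    println(processPartition(batch.iterator))
  }
}
```

The point is that everything the stream needs to do with a batch lives inside the single foreachRDD; no new DStream transformations get created after ssc.start().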



On Thu, May 26, 2016 at 2:48 AM, Alonso Isidoro Roman <alonsoir@gmail.com>
wrote:

> Hi Matthias and Cody,
>
> You can see in the code that StreamingContext.start() is called after the
> messages.foreachRDD output action. Another problem, @Cody: how can I avoid
> the inner .foreachRDD(_.foreachPartition(it =>
> recommender.predictWithALS(it.toSeq))) so that I can asynchronously invoke
> recommender.predictWithALS, which runs a machine learning ALS implementation
> with a message from the Kafka topic?
>
> In the current code I am not yet saving any data to the Mongo instance; for
> now it is more important to focus on how to receive the message from the
> Kafka topic and feed the ALS implementation asynchronously. Probably the
> Recommender object will need the code to interact with the Mongo instance.
>
> The idea of the process is to receive data from the Kafka topic, calculate
> recommendations based on the incoming message, and save the results in a
> Mongo instance. Is it possible? Am I missing something important?
>
> def main(args: Array[String]) {
>     // Process program arguments and set properties
>
>     if (args.length < 2) {
>       System.err.println("Usage: " + this.getClass.getSimpleName + " <brokers> <topics>")
>       System.exit(1)
>     }
>
>     val Array(brokers, topics) = args
>
>     println("Initializing Streaming Spark Context and kafka connector...")
>     // Create context with 2 second batch interval
>     val sparkConf = new SparkConf().setAppName("AmazonKafkaConnector")
>                                    .setMaster("local[4]")
>                                    .set("spark.driver.allowMultipleContexts", "true")
>
>     val sc = new SparkContext(sparkConf)
>
> sc.addJar("target/scala-2.10/blog-spark-recommendation_2.10-1.0-SNAPSHOT.jar")
>     val ssc = new StreamingContext(sparkConf, Seconds(2))
>     //this checkpointdir should be in a conf file, for now it is hardcoded!
>     val streamingCheckpointDir = "/Users/aironman/my-recommendation-spark-engine/checkpoint"
>     ssc.checkpoint(streamingCheckpointDir)
>
>     // Create direct kafka stream with brokers and topics
>     val topicsSet = topics.split(",").toSet
>     val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
>     val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicsSet)
>     println("Initialized Streaming Spark Context and kafka connector...")
>
>     //create recomendation module
>     println("Creating rating recommender module...")
>     val ratingFile= "ratings.csv"
>     val recommender = new Recommender(sc,ratingFile)
>     println("Initialized rating recommender module...")
>
>     //i have to convert messages, which is an InputDStream, into a Seq[AmazonRating]...
>     try{
>     messages.foreachRDD( rdd =>{
>       val count = rdd.count()
>       if (count > 0){
>         //someMessages should be AmazonRating...
>         val someMessages = rdd.take(count.toInt)
>         println("<------>")
>         println("someMessages is " + someMessages)
>         someMessages.foreach(println)
>         println("<------>")
>         println("<---POSSIBLE SOLUTION--->")
>
>         messages
>         .map { case (_, jsonRating) =>
>           val jsValue = Json.parse(jsonRating)
>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>             case JsSuccess(rating, _) => rating
>             case JsError(_) => AmazonRating.empty
>           }
>              }
>         .filter(_ != AmazonRating.empty)
>         //probably is not a good idea to do this...
>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>
>         println("<---POSSIBLE SOLUTION--->")
>
>       }
>       }
>     )
>     }catch{
>       case e: IllegalArgumentException => println("illegal arg. exception")
>       case e: IllegalStateException    => println("illegal state exception")
>       case e: ClassCastException       => println("ClassCastException")
>       case e: Exception                => println("Generic Exception")
>     }finally{
>
>       println("Finished taking data from kafka topic...")
>     }
>
>     //println("jsonParsed is " + jsonParsed)
>     //The idea is to save results from Recommender.predict within mongodb,
> so i will have to deal with this issue
>     //after resolving the issue of
> .foreachRDD(_.foreachPartition(recommender.predict(_.toSeq)))
>
>     ssc.start()
>     ssc.awaitTermination()
>
>     println("Finished!")
>   }
> }
>
> Thank you for reading this far; please, I need your assistance.
>
> Regards
>
>
> Alonso Isidoro Roman
> about.me/alonso.isidoro.roman
>
> 2016-05-25 17:33 GMT+02:00 Alonso Isidoro Roman <alonsoir@gmail.com>:
>
>> Hi Matthias and Cody, thanks for the answer. This is the code that is
>> raising the runtime exception:
>>
>> try{
>>     messages.foreachRDD( rdd =>{
>>       val count = rdd.count()
>>       if (count > 0){
>>         //someMessages should be AmazonRating...
>>         val someMessages = rdd.take(count.toInt)
>>         println("<------>")
>>         println("someMessages is " + someMessages)
>>         someMessages.foreach(println)
>>         println("<------>")
>>         println("<---POSSIBLE SOLUTION--->")
>>         messages
>>         .map { case (_, jsonRating) =>
>>           val jsValue = Json.parse(jsonRating)
>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>             case JsSuccess(rating, _) => rating
>>             case JsError(_) => AmazonRating.empty
>>           }
>>              }
>>         .filter(_ != AmazonRating.empty)
>>         //this line raises the runtime error, but if i comment it out a different runtime exception happens!
>>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>>         println("<---POSSIBLE SOLUTION--->")
>>       }
>>       }
>>     )
>>     }catch{
>>       case e: IllegalArgumentException => println("illegal arg. exception")
>>       case e: IllegalStateException    => println("illegal state exception")
>>       case e: ClassCastException       => println("ClassCastException")
>>       case e: Exception                => println("Generic Exception")
>>     }finally{
>>
>>       println("Finished taking data from kafka topic...")
>>     }
>>
>> If I comment out the line with the second foreachRDD, the next runtime
>> exception happens on a fresh start, that is, when the Kafka producer pushes
>> data into the topic:
>>
>> 16/05/25 17:26:12 ERROR JobScheduler: Error running job streaming job
>> 1464189972000 ms.0
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>> If I push more JSON data into the topic, the next exception happens:
>>
>> 16/05/25 17:27:16 INFO DAGScheduler: Job 1 finished: runJob at
>> KafkaRDD.scala:98, took 0,039689 s
>>
>> <------>
>>
>> someMessages is [Lscala.Tuple2;@712ca120
>>
>> (null,{"userId":"someUserId","productId":"0981531679","rating":9.0})
>>
>> <------>
>>
>> <---POSSIBLE SOLUTION--->
>>
>> 16/05/25 17:27:16 INFO JobScheduler: Finished job streaming job
>> 1464190036000 ms.0 from job set of time 1464190036000 ms
>>
>> 16/05/25 17:27:16 INFO JobScheduler: Total delay: 0,063 s for time
>> 1464190036000 ms (execution: 0,055 s)
>>
>> 16/05/25 17:27:16 INFO KafkaRDD: Removing RDD 43 from persistence list
>>
>> 16/05/25 17:27:16 ERROR JobScheduler: Error running job streaming job
>> 1464190036000 ms.0
>>
>> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>> 	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>> 	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>> 	at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> 	at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>> 	at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>> 	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> 	at scala.util.Try$.apply(Try.scala:161)
>> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>> If I uncomment the second foreachRDD, this exception happens on a fresh
>> start of the Spark Streaming process after the Kafka producer sends data to
>> the topic:
>>
>> org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost): java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B
>> 	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62)
>> 	at org.apache.spark.scheduler.Task.run(Task.scala:89)
>> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>> A second call of the Kafka producer raises this runtime exception:
>>
>> <------>
>>
>> someMessages is [Lscala.Tuple2;@581b83b
>>
>> (null,{"userId":"someUserId","productId":"0981531679","rating":8.0})
>>
>> <------>
>>
>> <---POSSIBLE SOLUTION--->
>>
>> 16/05/25 17:31:30 INFO JobScheduler: Finished job streaming job
>> 1464190290000 ms.0 from job set of time 1464190290000 ms
>>
>> 16/05/25 17:31:30 INFO JobScheduler: Total delay: 0,066 s for time
>> 1464190290000 ms (execution: 0,059 s)
>>
>> 16/05/25 17:31:30 INFO KafkaRDD: Removing RDD 37 from persistence list
>>
>> 16/05/25 17:31:30 ERROR JobScheduler: Error running job streaming job
>> 1464190290000 ms.0
>>
>> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>> 	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>> 	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>> 	at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>> 	at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>> 	at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>> 	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>> 	at scala.util.Try$.apply(Try.scala:161)
>> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>> 	at java.lang.Thread.run(Thread.java:745)
>>
>>
>>
>> In your opinion, what changes do I have to make to get this code up and
>> running correctly?
>>
>> The idea is to feed every rating message that I receive from the Kafka
>> topic into the recommender.predictWithALS method and save the results in a
>> Mongo instance. I was thinking that this kind of task should be
>> asynchronous, shouldn't it? If I am right, how should I change the method
>> to work that way?
>>
>> Recommender.predictWithALS method:
>>
>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>     // train model
>>     val myRatings = ratings.map(toSparkRating)
>>     val myRatingRDD = sc.parallelize(myRatings)
>>
>>     val startAls = DateTime.now
>>     val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>
>>     val myProducts = myRatings.map(_.product).toSet
>>     val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>>
>>     // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>>     val myUserId = userDict.getIndex(MyUsername)
>>     val recommendations = model.predict(candidates.map((myUserId, _))).collect
>>     val endAls = DateTime.now
>>     val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>
>>     println(s"ALS Time: $alsTime seconds")
>>     result
>>   }
>>
>> Thank you very much.
>>
>> Alonso Isidoro Roman
>> about.me/alonso.isidoro.roman
>>
>> 2016-05-25 16:54 GMT+02:00 Matthias Niehoff <
>> matthias.niehoff@codecentric.de>:
>>
>>> Hi,
>>>
>>> you register some output actions (in this case foreachRDD) after
>>> starting the streaming context. StreamingContext.start() has to be called
>>> after *all* output actions.
>>>
>>> 2016-05-25 15:59 GMT+02:00 Alonso <alonsoir@gmail.com>:
>>>
>>>> Hi, I am receiving this exception when the direct Spark Streaming
>>>> process tries to pull data from the Kafka topic:
>>>>
>>>> 16/05/25 11:30:30 INFO CheckpointWriter: Checkpoint for time
>>>> 1464168630000 ms saved to file
>>>> 'file:/Users/aironman/my-recommendation-spark-engine/checkpoint/checkpoint-1464168630000',
>>>> took 5928 bytes and 8 ms
>>>>
>>>> 16/05/25 11:30:30 INFO Executor: Finished task 0.0 in stage 2.0 (TID 2). 1041 bytes result sent to driver
>>>> 16/05/25 11:30:30 INFO TaskSetManager: Finished task 0.0 in stage 2.0 (TID 2) in 4 ms on localhost (1/1)
>>>> 16/05/25 11:30:30 INFO TaskSchedulerImpl: Removed TaskSet 2.0, whose tasks have all completed, from pool
>>>> 16/05/25 11:30:30 INFO DAGScheduler: ResultStage 2 (runJob at KafkaRDD.scala:98) finished in 0,004 s
>>>> 16/05/25 11:30:30 INFO DAGScheduler: Job 2 finished: runJob at KafkaRDD.scala:98, took 0,008740 s
>>>> <------>
>>>> someMessages is [Lscala.Tuple2;@2641d687
>>>> (null,{"userId":"someUserId","productId":"0981531679","rating":6.0})
>>>> <------>
>>>> <---POSSIBLE SOLUTION--->
>>>> 16/05/25 11:30:30 INFO JobScheduler: Finished job streaming job 1464168630000 ms.0 from job set of time 1464168630000 ms
>>>> 16/05/25 11:30:30 INFO KafkaRDD: Removing RDD 105 from persistence list
>>>> 16/05/25 11:30:30 INFO JobScheduler: Total delay: 0,020 s for time 1464168630000 ms (execution: 0,012 s)
>>>> 16/05/25 11:30:30 ERROR JobScheduler: Error running job streaming job 1464168630000 ms.0
>>>> java.lang.IllegalStateException: Adding new inputs, transformations, and output operations after starting a context is not supported
>>>> 	at org.apache.spark.streaming.dstream.DStream.validateAtInit(DStream.scala:222)
>>>> 	at org.apache.spark.streaming.dstream.DStream.<init>(DStream.scala:64)
>>>> 	at org.apache.spark.streaming.dstream.MappedDStream.<init>(MappedDStream.scala:25)
>>>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$map$1.apply(DStream.scala:558)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
>>>> 	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
>>>> 	at org.apache.spark.SparkContext.withScope(SparkContext.scala:714)
>>>> 	at org.apache.spark.streaming.StreamingContext.withScope(StreamingContext.scala:260)
>>>> 	at org.apache.spark.streaming.dstream.DStream.map(DStream.scala:557)
>>>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:125)
>>>> 	at example.spark.AmazonKafkaConnector$$anonfun$main$1.apply(AmazonKafkaConnectorWithMongo.scala:114)
>>>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>> 	at org.apache.spark.streaming.dstream.DStream$$anonfun$foreachRDD$1$$anonfun$apply$mcV$sp$3.apply(DStream.scala:661)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
>>>> 	at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>> 	at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
>>>> 	at scala.util.Try$.apply(Try.scala:161)
>>>> 	at org.apache.spark.streaming.scheduler.Job.run(Job.scala:39)
>>>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply$mcV$sp(JobScheduler.scala:224)
>>>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler$$anonfun$run$1.apply(JobScheduler.scala:224)
>>>> 	at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
>>>> 	at org.apache.spark.streaming.scheduler.JobScheduler$JobHandler.run(JobScheduler.scala:223)
>>>> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>> 	at java.lang.Thread.run(Thread.java:745)
>>>> 16/05/25 11:30:30 INFO BlockManager: Removing RDD 105
>>>>
>>>>
>>>> This is the code that raises the exception in the Spark Streaming
>>>> process:
>>>>
>>>> try{
>>>>     messages.foreachRDD( rdd =>{
>>>>       val count = rdd.count()
>>>>       if (count > 0){
>>>>         //someMessages should be AmazonRating...
>>>>         val someMessages = rdd.take(count.toInt)
>>>>         println("<------>")
>>>>         println("someMessages is " + someMessages)
>>>>         someMessages.foreach(println)
>>>>         println("<------>")
>>>>         println("<---POSSIBLE SOLUTION--->")
>>>>         messages
>>>>         .map { case (_, jsonRating) =>
>>>>           val jsValue = Json.parse(jsonRating)
>>>>           AmazonRating.amazonRatingFormat.reads(jsValue) match {
>>>>             case JsSuccess(rating, _) => rating
>>>>             case JsError(_) => AmazonRating.empty
>>>>           }
>>>>              }
>>>>         .filter(_ != AmazonRating.empty)
>>>>         //I think that this line provokes the runtime exception...
>>>>         .foreachRDD(_.foreachPartition(it => recommender.predictWithALS(it.toSeq)))
>>>>
>>>>         println("<---POSSIBLE SOLUTION--->")
>>>>
>>>>       }
>>>>       }
>>>>     )
>>>>     }catch{
>>>>       case e: IllegalArgumentException => println("illegal arg. exception")
>>>>       case e: IllegalStateException    => println("illegal state exception")
>>>>       case e: ClassCastException       => println("ClassCastException")
>>>>       case e: Exception                => println("Generic Exception")
>>>>     }finally{
>>>>
>>>>       println("Finished taking data from kafka topic...")
>>>>     }
>>>>
>>>> Recommender object:
>>>>
>>>> def predictWithALS(ratings: Seq[AmazonRating]) = {
>>>>     // train model
>>>>     val myRatings = ratings.map(toSparkRating)
>>>>     val myRatingRDD = sc.parallelize(myRatings)
>>>>
>>>>     val startAls = DateTime.now
>>>>     val model = ALS.train((sparkRatings ++ myRatingRDD).repartition(NumPartitions), 10, 20, 0.01)
>>>>
>>>>     val myProducts = myRatings.map(_.product).toSet
>>>>     val candidates = sc.parallelize((0 until productDict.size).filterNot(myProducts.contains))
>>>>
>>>>     // get ratings of all products not in my history ordered by rating (higher first) and only keep the first NumRecommendations
>>>>     val myUserId = userDict.getIndex(MyUsername)
>>>>     val recommendations = model.predict(candidates.map((myUserId, _))).collect
>>>>     val endAls = DateTime.now
>>>>     val result = recommendations.sortBy(-_.rating).take(NumRecommendations).map(toAmazonRating)
>>>>     val alsTime = Seconds.secondsBetween(startAls, endAls).getSeconds
>>>>
>>>>     println(s"ALS Time: $alsTime seconds")
>>>>     result
>>>>   }
>>>> }
>>>>
>>>> And this is the Kafka producer that pushes the JSON data into the topic:
>>>>
>>>> object AmazonProducerExample {
>>>>   def main(args: Array[String]): Unit = {
>>>>
>>>>     val productId = args(0).toString
>>>>     val userId = args(1).toString
>>>>     val rating = args(2).toDouble
>>>>     val topicName = "amazonRatingsTopic"
>>>>
>>>>     val producer = Producer[String](topicName)
>>>>
>>>>     //0981531679 is Scala Puzzlers...
>>>>     //AmazonProductAndRating
>>>>     AmazonPageParser.parse(productId, userId, rating).onSuccess { case amazonRating =>
>>>>       //Is this the correct way? the best performance? possibly not, what about using avro or parquet?
>>>>       producer.send(Json.toJson(amazonRating).toString)
>>>>       //producer.send(amazonRating)
>>>>       println("amazon product with rating sent to kafka cluster..." + amazonRating.toString)
>>>>       System.exit(0)
>>>>     }
>>>>
>>>>   }
>>>> }
>>>>
>>>>
>>>> I have written a Stack Overflow post
>>>> <http://stackoverflow.com/questions/37303202/about-an-error-accessing-a-field-inside-tuple2>
>>>> with more details. Please help; I am stuck with this issue and I don't
>>>> know how to continue.
>>>>
>>>> Regards
>>>>
>>>> Alonso Isidoro Roman
>>>> about.me/alonso.isidoro.roman
>>>>
>>>>
>>>
>>>
>>>
>>> --
>>> Matthias Niehoff | IT-Consultant | Agile Software Factory  | Consulting
>>> codecentric AG | Zeppelinstr 2 | 76185 Karlsruhe | Deutschland
>>> tel: +49 (0) 721.9595-681 | fax: +49 (0) 721.9595-666 | mobil: +49 (0)
>>> 172.1702676
>>> www.codecentric.de | blog.codecentric.de | www.meettheexperts.de |
>>> www.more4fi.de
>>>
>>> Registered office: Solingen | HRB 25917 | Amtsgericht Wuppertal
>>> Management board: Michael Hochgürtel . Mirko Novakovic . Rainer Vehns
>>> Supervisory board: Patric Fedlmeier (chairman) . Klaus Jäger . Jürgen
>>> Schütz
>>>
>>> This e-mail, including any attached files, contains confidential and/or
>>> legally protected information. If you are not the intended recipient or
>>> have received this e-mail in error, please inform the sender immediately
>>> and delete this e-mail and any attached files. The unauthorized copying,
>>> use, or opening of attached files, as well as the unauthorized forwarding
>>> of this e-mail, is not permitted.
>>>
>>
>>
>
