spark-user mailing list archives

From Vadim Chekan <kot.bege...@gmail.com>
Subject Re: Window slide duration
Date Tue, 03 Jun 2014 23:35:54 GMT
Ok, it's a bug in Spark. I've submitted a patch:
https://issues.apache.org/jira/browse/SPARK-2009
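
As background for the stack trace quoted below: the exception is raised from scala.collection.mutable.HashMap.apply, which throws when the key is absent. The following is a minimal standalone sketch of that behaviour, not Spark code and not necessarily what the patch changes; the object name, the map and its contents are hypothetical.

```scala
import scala.collection.mutable
import scala.util.Try

object MissingKeyDemo {
  // Hypothetical stand-in for per-batch metadata keyed by batch time (ms).
  val blocksByTime: mutable.HashMap[Long, Seq[String]] =
    mutable.HashMap(1401754907000L -> Seq("block-0"))

  // apply() throws NoSuchElementException when the key is absent.
  def strictLookup(timeMs: Long): Try[Seq[String]] = Try(blocksByTime(timeMs))

  // getOrElse() returns a default instead of throwing.
  def lenientLookup(timeMs: Long): Seq[String] =
    blocksByTime.getOrElse(timeMs, Seq.empty)

  def main(args: Array[String]): Unit = {
    val missing = 1401754908000L
    println(strictLookup(missing).isFailure)
    println(lenientLookup(missing).isEmpty)
  }
}
```

Whether a lookup should throw or fall back to a default depends on whether a missing key is an expected state for the caller.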


On Mon, Jun 2, 2014 at 8:39 PM, Vadim Chekan <kot.begemot@gmail.com> wrote:

> Thanks for looking into this Tathagata.
>
> Are you looking for traces of the ReceiverInputDStream.clearMetadata call?
> Here is the log: http://wepaste.com/vchekan
>
> Vadim.
>
>
> On Mon, Jun 2, 2014 at 5:58 PM, Tathagata Das <tathagata.das1565@gmail.com> wrote:
>
>> Can you share the full logs? I would like to see what is clearing the key
>> "1401754908000 ms".
>>
>> TD
>>
>>
>> On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan <kot.begemot@gmail.com>
>> wrote:
>>
>>> Ok, it seems that "Time ... is invalid" is part of the normal workflow: the
>>> windowed DStream ignores RDDs at times that do not align with the window's
>>> slide interval. But why I am getting the exception is still unclear. Here is
>>> the full stack trace:
>>>
>>> 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference is 1000 ms
>>> 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
>>> java.util.NoSuchElementException: key not found: 1401754908000 ms
>>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>>     at scala.collection.AbstractMap.default(Map.scala:58)
>>>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>
>>>
>>> On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan <kot.begemot@gmail.com>
>>> wrote:
>>>
>>>> Hi all,
>>>>
>>>> I am getting an error:
>>>> ================
>>>> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference is 6000 ms
>>>> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
>>>> ================
>>>>
>>>> My relevant code is:
>>>> ===================
>>>> ssc = new StreamingContext(conf, Seconds(1))  // batch duration: 1 s
>>>> val messageEvents = events.
>>>>       flatMap(e => evaluatorCached.value.find(e)).
>>>>       window(Seconds(8), Seconds(4))  // window length 8 s, slide 4 s
>>>> messageEvents.print()
>>>> ===================
>>>>
>>>> Seems all right to me: the window slide duration (4 s) is a multiple of the
>>>> streaming context batch duration (1 s × 4). So, what's the problem?
>>>>
>>>> Spark-v1.0.0
>>>>
>>>> --
>>>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT
>>>> is explicitly specified
>>>>
>>>
>>>
>>>
>>>
>>
>>
>
>
>



-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified
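
For readers following the thread, the "Time ... is invalid" log lines quoted above can be reproduced with plain arithmetic: a time is skipped when its offset from zeroTime is not a whole multiple of slideDuration. The sketch below is a standalone model using Long milliseconds, not Spark source; the object and method names, and the requirement that the time be later than zeroTime, are assumptions made for illustration.

```scala
// Standalone sketch (not Spark source): models the check implied by the
// "Time ... is invalid" log line using plain Long milliseconds.
object SlideCheck {
  /** True when timeMs lies a whole number of slides after zeroTimeMs. */
  def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  def main(args: Array[String]): Unit = {
    val slideMs = 4000L
    // Values from the quoted logs: differences of 1000 ms and 6000 ms.
    println(isTimeValid(1401754908000L, 1401754907000L, slideMs)) // false
    println(isTimeValid(1401753992000L, 1401753986000L, slideMs)) // false
    // A time exactly one slide after zeroTime passes the check.
    println(isTimeValid(1401754911000L, 1401754907000L, slideMs)) // true
  }
}
```

With a 1 s batch duration and a 4 s slide, three out of every four batch times fail this check, which matches the observation that the INFO message is part of the normal workflow.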
