spark-user mailing list archives

From Vadim Chekan <kot.bege...@gmail.com>
Subject Re: Window slide duration
Date Tue, 03 Jun 2014 03:39:45 GMT
Thanks for looking into this, Tathagata.

Are you looking for traces of the ReceiverInputDStream.clearMetadata call?
Here is the log: http://wepaste.com/vchekan

Vadim.


On Mon, Jun 2, 2014 at 5:58 PM, Tathagata Das <tathagata.das1565@gmail.com>
wrote:

> Can you give all the logs? Would like to see what is clearing the key " 1401754908000
> ms"
>
> TD
>
>
> On Mon, Jun 2, 2014 at 5:38 PM, Vadim Chekan <kot.begemot@gmail.com>
> wrote:
>
>> Ok, it seems like "Time ... is invalid" is part of the normal workflow: the
>> window DStream ignores RDDs at points in time that do not align with the
>> window's slide interval. But why I am getting an exception is still
>> unclear. Here is the full stack:
>>
>> 14/06/02 17:21:48 INFO WindowedDStream: Time 1401754908000 ms is invalid as zeroTime is 1401754907000 ms and slideDuration is 4000 ms and difference is 1000 ms
>> 14/06/02 17:21:48 ERROR OneForOneStrategy: key not found: 1401754908000 ms
>> java.util.NoSuchElementException: key not found: 1401754908000 ms
>>     at scala.collection.MapLike$class.default(MapLike.scala:228)
>>     at scala.collection.AbstractMap.default(Map.scala:58)
>>     at scala.collection.mutable.HashMap.apply(HashMap.scala:64)
>>     at org.apache.spark.streaming.dstream.ReceiverInputDStream.getReceivedBlockInfo(ReceiverInputDStream.scala:77)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:225)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:223)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>>     at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
>>     at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
>>     at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>>     at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:108)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:223)
>>     at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:165)
>>     at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$start$1$$anon$1$$anonfun$receive$1.applyOrElse(JobGenerator.scala:76)
>>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>     at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>     at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>     at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>     at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
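
The INFO line above comes from the DStream time-validity check: a batch time is only valid for a windowed stream when its offset from zeroTime is a whole number of slide intervals. Here is a minimal standalone sketch of that check, modeled on the quantities named in the log message (hypothetical helper, not Spark's actual implementation):

```scala
object TimeValidity {
  // A time is valid for a DStream when it lies on the slide grid:
  // (time - zeroTime) must be an exact multiple of slideDuration.
  def isTimeValid(timeMs: Long, zeroTimeMs: Long, slideMs: Long): Boolean =
    timeMs > zeroTimeMs && (timeMs - zeroTimeMs) % slideMs == 0

  def main(args: Array[String]): Unit = {
    val zero  = 1401754907000L // zeroTime from the log above
    val slide = 4000L          // slideDuration from the log above
    // difference 1000 ms -> not a multiple of 4000 ms, hence "invalid"
    println(isTimeValid(1401754908000L, zero, slide)) // false
    // difference 4000 ms -> exactly one slide, valid
    println(isTimeValid(1401754911000L, zero, slide)) // true
  }
}
```

An invalid time on its own is expected and harmless; the real problem in the trace is the lookup of that same key afterwards.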
>>
>>
>> On Mon, Jun 2, 2014 at 5:22 PM, Vadim Chekan <kot.begemot@gmail.com>
>> wrote:
>>
>>> Hi all,
>>>
>>> I am getting an error:
>>> ================
>>> 14/06/02 17:06:32 INFO WindowedDStream: Time 1401753992000 ms is invalid as zeroTime is 1401753986000 ms and slideDuration is 4000 ms and difference is 6000 ms
>>> 14/06/02 17:06:32 ERROR OneForOneStrategy: key not found: 1401753992000 ms
>>> ================
>>>
>>> My relevant code is:
>>> ===================
>>> val ssc = new StreamingContext(conf, Seconds(1))
>>> val messageEvents = events.
>>>       flatMap(e => evaluatorCached.value.find(e)).
>>>       window(Seconds(8), Seconds(4))
>>> messageEvents.print()
>>> ===================
>>>
>>> Seems all right to me; the window slide duration (4 s) is a multiple of the
>>> streaming context batch duration (1 s). So, what's the problem?
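
For reference, the constraint here is that both the window duration and the slide duration must be whole multiples of the parent stream's batch interval. A quick standalone check of the settings above (hypothetical helper, not Spark API):

```scala
object WindowCheck {
  // Both durations must be whole multiples of the batch interval,
  // as Spark Streaming's window() requires.
  def isValidWindow(batchMs: Long, windowMs: Long, slideMs: Long): Boolean =
    windowMs % batchMs == 0 && slideMs % batchMs == 0

  def main(args: Array[String]): Unit = {
    // batch = Seconds(1), window = Seconds(8), slide = Seconds(4)
    println(isValidWindow(1000L, 8000L, 4000L)) // true: 8 s and 4 s are multiples of 1 s
  }
}
```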
>>>
>>> Spark-v1.0.0
>>>
>>> --
>>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
>>> explicitly specified
>>>
>>
>>
>>
>> --
>> From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
>> explicitly specified
>>
>
>


-- 
From RFC 2631: In ASN.1, EXPLICIT tagging is implicit unless IMPLICIT is
explicitly specified
