spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Tobias Pfeiffer <...@preferred.jp>
Subject Re: Closing over a var with changing value in Streaming application
Date Wed, 21 Jan 2015 12:02:31 GMT
Hi again,

On Wed, Jan 21, 2015 at 4:53 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:
>
> On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das <akhil@sigmoidanalytics.com>
> wrote:
>
>> How about using accumulators
>> <http://spark.apache.org/docs/1.2.0/programming-guide.html#accumulators>?
>>
>
> As far as I understand, they solve the part of the problem that I am not
> worried about, namely increasing the counter. I was more worried about
> getting that counter/accumulator value back to the executors.
>

Uh, I may have been a bit quick here...

So I had this one working:

  var totalNumberOfItems = 0L
  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
      (keyVal._1 + totalNumberOfItems, keyVal._2))
  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })

and used the dstream.foreachRDD(rdd => someVar += rdd.count) pattern at a
number of places.

Then, however, I added a
  dstream.transformWith(otherDStream, func)
call, which somehow changed the order in which the DStreams are computed.
In particular, suddenly some of my DStream values were computed before the
foreachRDD calls that set the proper variables were executed, which lead to
completely unpredictable behavior. So especially when looking at the
existence of spark.streaming.concurrentJobs, I suddenly feel like none of
DStream computations done on executors should depend on the ordering of
output operations done on the driver. (And I am afraid this includes
accumulator updates.)

Thinking about this, I feel I don't even know how I can realize a globally
(over the lifetime of my stream) increasing ID in my DStream. Do I need
something like
val counts: DStream[(Int, Long)] = stream.count().map((1,
_)).updateStateByKey(...)
with a pseudo-key just to keep a tiny bit of state from one interval to the
next?

Really thankful for any insights,
Tobias

Mime
View raw message