spark-user mailing list archives

From Bob Tiernay <btier...@hotmail.com>
Subject RE: Closing over a var with changing value in Streaming application
Date Wed, 21 Jan 2015 12:13:58 GMT
Maybe I'm misunderstanding something here, but couldn't this be done with broadcast variables?
I know there is the following caveat from the docs:
"In addition, the object v should not be modified after it is broadcast in order to ensure
that all nodes get the same value of the broadcast variable (e.g. if the variable is shipped
to a new node later)"
But isn't this exactly the semantics you want (i.e. not the same value)?
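For what it's worth, a minimal sketch of how the broadcast route might look (illustrative only, not code from this thread; it assumes a DStream[(Long, String)] called inputStream). Since a Broadcast value is read-only once created, a changing counter would have to be shipped by creating a fresh broadcast on the driver for each batch:

  var totalNumberOfItems = 0L

  inputStream.foreachRDD { rdd =>
    // snapshot the current driver-side counter into a new broadcast variable
    val totalBc = rdd.sparkContext.broadcast(totalNumberOfItems)
    // executors read the (immutable) broadcast copy
    val reKeyed = rdd.map(keyVal => (keyVal._1 + totalBc.value, keyVal._2))
    reKeyed.foreach(println)            // some output action
    totalNumberOfItems += rdd.count()   // advance the counter for the next batch
  }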


Date: Wed, 21 Jan 2015 21:02:31 +0900
Subject: Re: Closing over a var with changing value in Streaming application
From: tgp@preferred.jp
To: akhil@sigmoidanalytics.com
CC: user@spark.apache.org

Hi again,

On Wed, Jan 21, 2015 at 4:53 PM, Tobias Pfeiffer <tgp@preferred.jp> wrote:
On Wed, Jan 21, 2015 at 4:46 PM, Akhil Das <akhil@sigmoidanalytics.com> wrote:
How about using accumulators?
As far as I understand, they solve the part of the problem that I am not worried about, namely
increasing the counter. I was more worried about getting that counter/accumulator value back
to the executors.
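(For illustration, a minimal sketch of what I mean, using the Spark 1.x accumulator API and made-up names: the executors can only add to an accumulator, while its current value is readable on the driver only.)

  val totalItems = ssc.sparkContext.accumulator(0L)

  inputStream.foreachRDD { rdd =>
    rdd.foreach(_ => totalItems += 1L)  // executors can only add to the accumulator
    println(totalItems.value)           // .value can only be read on the driver
  }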
Uh, I may have been a bit quick here...
So I had this one working:
  var totalNumberOfItems = 0L

  // update the keys of the stream data
  val globallyIndexedItems = inputStream.map(keyVal =>
    (keyVal._1 + totalNumberOfItems, keyVal._2))

  // increase the number of total seen items
  inputStream.foreachRDD(rdd => {
    totalNumberOfItems += rdd.count
  })
and used the dstream.foreachRDD(rdd => someVar += rdd.count) pattern at a number of places.
Then, however, I added a dstream.transformWith(otherDStream, func) call, which somehow changed
the order in which the DStreams are computed. In particular, some of my DStream values were
suddenly computed before the foreachRDD calls that set the corresponding variables had run,
which led to completely unpredictable behavior. So, especially given the existence of
spark.streaming.concurrentJobs, I now feel that none of the DStream computations done on the
executors should depend on the ordering of output operations done on the driver. (And I am
afraid this includes accumulator updates.)
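(For reference, a minimal sketch of how that undocumented setting is applied; names are illustrative, not from this thread. With more than one concurrent job, the jobs started by different output operations of a batch may run in parallel, so their relative ordering cannot be relied upon.)

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("streaming-ordering-example")
    .set("spark.streaming.concurrentJobs", "2")  // allow output operations to overlap
  val ssc = new StreamingContext(conf, Seconds(1))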
Thinking about this, I feel I don't even know how I can realize a globally (over the lifetime
of my stream) increasing ID in my DStream. Do I need something like

  val counts: DStream[(Int, Long)] = stream.count().map((1, _)).updateStateByKey(...)

with a pseudo-key just to keep a tiny bit of state from one interval to the next?
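(To make that pseudo-key idea concrete, a minimal sketch; illustrative only, and it assumes checkpointing has been enabled via ssc.checkpoint(...), which updateStateByKey requires:)

  val perBatch: DStream[(Int, Long)] = stream.count().map(c => (1, c))

  // carry a running total from one interval to the next under the constant key 1
  val runningTotal: DStream[(Int, Long)] =
    perBatch.updateStateByKey[Long] { (batchCounts: Seq[Long], state: Option[Long]) =>
      Some(state.getOrElse(0L) + batchCounts.sum)
    }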
Really thankful for any insights,
Tobias