spark-dev mailing list archives

From Reynold Xin <r...@databricks.com>
Subject Re: "Dynamic variables" in Spark
Date Tue, 22 Jul 2014 04:39:04 GMT
Thanks for the thoughtful email, Neil and Christopher.

If I understand this correctly, it seems like the dynamic variable is just
a variant of the accumulator (a static one since it is a global object).
Accumulators are already implemented using thread-local variables under the
hood. Am I misunderstanding something?
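
Roughly, the thread-local pattern I'm referring to looks like this (a
simplified sketch, not the actual Accumulators.scala code):

object Accumulators {
  // Each task thread registers the accumulators it uses here; the
  // executor reads the partial values back out when the task finishes.
  private val localAccums =
    new ThreadLocal[collection.mutable.Map[Long, Any]] {
      override def initialValue() = collection.mutable.Map[Long, Any]()
    }

  def register(id: Long, acc: Any): Unit = localAccums.get() += (id -> acc)
  def values: Map[Long, Any] = localAccums.get().toMap
  def clear(): Unit = localAccums.get().clear()
}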



On Mon, Jul 21, 2014 at 5:54 PM, Christopher Nguyen <ctn@adatao.com> wrote:

> Hi Neil, first off, I'm generally a sympathetic advocate for making changes
> to Spark internals to make it easier/better/faster/more awesome.
>
> In this case, I'm (a) not clear about what you're trying to accomplish, and
> (b) a bit worried about the proposed solution.
>
> On (a): it is stated that you want to pass some Accumulators around. Yet
> the proposed solution is for some "shared" variable that may be set and
> "mapped out" and possibly "reduced back", but without any accompanying
> accumulation semantics. And yet it doesn't seem like you want just the
> broadcast property. Can you clarify the problem statement with some
> before/after client code examples?
>
> On (b): you're right that adding variables to SparkContext should be done
> with caution, as it may have unintended consequences beyond just serdes
> payload size. For example, there is a stated intention of supporting
> multiple SparkContexts in the future, and this proposed solution can make
> it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
> a while back on a subject related to this (see
> https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> SparkContext application, there may be multiple "clients" (of that
> application) whose intent to use the proposed "SparkDynamic" would not
> necessarily be coordinated.
>
> So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> the benefits are significant enough to warrant the costs. Do I
> misunderstand that the benefit is to save one explicit parameter (the
> "context") in the signature/closure code?
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nferguson@gmail.com>
> wrote:
>
> > Hi all
> >
> > I have been adding some metrics to the ADAM project
> > (https://github.com/bigdatagenomics/adam), which runs on Spark, and
> > have a proposal for an enhancement to Spark that would make this work
> > cleaner and easier.
> >
> > I need to pass some Accumulators around, which will aggregate metrics
> > (timing stats and other metrics) across the cluster. However, it is
> > cumbersome to have to explicitly pass some "context" containing these
> > accumulators around everywhere that might need them. I can use Scala
> > implicits, which help slightly, but I'd still need to modify every method
> > in the call stack to take an implicit variable.
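> >
> > For illustration, the implicit approach looks something like this (the
> > names are made up for the example, and org.apache.spark.Accumulator is
> > assumed to be imported):
> >
> > case class MetricsContext(timings: Accumulator[Long])
> >
> > // Every method on the call path has to declare the implicit parameter.
> > def readRecords(path: String)(implicit metrics: MetricsContext): Unit =
> >   processRecords(path) // the context is threaded through automatically...
> >
> > def processRecords(path: String)(implicit metrics: MetricsContext): Unit =
> >   metrics.timings += 1L // ...but every signature must still mention it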
> >
> > So, I'd like to propose that we add the ability to have "dynamic
> > variables" (basically thread-local variables) to Spark. This would
> > avoid having to pass the Accumulators around explicitly.
> >
> > My proposed approach is to add a method to the SparkContext class as
> > follows:
> >
> > /**
> >  * Sets the value of a "dynamic variable". This value is made available
> >  * to jobs without having to be passed around explicitly. During the
> >  * execution of a Spark job this value can be obtained from the
> >  * [[SparkDynamic]] object.
> >  */
> > def setDynamicVariableValue(value: Any)
> >
> > Then, when a job is executing, the SparkDynamic object can be accessed
> > to obtain the value of the dynamic variable. The implementation of this
> > object is as follows:
> >
> > object SparkDynamic {
> >
> >   private val dynamicVariable = new DynamicVariable[Any](null)
> >
> >   /**
> >    * Gets the value of the "dynamic variable" that has been set in the
> >    * [[SparkContext]].
> >    */
> >   def getValue: Option[Any] = {
> >     Option(dynamicVariable.value)
> >   }
> >
> >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> >   }
> > }
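> >
> > Client code would then look roughly like this (a sketch, assuming sc is
> > a SparkContext and the setDynamicVariableValue method above exists):
> >
> > val timings = sc.accumulator(0L)
> > sc.setDynamicVariableValue(timings)
> >
> > sc.parallelize(1 to 100).foreach { i =>
> >   // Accessible anywhere on the task's call stack, with no parameter
> >   // threading needed.
> >   SparkDynamic.getValue.foreach { v =>
> >     v.asInstanceOf[Accumulator[Long]] += 1L
> >   }
> > }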
> >
> > The change involves modifying the Task object to serialize the value of
> > the dynamic variable, and modifying the TaskRunner class to deserialize
> > the value and make it available in the thread that is running the task
> > (using the SparkDynamic.withValue method).
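> >
> > In outline, the executor-side change is roughly this (a simplified
> > sketch; dynamicVariableValue stands in for whatever field the Task
> > actually carries):
> >
> > // In TaskRunner.run(), after the task has been deserialized:
> > SparkDynamic.withValue(task.dynamicVariableValue) {
> >   task.run(taskId) // the value is visible to the whole task execution
> > }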
> >
> > I have done a quick prototype of this in this commit:
> >
> > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> >
> > and it seems to work fine in my (limited) testing. It needs more
> > testing, tidy-up, and documentation though.
> >
> > One drawback is that the dynamic variable will be serialized for every
> > Task whether it needs it or not. For my use case this might not be too
> > much of a problem, as serializing and deserializing Accumulators looks
> > fairly lightweight -- however, we should certainly warn users against
> > setting a dynamic variable containing lots of data. I thought about
> > using broadcast variables here, but I don't think it's possible to put
> > Accumulators in a broadcast variable (as I understand it, broadcast
> > variables are intended for purely read-only data).
> >
> > What do people think about this proposal? My use case aside, it seems
> > like it would be a generally useful enhancement to be able to pass
> > certain data around without having to pass it explicitly everywhere.
> >
> > Neil
> >
>
