spark-dev mailing list archives

From Neil Ferguson <nfergu...@gmail.com>
Subject Re: "Dynamic variables" in Spark
Date Tue, 22 Jul 2014 21:17:27 GMT
Hi Christopher

Thanks for your reply. I'll try to address your points -- please let me
know if I missed anything.

Regarding clarifying the problem statement, let me try to do that with a
real-world example. I have a method whose performance I want to measure,
which currently has the following signature:

def merge(target: IndelRealignmentTarget): IndelRealignmentTarget = { /* impl */ }

Let's assume I have a Timers class containing various timers (internally
implemented using Accumulators). Each timer lets me instrument a function
call using its "time" method. Let's imagine that I add it as an implicit
parameter to the above method, as follows:

def merge(target: IndelRealignmentTarget)(implicit timers: Timers): IndelRealignmentTarget =
  timers.mergeTarget.time { /* impl */ }
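To make the example concrete, here is a minimal sketch of what such a Timers class might look like. The Timers, Timer and mergeTarget names follow the example above; everything else is hypothetical, and a plain AtomicLong stands in for the Accumulator-backed total that the real use case would need so that timings aggregate across the cluster:

```scala
import java.util.concurrent.atomic.AtomicLong

// Hypothetical timer: measures a block of code and accumulates the
// elapsed nanoseconds. In the real use case this running total would be
// backed by a Spark Accumulator rather than a local AtomicLong.
class Timer(val name: String) {
  private val totalNanos = new AtomicLong(0L)

  // Run the block, record how long it took, and return its result.
  def time[A](block: => A): A = {
    val start = System.nanoTime()
    try block
    finally totalNanos.addAndGet(System.nanoTime() - start)
  }

  def total: Long = totalNanos.get
}

// Hypothetical container holding the application's timers.
class Timers {
  val mergeTarget = new Timer("mergeTarget")
}
```

A call site then reads `timers.mergeTarget.time { ... }`, exactly as in the signature above.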

On its own this is not a big deal -- it's just one extra parameter.
However, the call stack of this method looks something like the following:

IndelRealignmentTarget.merge
|-RealignmentTargetFinder.joinTargets
  |-RealignmentTargetFinder.findTargets
    |-RealignmentTargetFinder$.apply
      |-RealignIndels.realignIndels
        |-RealignIndels$.apply
          |-ADAMRecordRDDFunctions.adamRealignIndels
            |-Transform.run

So, I'd have to change every one of these methods to take the extra
parameter, which is pretty cumbersome. More importantly, when developers
want to add additional metrics to the code they'll have to think about how
to get an instance of Timers to the code they're developing.

So I'd really like the Timers object to be available in a thread-local
variable when I need it, without having to pass it around.

Regarding the implications of adding additional variables to SparkContext
-- I'm not sure I understand why this change would make it more difficult
to have multiple SparkContexts in the future. Could you clarify please?
Bear in mind that I'm not proposing adding any thread-local data to
SparkContext. The SparkContext merely holds the data, which is added to a
thread-local variable at task execution time.
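The mechanics of this can be sketched with Scala's standard `scala.util.DynamicVariable`, which is what the proposal quoted below builds on. This is an illustration of the pattern only -- the Task serialization and executor wiring are elided:

```scala
import scala.util.DynamicVariable

// Sketch of the proposed SparkDynamic: a per-thread slot that the task
// runner would populate just before invoking user code.
object SparkDynamic {
  private val dynamicVariable = new DynamicVariable[Option[Any]](None)

  // User code running inside a task reads the value without it having
  // been threaded through every method signature on the call stack.
  def getValue: Option[Any] = dynamicVariable.value

  // The task runner would wrap task execution like this, so the value is
  // visible only on this thread, and only for the duration of the thunk.
  def withValue[S](threadValue: Option[Any])(thunk: => S): S =
    dynamicVariable.withValue(threadValue)(thunk)
}
```

With something like this in place, `merge` could call `SparkDynamic.getValue` directly, and none of the intermediate methods in the call stack above would need to change.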

Regarding having multiple clients of the SparkContext -- are you talking
about having multiple applications all sharing the same SparkContext? It
seems like there's data in SparkContext that is specific to a particular
application at the moment, like the JAR files for example, so this doesn't
seem inconsistent with that. Perhaps I'm misunderstanding here.

Neil


On Tue, Jul 22, 2014 at 1:54 AM, Christopher Nguyen <ctn@adatao.com> wrote:

> Hi Neil, first off, I'm generally a sympathetic advocate for making changes
> to Spark internals to make it easier/better/faster/more awesome.
>
> In this case, I'm (a) not clear about what you're trying to accomplish, and
> (b) a bit worried about the proposed solution.
>
> On (a): it is stated that you want to pass some Accumulators around. Yet
> the proposed solution is for some "shared" variable that may be set and
> "mapped out" and possibly "reduced back", but without any accompanying
> accumulation semantics. And yet it doesn't seem like you only want just the
> broadcast property. Can you clarify the problem statement with some
> before/after client code examples?
>
> On (b): you're right that adding variables to SparkContext should be done
> with caution, as it may have unintended consequences beyond just serdes
> payload size. For example, there is a stated intention of supporting
> multiple SparkContexts in the future, and this proposed solution can make
> it a bigger challenge to do so. Indeed, we had a gut-wrenching call to make
> a while back on a subject related to this (see
> https://github.com/mesos/spark/pull/779). Furthermore, even in a single
> SparkContext application, there may be multiple "clients" (of that
> application) whose intent to use the proposed "SparkDynamic" would not
> necessarily be coordinated.
>
> So, considering a ratio of a/b (benefit/cost), it's not clear to me that
> the benefits are significant enough to warrant the costs. Do I
> misunderstand that the benefit is to save one explicit parameter (the
> "context") in the signature/closure code?
>
> --
> Christopher T. Nguyen
> Co-founder & CEO, Adatao <http://adatao.com>
> linkedin.com/in/ctnguyen
>
>
>
> On Mon, Jul 21, 2014 at 2:10 PM, Neil Ferguson <nferguson@gmail.com>
> wrote:
>
> > Hi all
> >
> > I have been adding some metrics to the ADAM project
> > https://github.com/bigdatagenomics/adam, which runs on Spark, and have a
> > proposal for an enhancement to Spark that would make this work cleaner
> > and easier.
> >
> > I need to pass some Accumulators around, which will aggregate metrics
> > (timing stats and other metrics) across the cluster. However, it is
> > cumbersome to have to explicitly pass some "context" containing these
> > accumulators around everywhere that might need them. I can use Scala
> > implicits, which help slightly, but I'd still need to modify every method
> > in the call stack to take an implicit variable.
> >
> > So, I'd like to propose that we add the ability to have "dynamic
> > variables" (basically thread-local variables) to Spark. This would avoid
> > having to pass the Accumulators around explicitly.
> >
> > My proposed approach is to add a method to the SparkContext class as
> > follows:
> >
> > /**
> >  * Sets the value of a "dynamic variable". This value is made available
> >  * to jobs without having to be passed around explicitly. During
> >  * execution of a Spark job this value can be obtained from the
> >  * [[SparkDynamic]] object.
> >  */
> > def setDynamicVariableValue(value: Any)
> >
> > Then, when a job is executing, the SparkDynamic can be accessed to obtain
> > the value of the dynamic variable. The implementation of this object is
> > as follows:
> >
> > object SparkDynamic {
> >   private val dynamicVariable = new DynamicVariable[Any]()
> >
> >   /**
> >    * Gets the value of the "dynamic variable" that has been set in the
> >    * [[SparkContext]]
> >    */
> >   def getValue: Option[Any] = {
> >     Option(dynamicVariable.value)
> >   }
> >
> >   private[spark] def withValue[S](threadValue: Option[Any])(thunk: => S): S = {
> >     dynamicVariable.withValue(threadValue.orNull)(thunk)
> >   }
> > }
> >
> > The change involves modifying the Task object to serialize the value of
> > the dynamic variable, and modifying the TaskRunner class to deserialize
> > the value and make it available in the thread that is running the task
> > (using the SparkDynamic.withValue method).
> >
> > I have done a quick prototype of this in this commit:
> > https://github.com/nfergu/spark/commit/8be28d878f43ad6c49f892764011ae7d273dcea6
> > and it seems to work fine in my (limited) testing. It needs more testing,
> > tidy-up and documentation though.
> >
> > One drawback is that the dynamic variable will be serialized for every
> > Task whether it needs it or not. For my use case this might not be too
> > much of a problem, as serializing and deserializing Accumulators looks
> > fairly lightweight -- however we should certainly warn users against
> > setting a dynamic variable containing lots of data. I thought about using
> > broadcast tables here, but I don't think it's possible to put
> > Accumulators in a broadcast table (as I understand it, they're intended
> > for purely read-only data).
> >
> > What do people think about this proposal? My use case aside, it seems
> > like it would be a generally useful enhancement to be able to pass
> > certain data around without having to explicitly pass it everywhere.
> >
> > Neil
> >
>
