spark-user mailing list archives

From Matei Zaharia <matei.zaha...@gmail.com>
Subject Re: forcing node local processing
Date Sun, 01 Dec 2013 23:51:01 GMT
Ah, interesting, thanks for reporting that. Do you mind opening a JIRA issue for it? I think
the right way would be to wait at least X seconds after start before deciding that some blocks
don’t have preferred locations available.

Matei

On Dec 1, 2013, at 9:08 AM, Erik Freed <erikjfreed@codecision.com> wrote:

> I did check the DNS scenario when I first got started on this problem - I've been bitten by that more than once setting up Spark on clusters with inconsistent DNS setups. That wasn't it, though.
> 
> It turns out there was a race condition between when the executors were registering and when the TaskSetManager considered them fair game. If I used the SparkContext during that window, the RDD was cached in an awkward state.
> 
> By putting in a wait after creating the SparkContext for all executors to register themselves, the node-local affinity was consistent.
> 
> I tried to find a way to cleanly wait for the SparkContext to settle, but could not find a hook for that. I would say that is a feature worth adding. There is a period after you create a SparkContext during which you effectively can't use it - that seems awkward.
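> 
> Roughly what the wait looks like (a sketch, not our exact code - it assumes sc.getExecutorMemoryStatus is available in your build and that the map it returns includes the driver's block manager as one entry):
> 
>   import org.apache.spark.SparkContext
> 
>   // Poll until the expected number of executors have registered, or give up after a timeout
>   def waitForExecutors(sc: SparkContext, expectedExecutors: Int, timeoutMs: Long = 30000) {
>     val deadline = System.currentTimeMillis + timeoutMs
>     while (sc.getExecutorMemoryStatus.size < expectedExecutors + 1 &&
>            System.currentTimeMillis < deadline) {
>       Thread.sleep(100)
>     }
>   }
> 
>   val sc = new SparkContext(master, "myApp")    // master is whatever URL you normally pass
>   waitForExecutors(sc, expectedExecutors = 8)   // 8 is just a placeholder for the cluster size
>   // ...only then build and cache the RDD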
> 
> Thanks for the help Andrew - I owe you one!
> 
> cheers,
> Erik
> 
> 
> On Tue, Nov 26, 2013 at 3:59 PM, Andrew Ash <andrew@andrewash.com> wrote:
> I've seen issues where no task is node-local because the hostname on Spark and Hadoop is close but not quite the same -- e.g. myhost.andrewash.com vs myhost; one is an FQDN, the other is not. Can you confirm that the hostname shown in the address column of the workers section of the master's web UI (:8080) matches what's listed in the namenode's web UI (:50070)?
> 
> Example:
> 13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID 75 on slave
worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242 (non-preferred, not one of 10.32.47.242,
10.32.46.243, 10.32.31.202)
> 
> The line you pasted is PROCESS_LOCAL, which means the output of one transformation is consumed by the next one in the same JVM. If you were reading out of HBase, I would expect you could only achieve NODE_LOCAL on the initial data read, since the data isn't yet loaded into the Spark process. Can you run a simple query that doesn't involve any transformation? I would imagine a simple "select * from table limit 10" or equivalent would do it.
> 
> Andrew
> 
> 
> 
> On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <erikjfreed@codecision.com> wrote:
> hmmm - I see 'spark.deploy.spreadOut' which defaults to true - but that does not seem
to be impacting me positively...
> 
> 
> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <erikjfreed@codecision.com> wrote:
> Thanks - I have pondered that piece of code long and hard, trying different combinations of each of those - e.g. setting spark.locality.wait and spark.locality.wait.node very high and the others very low -- nothing. It worked on 0.7, but at that point we were using lots of HBase regions/Spark partitions/tasks and spark.spreadout = true (I think that is no longer supported).
> 
> The one clue is that it sporadically does use node-local scheduling. I would be more than happy to provide tons of logs, but as far as I can see there isn't any logging of this part of the code other than many lines all saying something like:
> 
> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4] Starting task 2.0:0
as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
> 
> and the fact that the UI shows the RDD not partitioned across the appropriate HBase region nodes. I was thinking this was some sort of DNS short-name vs. full-name issue, but changing that didn't seem to do anything.
> 
> 
> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <andrew@andrewash.com> wrote:
> Do you also set any of spark.locality.wait.{process,node,rack}? Those override spark.locality.wait for specific locality levels.
> 
>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>     level match {
>       case TaskLocality.PROCESS_LOCAL =>
>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>       case TaskLocality.NODE_LOCAL =>
>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>       case TaskLocality.RACK_LOCAL =>
>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>       case TaskLocality.ANY =>
>         0L
>     }
>   }
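> 
> For example, to make the scheduler give up quickly on PROCESS_LOCAL but hold out a long time for NODE_LOCAL, something like this (a sketch - in 0.8 these are plain system properties and must be set before the SparkContext is created):
> 
>   System.setProperty("spark.locality.wait.process", "0")       // don't wait for a process-local slot
>   System.setProperty("spark.locality.wait.node", "600000")     // wait up to 10 minutes for a node-local slot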
> 
> The other option I'm thinking is maybe these tasks are jumping straight to TaskLocality.ANY
with no locality preference.  Do you have any logs you can share that include this fallback
to less-preferred localities?
> 
> Did you have this working properly on 0.7.x?
> 
> 
> 
> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <erikjfreed@codecision.com> wrote:
> Hi Andrew - thanks, that's a good thought. Unfortunately, I have those set in the same pre-context-creation place as all the other variables I have been using happily for months and that seem to affect Spark correctly. I have it set to Int.MaxValue.toString, which I am guessing is large enough.
> 
> It very occasionally uses all data-local nodes, and sometimes a mix, but mostly all process-local...
> 
> 
> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <andrew@andrewash.com> wrote:
> Hi Erik,
> 
> I would guess that if you set spark.locality.wait to an absurdly large value then you
would have essentially that effect.
> 
> Maybe you aren't setting the system property before creating your Spark context?
> 
> http://spark.incubator.apache.org/docs/latest/configuration.html
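> 
> Something along these lines, for example (a sketch - the master URL and app name are placeholders; the property has to be set before the context is constructed since it's read as a plain system property):
> 
>   import org.apache.spark.SparkContext
> 
>   System.setProperty("spark.locality.wait", "600000")          // e.g. wait up to 10 minutes for a preferred location
>   val sc = new SparkContext("spark://master:7077", "myApp")    // placeholder master URL and app name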
> 
> Andrew
> 
> 
> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <erikjfreed@codecision.com> wrote:
> Hi All,
> After switching to 0.8 and reducing the number of partitions/tasks for a large-scale computation, I have been unable to force Spark to use only executors on nodes where the HBase data is local. I have not been able to find a setting for spark.locality.wait that makes any difference. It is not an option for us to let Spark choose non-data-local nodes. Is there some example code showing how to get this to work the way we want? We have our own input RDD that mimics NewHadoopRDD, and it seems to be doing the correct thing in all regards with respect to preferred locations.
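> 
> (For reference, the locality hints come from the RDD's getPreferredLocations override. A stripped-down sketch of the idea - not our actual RDD, the class and field names here are made up - where the hostnames returned must match the names the workers registered with:)
> 
>   import org.apache.spark.{Partition, SparkContext, TaskContext}
>   import org.apache.spark.rdd.RDD
> 
>   // Toy partition that just remembers which host holds its data
>   class HostedPartition(val index: Int, val host: String) extends Partition
> 
>   class HostedRDD(sc: SparkContext, hosts: Seq[String]) extends RDD[String](sc, Nil) {
>     override def getPartitions: Array[Partition] =
>       hosts.zipWithIndex.map { case (h, i) => new HostedPartition(i, h) }.toArray
> 
>     // The scheduler reads these hostnames when deciding NODE_LOCAL placement
>     override def getPreferredLocations(split: Partition): Seq[String] =
>       Seq(split.asInstanceOf[HostedPartition].host)
> 
>     override def compute(split: Partition, context: TaskContext): Iterator[String] =
>       Iterator(split.asInstanceOf[HostedPartition].host)
>   }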
> 
> Do I have to write my own compute Tasks and schedule them myself?
> 
> Anyone have any suggestions? I am stumped.
> 
> cheers,
> Erik
> 
> 
> 
> 
> 
> 
> -- 
> Erik James Freed
> CoDecision Software
> 510.859.3360
> erikjfreed@codecision.com
> 
> 1480 Olympus Avenue
> Berkeley, CA
> 94708
> 
> 179 Maria Lane
> Orcas, WA
> 98245

