spark-user mailing list archives

From Erik Freed <erikjfr...@codecision.com>
Subject Re: forcing node local processing
Date Sun, 01 Dec 2013 17:08:50 GMT
I did check the DNS scenario when I first got started on this problem - I've
been bitten by that more than once setting up Spark on clusters with
inconsistent DNS setups. That wasn't it this time, though.

It turns out that there was a race condition between when the executors were
registering and when the TaskSetManager considered them fair game. If I used
the SparkContext during that window, the RDD ended up cached in an awkward
state.

By putting in a wait after creating the SparkContext for all executors to
register themselves, the node-local affinity became consistent.
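
For reference, this is roughly the shape of the wait I ended up with. It is a
sketch only: the master URL, app name, and 10-second delay are placeholders,
and the commented-out poll assumes a getExecutorMemoryStatus method is
available on SparkContext in your version.

    import org.apache.spark.SparkContext

    val sc = new SparkContext("spark://master:7077", "hbase-locality-job")

    // Crude but effective: give every executor time to register with the
    // scheduler before the first RDD is built and cached. Tune the delay to
    // your cluster's startup time.
    Thread.sleep(10000L)

    // Alternative, if your version exposes it: poll until the expected
    // number of executors (plus the driver) has shown up.
    // while (sc.getExecutorMemoryStatus.size < expectedExecutors + 1) {
    //   Thread.sleep(200)
    // }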

I tried to find a way to cleanly wait for the SparkContext to settle, but
could not find a hook for that. I would say that is a feature worth putting
in. There is a period right after you create a SparkContext during which you
can't actually use it safely - that seems awkward.

Thanks for the help Andrew - I owe you one!

cheers,
Erik


On Tue, Nov 26, 2013 at 3:59 PM, Andrew Ash <andrew@andrewash.com> wrote:

> I've seen issues where no task is node local because the hostnames on Spark
> and Hadoop are close but not quite the same -- e.g. myhost.andrewash.com vs
> myhost. One is the FQDN, the other is not.  Can you confirm that the hostname
> present in the master's webui (:8080) in the address column of the workers
> section matches what's listed in the namenode's webui (:50070)?
>
> Example:
> 13/01/18 09:30:46 INFO cluster.TaskSetManager: Starting task 3.0:75 as TID
> 75 on slave worker-20130118102928-ip-10-32-47-242-50152: ip-10-32-47-242
> (non-preferred, not one of 10.32.47.242, 10.32.46.243, 10.32.31.202)
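>
> One quick way to check what each box resolves for itself (run this on a
> worker; comparing 'hostname' vs 'hostname -f' at a shell tells you the same
> thing):
>
>     import java.net.InetAddress
>     // Short name vs. canonical (usually fully qualified) name as the JVM sees them.
>     println(InetAddress.getLocalHost.getHostName)
>     println(InetAddress.getLocalHost.getCanonicalHostName)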
>
> The line you pasted is PROCESS_LOCAL, which means that the output of one
> transformation is the input of the next one in the same JVM.  If you were
> reading out of HBase, I would expect you could only achieve NODE_LOCAL
> on the initial data read since the data isn't yet loaded into the Spark
> process.  Can you run a simple query that doesn't involve any
> transformation?  I would imagine a simple "select * from table limit 10" or
> equivalent would do it.
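>
> In plain Spark terms the rough equivalent would be something like the line
> below (just a sketch; hbaseRdd stands in for your custom HBase input RDD):
>
>     // A bare count() runs one task per partition with no shuffle, so the
>     // locality level of the initial read is exactly what shows up in the
>     // logs and the stage UI.
>     hbaseRdd.count()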
>
> Andrew
>
>
>
> On Tue, Nov 26, 2013 at 3:46 PM, Erik Freed <erikjfreed@codecision.com> wrote:
>
>> Hmmm - I see 'spark.deploy.spreadOut', which defaults to true - but that
>> does not seem to be helping in my case...
>>
>>
>> On Tue, Nov 26, 2013 at 3:19 PM, Erik Freed <erikjfreed@codecision.com> wrote:
>>
>>> Thanks - I have pondered that piece of code long and hard, trying
>>> different combinations of each of those - e.g. setting spark.locality.wait
>>> and spark.locality.wait.node very high and the others very low - nothing.
>>> It worked on 0.7, but at that point we were using lots of HBase
>>> regions/Spark partitions/tasks and spark.spreadout = true (I think that is
>>> no longer supported).
>>>
>>> The one clue is that it sporadically does use node-local. I would be more
>>> than happy to provide tons of logs, but as far as I can see there isn't
>>> any logging of this part of the code other than many lines all saying
>>> something like:
>>>
>>> 2013-11-26 15:02:45,400 INFO [spark-akka.actor.default-dispatcher-4]
>>> Starting task 2.0:0 as TID 104 on executor 0: <elided host> (PROCESS_LOCAL)
>>>
>>> and the fact that the UI shows the RDD not partitioning across the
>>> appropriate HBase region nodes. I was thinking this was some sort of short
>>> vs. fully qualified DNS name issue, but changing that didn't seem to do
>>> anything.
>>>
>>>
>>> On Tue, Nov 26, 2013 at 3:08 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>>
>>>> Do you also set any of spark.locality.wait.{process,node,rack} ?  Those
>>>> override spark.locality.wait for specific locality levels.
>>>>
>>>>   private def getLocalityWait(level: TaskLocality.TaskLocality): Long = {
>>>>     val defaultWait = System.getProperty("spark.locality.wait", "3000")
>>>>     level match {
>>>>       case TaskLocality.PROCESS_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.process", defaultWait).toLong
>>>>       case TaskLocality.NODE_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.node", defaultWait).toLong
>>>>       case TaskLocality.RACK_LOCAL =>
>>>>         System.getProperty("spark.locality.wait.rack", defaultWait).toLong
>>>>       case TaskLocality.ANY =>
>>>>         0L
>>>>     }
>>>>   }
>>>>
>>>> The other possibility I'm thinking of is that these tasks are jumping
>>>> straight to TaskLocality.ANY with no locality preference.  Do you have
>>>> any logs you can share that show this fallback to less-preferred locality
>>>> levels?
>>>>
>>>> Did you have this working properly on 0.7.x?
>>>>
>>>>
>>>>
>>>> On Tue, Nov 26, 2013 at 2:54 PM, Erik Freed <erikjfreed@codecision.com> wrote:
>>>>
>>>>> Hi Andrew - thanks - that's a good thought. Unfortunately, I have those
>>>>> set in the same place, before context creation, as all the other
>>>>> properties that I have been using for months quite happily and that do
>>>>> seem to affect Spark. I have it set to Int.MaxValue.toString, which I am
>>>>> guessing is large enough.
>>>>>
>>>>> It very occasionally uses all data-local nodes, and sometimes a mix, but
>>>>> mostly all process-local...
>>>>>
>>>>>
>>>>> On Tue, Nov 26, 2013 at 2:45 PM, Andrew Ash <andrew@andrewash.com> wrote:
>>>>>
>>>>>> Hi Erik,
>>>>>>
>>>>>> I would guess that if you set spark.locality.wait to an absurdly
>>>>>> large value then you would have essentially that effect.
>>>>>>
>>>>>> Maybe you aren't setting the system property before creating your
>>>>>> Spark context?
>>>>>>
>>>>>> http://spark.incubator.apache.org/docs/latest/configuration.html
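>>>>>>
>>>>>> Something along these lines, before the context is constructed (the
>>>>>> values here are just placeholders, not recommendations):
>>>>>>
>>>>>>     // Must be set before new SparkContext(...) is called.
>>>>>>     System.setProperty("spark.locality.wait", "30000")       // base wait, ms
>>>>>>     System.setProperty("spark.locality.wait.node", "30000")  // node-local wait, ms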
>>>>>>
>>>>>> Andrew
>>>>>>
>>>>>>
>>>>>> On Tue, Nov 26, 2013 at 2:40 PM, Erik Freed <
>>>>>> erikjfreed@codecision.com> wrote:
>>>>>>
>>>>>>> Hi All,
>>>>>>> After switching to 0.8, and reducing the number of partitions/tasks for
>>>>>>> a large scale computation, I have been unable to force Spark to use
>>>>>>> only executors on nodes where the HBase data is local. I have not been
>>>>>>> able to find a setting for spark.locality.wait that makes any
>>>>>>> difference. It is not an option for us to let Spark choose non
>>>>>>> data-local nodes. Is there some example code showing how to get this to
>>>>>>> work the way we want? We have our own input RDD that mimics
>>>>>>> NewHadoopRDD, and it seems to be doing the correct thing in all regards
>>>>>>> with respect to preferred locations.
>>>>>>>
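>>>>>>> Roughly, the locality-relevant part of our RDD looks like this (a
>>>>>>> heavily simplified sketch - the class and field names are illustrative,
>>>>>>> not our actual code):
>>>>>>>
>>>>>>>     import org.apache.spark.{Partition, SparkContext, TaskContext}
>>>>>>>     import org.apache.spark.rdd.RDD
>>>>>>>
>>>>>>>     // One partition per HBase region; regionHosts(i) is the hostname
>>>>>>>     // of the region server holding region i.
>>>>>>>     class RegionScanRDD(sc: SparkContext, regionHosts: Seq[String])
>>>>>>>       extends RDD[(Array[Byte], Array[Byte])](sc, Nil) {
>>>>>>>
>>>>>>>       override def getPartitions: Array[Partition] =
>>>>>>>         regionHosts.indices.map(i => new Partition { override val index: Int = i }).toArray
>>>>>>>
>>>>>>>       // The hook the scheduler consults for NODE_LOCAL placement.
>>>>>>>       override def getPreferredLocations(split: Partition): Seq[String] =
>>>>>>>         Seq(regionHosts(split.index))
>>>>>>>
>>>>>>>       override def compute(split: Partition, context: TaskContext): Iterator[(Array[Byte], Array[Byte])] =
>>>>>>>         Iterator.empty // the actual region scan is elided here
>>>>>>>     }
>>>>>>>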
>>>>>>> Do I have to write my own compute Tasks and schedule them myself?
>>>>>>>
>>>>>>> Anyone have any suggestions? I am stumped.
>>>>>>>
>>>>>>> cheers,
>>>>>>> Erik
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>>
>>>>> --
>>>>> Erik James Freed
>>>>> CoDecision Software
>>>>> 510.859.3360
>>>>> erikjfreed@codecision.com
>>>>>
>>>>> 1480 Olympus Avenue
>>>>> Berkeley, CA
>>>>> 94708
>>>>>
>>>>> 179 Maria Lane
>>>>> Orcas, WA
>>>>> 98245
>>>>>
>>>>
>>>>
>>>
>>>
>>> --
>>> Erik James Freed
>>> CoDecision Software
>>> 510.859.3360
>>> erikjfreed@codecision.com
>>>
>>> 1480 Olympus Avenue
>>> Berkeley, CA
>>> 94708
>>>
>>> 179 Maria Lane
>>> Orcas, WA
>>> 98245
>>>
>>
>>
>>
>> --
>> Erik James Freed
>> CoDecision Software
>> 510.859.3360
>> erikjfreed@codecision.com
>>
>> 1480 Olympus Avenue
>> Berkeley, CA
>> 94708
>>
>> 179 Maria Lane
>> Orcas, WA
>> 98245
>>
>
>


-- 
Erik James Freed
CoDecision Software
510.859.3360
erikjfreed@codecision.com

1480 Olympus Avenue
Berkeley, CA
94708

179 Maria Lane
Orcas, WA
98245
