I am seeing skewed execution times.  As far as I can tell, they are attributable to differences in data locality - tasks with locality PROCESS_LOCAL run fast, NODE_LOCAL, slower, and ANY, slowest.

This seems entirely as it should be - the question is, why the different locality levels?

I am seeing skewed caching, as I mentioned before - in the case I isolated, with 4 nodes, the cached data was distributed at about 42%, 31%, 20%, and 6%.  However, the total amount was significantly less than the memory of any single node, so I don't think any node could have overpopulated its cache.  I am occasionally seeing task failures, but they re-execute and work fine the next time.  Yet I'm still seeing incomplete caching (anywhere from 65% cached up to 100%, depending on the run).
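As a side note, here is a minimal sketch of how one might check what actually got cached per RDD - it assumes nothing beyond a SparkContext named sc, and the output format is only an example:

    // Sketch: report how many partitions of each persisted RDD are actually cached.
    // Assumes a SparkContext named `sc`.
    sc.getRDDStorageInfo.foreach { info =>
      println(s"RDD ${info.id} (${info.name}): " +
        s"${info.numCachedPartitions}/${info.numPartitions} partitions cached, " +
        s"${info.memSize} bytes in memory, ${info.diskSize} bytes on disk")
    }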

I shouldn't have much variance in task time - this is simply a foreach over the data, adding to an accumulator, and the data is completely randomly distributed, so the work should be pretty even overall.
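The shape of the job is roughly the sketch below (Spark 1.x accumulator API; sc and data are placeholder names, and the real accumulated value is more complex than a simple counter):

    // Minimal sketch of the pattern above: cache, then foreach into an accumulator.
    // `sc` and `data` are placeholder names.
    val counter = sc.accumulator(0L)
    data.cache().foreach(_ => counter += 1L)
    println(s"Processed ${counter.value} records")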

I am seeing GC regressions occasionally - they slow a request from about 2 seconds to about 5 seconds.  The 8-minute slowdown seems to be solely attributable to the data locality issue, as far as I can tell.  There was some further confusion in the times I mentioned, though - the list I gave (3.1 min, 2 seconds, ... 8 min) was not different runs with different cache percentages; those were iterations within a single run with 100% caching.
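For anyone trying to reproduce this, one way to confirm how much GC contributes is to turn on GC logging on the executors - a sketch, using only standard HotSpot flags (the exact flag set is just an example):

    // Sketch: surface GC pauses in executor stderr via standard JVM flags.
    // Pass this conf to the SparkContext (or the equivalent --conf to spark-submit).
    val conf = new org.apache.spark.SparkConf()
      .set("spark.executor.extraJavaOptions",
           "-verbose:gc -XX:+PrintGCDetails -XX:+PrintGCTimeStamps")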

                       -Nathan



On Thu, Nov 13, 2014 at 1:45 AM, Aaron Davidson <ilikerps@gmail.com> wrote:
Spark's scheduling is pretty simple: it will allocate tasks to open cores on executors, preferring ones where the data is local. It even performs "delay scheduling", which means waiting a bit to see if an executor where the data resides locally becomes available.
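The wait itself is configurable; a minimal sketch follows (the values are examples only, in milliseconds for Spark 1.x):

    // Sketch: delay scheduling is governed by the spark.locality.wait* settings.
    // Values here are examples only (milliseconds in Spark 1.x; newer versions also accept "10s").
    val conf = new org.apache.spark.SparkConf()
      .set("spark.locality.wait", "10000")         // wait before dropping down a locality level
      .set("spark.locality.wait.process", "10000") // wait for PROCESS_LOCAL specifically
      .set("spark.locality.wait.node", "10000")    // wait for NODE_LOCAL specifically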

Are your tasks seeing very skewed execution times? If some tasks are taking a very long time and using all the resources on a node, perhaps the other nodes are quickly finishing many tasks and actually overpopulating their caches. If a particular machine is not overpopulating its cache and there are no failures, then you should see 100% cached after the first run.
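If eviction does turn out to be involved, one option (a sketch, not specific to this job) is to persist with a storage level that spills to disk instead of dropping partitions:

    import org.apache.spark.storage.StorageLevel

    // Sketch: spill to disk rather than drop partitions when executor memory runs short.
    // `data` is a placeholder RDD name; this replaces a plain .cache() (MEMORY_ONLY).
    data.persist(StorageLevel.MEMORY_AND_DISK)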

It's also strange that running totally uncached takes 3.1 minutes, but running 80-90% cached may take 8 minutes. Does your workload produce nondeterministic variance in task times? Was it a single straggler or many tasks keeping the job from finishing? It's not too uncommon to see occasional performance regressions while caching due to GC, though 2 seconds to 8 minutes is a bit extreme.

On Wed, Nov 12, 2014 at 9:01 PM, Nathan Kronenfeld <nkronenfeld@oculusinfo.com> wrote:
Sorry, I think I was not clear in what I meant.
I didn't mean it went down within a run, with the same instance.

I meant that I'd run the whole app, and one time it would cache 100%, and the next run it might cache only 83%.

Within a run, it doesn't change.

On Wed, Nov 12, 2014 at 11:31 PM, Aaron Davidson <ilikerps@gmail.com> wrote:
The fact that the caching percentage went down is highly suspicious. It should generally not decrease unless other cached data took its place, or unless executors were dying. Do you know if either of these was the case?
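One cheap way to check the executor side of that is a sketch like the following - it just lists the registered block managers (driver included) and their free cache memory, so comparing it between runs gives a rough sign of executors dropping out:

    // Sketch: sc.getExecutorMemoryStatus maps each block manager to (max memory, remaining memory).
    // A shrinking map between runs would suggest executors are dying.
    sc.getExecutorMemoryStatus.foreach { case (executor, (maxMem, remaining)) =>
      println(s"$executor: $remaining of $maxMem bytes free for caching")
    }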

On Tue, Nov 11, 2014 at 8:58 AM, Nathan Kronenfeld <nkronenfeld@oculusinfo.com> wrote:
Can anyone point me to a good primer on how Spark decides where to send which task, how it distributes them, and how it determines data locality?

I'm trying a pretty simple task - it's doing a foreach over cached data, accumulating some (relatively complex) values.
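Concretely, the shape of it is something like the sketch below (Spark 1.x AccumulatorParam API; the value type, merge logic, and names are placeholders, not the real job):

    import org.apache.spark.AccumulatorParam

    // Sketch of a foreach accumulating a "relatively complex" value (here, a histogram).
    // The value type, merge logic, and names are placeholders.
    object HistogramParam extends AccumulatorParam[Map[String, Long]] {
      def zero(initial: Map[String, Long]): Map[String, Long] = Map.empty
      def addInPlace(a: Map[String, Long], b: Map[String, Long]): Map[String, Long] =
        (a.keySet ++ b.keySet).map(k => k -> (a.getOrElse(k, 0L) + b.getOrElse(k, 0L))).toMap
    }

    val histogram = sc.accumulator(Map.empty[String, Long])(HistogramParam)
    cachedData.foreach(record => histogram += Map(record.toString -> 1L))
    println(histogram.value)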

So I see several inconsistencies I don't understand:

(1) If I run it a couple of times, as separate applications (i.e., reloading, recaching, etc.), I will get a different percentage cached each time.  I've got about 5x as much memory as I need overall, so it isn't running out.  But one time, 100% of the data will be cached; the next, 83%; the next, 92%; etc.

(2) Also, the data is very unevenly distributed. I've got 400 partitions and 4 workers (with, I believe, 3x replication), and on my last run my distribution was 165/139/25/71.  Is there any way to get Spark to distribute the tasks more evenly? (There's a sketch of one option below, after (3).)

(3) If I run the problem several times in the same execution (to take advantage of caching etc.), I get very inconsistent results.  My latest try, I get:
  • 1st run: 3.1 min
  • 2nd run: 2 seconds
  • 3rd run: 8 minutes
  • 4th run: 2 seconds
  • 5th run: 2 seconds
  • 6th run: 6.9 minutes
  • 7th run: 2 seconds
  • 8th run: 2 seconds
  • 9th run: 3.9 minutes
  • 10th run: 8 seconds

I understand the difference for the first run; it was caching that time.  Later times, when it manages to work in 2 seconds, it's because all the tasks were PROCESS_LOCAL; when it takes longer, the last 10-20% of the tasks end up with locality level ANY.  Why would that change when running the exact same task twice in a row on cached data?
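Regarding (2) above, here is a minimal sketch of forcing a more even spread before caching - it costs a full shuffle, and `data` and the partition count are placeholders:

    // Sketch for (2): repartition forces an even spread, at the cost of a full shuffle.
    // `data` and the count 400 are placeholders.
    val evened = data.repartition(400).cache()
    evened.count()  // materialize the cache before the timed runs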

Any help or pointers that I could get would be much appreciated.


Thanks,

                 -Nathan



--
Nathan Kronenfeld
Senior Visualization Developer
Oculus Info Inc
2 Berkeley Street, Suite 600,
Toronto, Ontario M5A 4J5
Phone:  +1-416-203-3003 x 238
Email:  nkronenfeld@oculusinfo.com



