So, I think I have data saved so that each partition (part-r-00000, etc)
is exactly what I wan to translate into an output file of a format not related to 

I believe I've figured out how to tell Spark to read the data set without re-partitioning (in 
another post I mentioned this -- I have a non-splitable InputFormat).

I do something like
   mapPartitionWithIndex( (partId, iter) => 
           conf = new Configuration()
           fs = Filesystem.get(conf)
           strm = fs.create(new Path(...))
            //  write data to stream
          strm.close() // in finally block }

This runs for a few hundred input files (so each executors sees 10's of files),
and it chugs along nicely, then suddenly everything shuts down.
I can restart (telling it to skip the partIds which it has already completed), and it
chugs along again for a while (going past the previous stopping point) and again dies.

I am a t a loss.  This work for the first 10's of files (so it runs for about 1hr) then quits,
and I see no useful error information (no Exceptions except the stuff below.
I'm not shutting it down.

Any idea what I might check? I've bumped up the memory multiple times (16G currently)
and fiddled with increasing other parameters.

Exception in thread "main" org.apache.spark.SparkException: Job cancelled because SparkContext was shut down
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:694)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:693)
    at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:693)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor.postStop(DAGScheduler.scala:1399)
    at akka.dispatch.Mailbox.processAllSystemMessages(Mailbox.scala:262)