Hi Sandy,

We are, yes. I strongly suspect we're not partitioning our data properly, but maybe 1.5G is simply too small for our workload. I'll bump the executor memory and see if we get better results.

It seems we should be setting it to (SPARK_WORKER_MEMORY + pyspark memory) / # of concurrent applications, but is there any advice on how to balance memory between executors and pyspark, or does it depend too much on the workload? How do we know if we're getting the most bang for our buck, so to speak?

Thanks again,

-Aaron


On Tue, Mar 11, 2014 at 6:06 PM, Sandy Ryza <sandy.ryza@cloudera.com> wrote:
Are you aware that you get an executor (and the 1.5GB) per machine, not per core?



On Tue, Mar 11, 2014 at 12:52 PM, Aaron Olson <aaron.olson@shopify.com> wrote:
Hi Sandy,

We're configuring that with the JAVA_OPTS environment variable in $SPARK_HOME/spark-worker-env.sh like this:

# JAVA OPTS
export SPARK_JAVA_OPTS="-Dspark.ui.port=0 -Dspark.default.parallelism=1024 -Dspark.cores.max=256 -Dspark.executor.memory=1500m -Dspark.worker.timeout=500 -Dspark.akka.timeout=500 "

Does that value seem low to you?

-Aaron




On Tue, Mar 11, 2014 at 3:08 PM, Sandy Ryza <sandy.ryza@cloudera.com> wrote:
Hi Aaron,

When you say "Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes. It seems like we should have more than enough to do this comfortably.", how are you configuring this?

-Sandy


On Tue, Mar 11, 2014 at 10:11 AM, Aaron Olson <aaron.olson@shopify.com> wrote:
Dear Sparkians,

We are working on a system to do relational modeling on top of Spark, all done in pyspark. While we've been learning a lot about Spark internals so far, we're currently running into memory issues and wondering how best to profile to fix them. Here are our symptoms:
  • We're operating on data sets up to 80G in size of uncompressed JSON, 66 million records in the largest one.
  • Sometimes we're joining those large data sets, but cardinality never exceeds 66 million (unless we've got a bug somewhere).
  • We're seeing various OOM problems: sometimes python takes all available mem, sometimes we OOM with no heap space left, and occasionally OOM with GC overhead limit exceeded.
  • Sometimes we also see what looks like a single huge message sent over the wire that exceeds the wire format limitations.
  • Java heap space is 1.5G per worker, 24 or 32 cores across 46 nodes. It seems like we should have more than enough to do this comfortably.
  • We're trying to isolate specific steps now, but every time it errors, we're partitioning (i.e. partitionByKey is involved somewhere).
We've been instrumneting according to the monitoring and tuning docs, but a bit at a loss for where we're going wrong. We suspect poor/wrong partitioning on our part somehow. With that in mind, some questions:
  • How exactly is partitioning information propagated? It looks like within a pipelined RDD the parent partitioning is preserved throughout unless we either specifically repartition or go through a reduce. We're splitting as much as we can on maps and letting reduces happen normally. Is that good practice?
  • When doing e.g. partitionByKey, does an entire partition get sent to one worker process?
  • When does Spark stream data? Are there easy ways to sabotage the streaming? Are there any knobs for us to twiddle here?
  • Is there any way to specify the number of shuffles for a given reduce step?
  • How can we get better insight into what our workers are doing, specifically around moving data in and out of python land?
I realise it's hard to troubleshoot in the absence of code but any test case we have would be contrived. We're collecting more metrics and trying to reason about what might be happening, but any guidance at this point would be most helpful.

Thanks!

--
Aaron Olson
Data Engineer, Shopify




--
Aaron Olson
Data Engineer, Shopify




--
Aaron Olson
Data Engineer, Shopify