To summarize, this was an issue of selecting serialized representations
for large ultra-sparse matrices. Thanks again for sharing your feedback
with us.
1) In-memory representation: In CSR, every nonzero requires 12 bytes,
which is 240MB in your case. The overall memory consumption, however,
depends on the distribution of nonzeros: in CSR, each block with at
least one nonzero requires 4KB for row pointers. Assuming a uniform
distribution (the worst case), this gives us 80GB, which is likely the
problem here. Every empty block would have an overhead of just 44 bytes,
but under the worst-case assumption there are no empty blocks left. We
do not use COO for checkpoints because it would slow down subsequent
operations.
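For concreteness, the CSR arithmetic above can be sketched as a quick
back-of-envelope estimate (assuming the default 1000 x 1000 block size;
the 4B-index/8B-value split per nonzero is an assumption for
illustration):

```python
# Memory estimate for a 2e7 x 1e6 matrix with 2e7 nonzeros in blocked
# CSR, assuming a block size of 1000 x 1000.
ROWS, COLS, NNZ = 2 * 10**7, 10**6, 2 * 10**7
BLK = 1000  # assumed block size

num_blocks = (ROWS // BLK) * (COLS // BLK)   # 2e7 blocks in total

# 12 bytes per nonzero (e.g., 4B column index + 8B double value)
nnz_bytes = 12 * NNZ                         # 240 MB

# 4 bytes per row pointer, 1000 rows per block -> 4KB per nonempty
# block; worst case (uniform distribution): every block holds a nonzero
rowptr_bytes = num_blocks * 4 * BLK          # 80 GB

print(nnz_bytes / 10**9, rowptr_bytes / 10**9)  # 0.24 80.0 (GB)
```

With only 2e7 nonzeros spread over 2e7 blocks, the row-pointer arrays
dominate by more than two orders of magnitude.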
2) Serialized/on-disk representation: For sparse datasets that are
expected to exceed aggregate memory, we used to use a serialized
representation (with storage level MEM_AND_DISK_SER), which uses sparse,
ultra-sparse, or empty block representations. In this form, ultra-sparse
blocks require 9 + 16*nnz bytes and empty blocks require 9 bytes.
Therefore, with this representation selected, your dataset should
easily fit in aggregate memory. Also, note that checkpoint is only a
transformation that persists the RDD; the subsequent operation then
pulls the data into memory.
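Plugging your numbers into the serialized formula gives a rough sketch
(again assuming 1000 x 1000 blocks, which is an assumption for
illustration):

```python
# Serialized size estimate for a 2e7 x 1e6 matrix with one nonzero per
# row (2e7 nonzeros in total), assuming 1000 x 1000 blocks.
ROWS, COLS, NNZ = 2 * 10**7, 10**6, 2 * 10**7
BLK = 1000
num_blocks = (ROWS // BLK) * (COLS // BLK)   # 2e7 blocks

# worst case: nonzeros spread so that every block is ultra-sparse with
# nnz = 1, at 9 + 16*nnz bytes per block (9 bytes if a block is empty)
serialized_bytes = num_blocks * 9 + 16 * NNZ

print(serialized_bytes / 10**9)  # 0.5 (GB)
```

So even in the worst-case distribution, the serialized form needs about
0.5GB instead of 80GB, which easily fits in aggregate memory.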
At a high level, this was a bug. We missed ultra-sparse representations
when introducing an improvement that stores sparse matrices in CSR
instead of MCSR format on checkpoints, which eliminated the need to use
a serialized storage level. I just delivered a fix: we now store such
ultra-sparse matrices in serialized form again, which should
significantly reduce the memory pressure.
Regards,
Matthias
On 5/3/2017 9:38 AM, Mingyang Wang wrote:
> Hi all,
>
> I was playing with a super sparse matrix FK, 2e7 by 1e6, with only one
> nonzero value on each row, that is 2e7 nonzero values in total.
>
> With driver memory of 1GB and executor memory of 100GB, I found the HOP
> "Spark chkpoint", which is used to pin the FK matrix in memory, is really
> expensive, as it invokes lots of disk operations.
>
> FK is stored in binary format with 24 blocks, each block is ~45MB, and ~1GB
> in total.
>
> For example, with the script as
>
> """
> FK = read($FK)
> print("Sum of FK = " + sum(FK))
> """
>
> things worked fine, and it took ~8s.
>
> While with the script as
>
> """
> FK = read($FK)
> if (1 == 1) {}
> print("Sum of FK = " + sum(FK))
> """
>
> things changed. It took ~92s and I observed lots of disk spills from logs.
> Based on the stats from Spark UI, it seems the materialized FK requires
>> 54GB storage and thus introduces disk operations.
>
> I was wondering, is this the expected behavior of a super sparse matrix?
>
>
> Regards,
> Mingyang
>
