spark-user mailing list archives

From innowireless TaeYun Kim <>
Subject RE: Bulk-load to HBase
Date Mon, 22 Sep 2014 09:21:44 GMT
Thank you for your detailed reply.

First, the purpose of the MyKey class is to wrap a byte[] so that it provides a value-based
equals() and implements Comparable, which byte[] itself does not.
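The MyKey class itself is not shown in this thread; a minimal sketch of what such a wrapper might look like (class and field names assumed) is:

```java
import java.io.Serializable;
import java.util.Arrays;

// Hypothetical sketch of a byte[] key wrapper; the actual MyKey from the
// thread is not shown. It supplies value-based equals()/hashCode() plus
// Comparable, none of which a raw byte[] provides (arrays use reference
// equality in Java).
public class MyKey implements Comparable<MyKey>, Serializable {
    private final byte[] rowkey;

    public MyKey(byte[] rowkey) {
        this.rowkey = rowkey;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof MyKey)) return false;
        return Arrays.equals(rowkey, ((MyKey) o).rowkey);
    }

    @Override
    public int hashCode() {
        return Arrays.hashCode(rowkey);
    }

    @Override
    public int compareTo(MyKey other) {
        // Unsigned lexicographic order, matching how HBase orders rowkeys.
        int n = Math.min(rowkey.length, other.rowkey.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(rowkey[i] & 0xFF, other.rowkey[i] & 0xFF);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(rowkey.length, other.rowkey.length);
    }
}
```

Note the comparison masks each byte with 0xFF: Java bytes are signed, but HBase compares rowkeys as unsigned bytes, so a signed comparison would sort keys containing bytes >= 0x80 incorrectly.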

groupByKey() is for performance.
I have to merge the byte[]s that share the same key.
If the merging is done with reduceByKey(), many intermediate byte[] allocations and System.arraycopy()
calls are executed, and it is too slow. So I had to resort to groupByKey(), and in the callback
allocate one byte[] with the total size of the grouped byte[]s and arraycopy() each of them into it.
groupByKey() works for this, since the size of each group is manageable in my application.
In fact, it worked well when I implemented the same process with the HBase Put class.
So I assume that it is not the problem.
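The grouped-merge step described above can be sketched in plain Java, outside Spark (the method name is made up for illustration): size everything first, then do exactly one allocation and copy each piece into place.

```java
import java.util.List;

public class ByteMerge {
    // Merge all byte[] values of one key into a single array with exactly
    // one allocation: compute the total size up front, then
    // System.arraycopy() each value into its slot.
    public static byte[] mergeGroup(List<byte[]> values) {
        int total = 0;
        for (byte[] v : values) {
            total += v.length;
        }
        byte[] merged = new byte[total];
        int offset = 0;
        for (byte[] v : values) {
            System.arraycopy(v, 0, merged, offset, v.length);
            offset += v.length;
        }
        return merged;
    }
}
```

In Spark this would be the body of a mapValues() call on the grouped RDD, consuming the Iterable of values for each key.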

WithIndex is for excluding the first record of the first partition.
I could remove that record after collect() and sort(), but this was easier.

I think the problem is that when mapPartitionsWithIndex() executes, the partition is too big:
several GB, since it matches the size of an HBase region, and a region has to be several GB.
I could allocate more memory to the executor, but then I could not spawn enough executors
for the previous RDD operations.

It would be nice if:
- mapPartitionsWithIndex() loaded the partition in small chunks as the iterator sweeps
through it, or
- there were a function named firstRecordsOfPartitions().
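The first wish amounts to the partition iterator being consumed lazily. A plain-Java illustration (no Spark; the class names are made up) of the desired behavior, where asking for only the first record generates only one element:

```java
import java.util.Iterator;

public class FirstOnly {
    // An iterator that counts how many elements it actually produces,
    // standing in for a partition iterator. With a truly lazy source,
    // a consumer that takes only the first record triggers only one
    // element to be generated.
    public static class CountingIterator implements Iterator<Integer> {
        public int produced = 0;
        private final int limit;

        public CountingIterator(int limit) {
            this.limit = limit;
        }

        @Override
        public boolean hasNext() {
            return produced < limit;
        }

        @Override
        public Integer next() {
            return produced++;
        }
    }

    // Take only the first record, as a hypothetical
    // firstRecordsOfPartitions() would do for each partition.
    public static Integer firstRecord(Iterator<Integer> it) {
        return it.hasNext() ? it.next() : null;
    }
}
```

The complaint in the thread is that Spark materialized the whole partition before the callback ran, rather than behaving like this sketch.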

About the lost parallelism:
I thought it was a possible alternative to mapPartitionsWithIndex() that could run with a
smaller memory footprint.

About the strange Spark behavior:
I don't think Spark is malfunctioning.
I just want more detailed information about the execution flow. How can I check it?


-----Original Message-----
From: Sean Owen [] 
Sent: Monday, September 22, 2014 5:46 PM
To: innowireless TaeYun Kim
Cc: user
Subject: Re: Bulk-load to HBase

I see a number of potential issues:

On Mon, Sep 22, 2014 at 8:42 AM, innowireless TaeYun Kim <>
>   JavaPairRDD<MyKey, byte[]> rdd =
>           // MyKey has a byte[] member for rowkey

Two byte[]s with the same contents are not equals(), so a raw byte[] won't work as you intend
as a key. Is there more to it? I assume so, given your comment later.

>     .groupByKey()

This forces all values for each key to be held in memory when the key is processed. Are you
sure you can't do something comparable with reduceByKey?

>      rdd.mapPartitionsWithIndex(...)
>           // Gets the first record of each partitions.
>           // First partition's first record is excluded, since it's not needed.

You won't need "WithIndex" for that, though I doubt it matters.

> 1. OutOfMemory exception on mapPartitionsWithIndex() for splitKeys.
> In my case, the number of regions is fairly small for the RDD, and the size of a region is big.
> This is intentional, since the reasonable size of an HBase region is several GB.
> But for Spark, it is too big for a partition that an executor can handle.
> I thought mapPartitionsWithIndex would not load the entire partition, but I was wrong.
> Maybe it loaded the whole partition while I only wanted to fetch the first record of the iterator.

You can give executors more memory but I think groupByKey is your problem.

> I could save all the partitions with the save...() API, then load each partition separately and call first().
> But it does not feel right. Parallelism is lost.

Why is that necessary?

> 2. Strange Spark behavior
> It is not fatal as 1, but it's strange.
> In my code, the flow is as follows: flatMapToPair -> groupByKey -> mapValues -> sortByKey.
> But when I watch the Spark UI, it is executed as follows: flatMapToPair -> sortByKey -> sortByKey (again!) -> mapValues.
> Since in my case the number of records between flatMapToPair and mapValues is very large, it seems that Spark executes sortByKey at the worst possible time.
> I tried to trick Spark by replacing mapValues with mapToPair, but the execution order did not change.
> Why?

The final operation called by a top-level method X may not be X.
Double-check that these operations are from your current run and not an earlier one, and that
the code you're executing is what you think it is. It is not going to execute things in an
order that is semantically different, no.

