phoenix-dev mailing list archives

From "James Taylor (JIRA)" <>
Subject [jira] [Commented] (PHOENIX-1217) Spool GROUP BY to disk using HBase itself
Date Thu, 28 Aug 2014 07:55:00 GMT


James Taylor commented on PHOENIX-1217:

Comments exploring this option via an email conversation between myself, Lars Hofhansl, and
Marcel Kutsch: 
Date: Tue, 25 Feb 2014 11:00:08 -0800
Subject: Re: using HBase to do spill
From: Lars Hofhansl <>
To: James Taylor <>
Cc: Marcel Kutsch <>

Ah. So if you had direct access to the Memstore through the HRegion you
could do it? Something like HRegion.getMemstore().
Maybe we can add that to HRegion going forward.

-- Lars

On Tue, Feb 25, 2014 at 10:47 AM, James Taylor <>wrote:

> It's ok if the last aggregation is on disk. A raw scan runs to do a final
> aggregation on the region server which combines them together. This is just
> an optimization, as in most cases, since the number of distinct values is
> low, you'd end up with a single batch of Puts that might never even be
> flushed. If you don't do this optimization, though, you end up writing one
> row per row in the region (even if there's say three distinct values among
> 10M rows). Functionally it'd still work, but perf would suck.
> On Tue, Feb 25, 2014 at 10:42 AM, Lars Hofhansl <>wrote:
>> Yep. That way you control what you keep in memory. Relying on HRegion for
>> that you can only indirectly control when it gets flushed to disk, so on
>> each lookup you'd need to check the disk since the current value might be
>> found there.
>> I guess even in your approach, how would you know that the last value of
>> the aggregation is not on disk? I guess each time you batch write the data
>> to the HRegion you'd need to seed the next batch with the former value.
>> What if that does not fit into memory? (I'm probably not understanding this
>> correctly...)
>> On Tue, Feb 25, 2014 at 10:02 AM, James Taylor <>wrote:
>>> WFH today. I think my other alternative is simple and would work: batch
>>> the mutations yourself instead of using hregion.put() with each upsert.
>>> Then you can lookup the Put in your batch to combine them together. When
>>> the batch gets big enough, do a hregion.batchMutate() and set the flush
>>> size threshold very small (as essentially we'd want to have the flush occur
>>> when the batchMutate happens).
>>> Does that sound reasonable?
>>> On Tue, Feb 25, 2014 at 9:56 AM, Lars Hofhansl <
>>> > wrote:
>>>> Hmm... Not sure the HRegion stuff would be the right solution then, at
>>>> least not without further changes to the HRegion class. Let's chat when you
>>>> get in today.
>>>> On Mon, Feb 24, 2014 at 11:35 PM, James Taylor <>wrote:
>>>>> I'd like to be able to opportunistically peek into the
>>>>> "not-yet-flushed" data as a poor man's cache. The idea being that if we
>>>>> have lots of rows, but few distinct values, we'd never flush. I don't want
>>>>> to continually go to disk when the rows are being Put, though. I thought
>>>>> it'd be better to let HBase do its merge sort when we do the raw scan before
>>>>> sending the results back to the client (so that we're not doing a bunch
>>>>> of random reads/seeks).
>>>>> I guess we could use a block cache, but then we're wasting the memory
>>>>> holding the "unflushed" data. There's no way to get to this?
>>>>> On Mon, Feb 24, 2014 at 1:44 PM, Lars Hofhansl <
>>>>>> wrote:
>>>>>> hregion.getStore(cf).get(rowKey), will also hit disk if the key is
>>>>>> not found in the memstore.
>>>>>> Isn't that what you want, what if you actually had to spill to disk
>>>>>> at some point?
>>>>>> You mean it has to hit disk just to check whether the KV exists? You
>>>>>> can mitigate this with a row or rowcol bloom filter. Or you might need
>>>>>> to initialize a separate block cache.
>>>>>> On Mon, Feb 24, 2014 at 12:26 PM, James Taylor <
>>>>>>> wrote:
>>>>>>> On Mon, Feb 24, 2014 at 11:53 AM, Lars Hofhansl <
>>>>>>>> wrote:
>>>>>>>> Assuming you'd only use one column family, you can do this:
>>>>>>>> 1. get() the old value from the HRegion
>>>>>>> Wouldn't the hregion.get() hit disk if it doesn't find it in
>>>>>>> memstore? That's what I want to avoid. I'd like to be able to
>>>>>>> do something along these lines:
>>>>>>>     KeyValue kv = hregion.getStore(cf).get(rowKey);
>>>>>>>     if (kv == null) {
>>>>>>>         hregion.put(aggregatedKv);
>>>>>>>     } else {
>>>>>>>         aggregatedKv = aggregate(aggregatedKv, kv);
>>>>>>>         hregion.getStore(cf).upsert(aggregatedKv);
>>>>>>>     }
>>>>>>> Is that possible?
>>>>>>>> 2. get the store from the HRegion (there's one per column family).
>>>>>>>> Either call getStore(<cf>) or getStores()
>>>>>>>> 3. update your key values accordingly and call Store.upsert. If the
>>>>>>>> KVs are still in the memstore they'll be updated in place.
>>>>>>>> Upserts are not friendly to the GC since we cannot use the
>>>>>>>> memstoreLAB allocation for those (too much fragmentation inside the
>>>>>>>> LABs). So they should be used only where needed (like in aggregate,
>>>>>>>> but not to just overflow a result set that does not change). Upsert
>>>>>>>> also is not MVCC safe (0.94), but that is not an issue for you.
>>>>>>>> (That reminds me... At some point we have to look through
>>>>>>>> Phoenix code and make sure it's GC friendly.)
>>>>>>>> -- Lars
>>>>>>>> On Mon, Feb 24, 2014 at 11:27 AM, James Taylor <
>>>>>>>>> wrote:
>>>>>>>>> Yes, the latter. The aggregate value is a "running total" for all
>>>>>>>>> rows with the same row key (formed from evaluating the group by and
>>>>>>>>> forming a row key). We need to be able to get back the current value
>>>>>>>>> from the memstore so we can aggregate it together with the new value
>>>>>>>>> and then upsert it back. We don't care about old versions in this
>>>>>>>>> case - it's better to not keep the old values as it'll consume less
>>>>>>>>> memory.
>>>>>>>>> Do we have to write our own version of an incrementColumnValue
>>>>>>>>> method (kind of an aggregateColumnValue)? How do we get the old value
>>>>>>>>> from the memstore (ensuring that we only look in memstore and return
>>>>>>>>> null if it's not found)?
>>>>>>>>> Thanks!
>>>>>>>>> James
>>>>>>>>> On Mon, Feb 24, 2014 at 10:26 AM, Lars Hofhansl <
>>>>>>>>>> wrote:
>>>>>>>>>> The memstore has an upsert() method that does exactly that. Note,
>>>>>>>>>> though, that that was specifically designed for incrementColumnValue,
>>>>>>>>>> append, etc. Is that what you mean?
>>>>>>>>>> But I am not sure I follow what the problem is. Whether you use
>>>>>>>>>> your own data structure or an HRegion for spilling has no bearing on
>>>>>>>>>> how much data you write, no?
>>>>>>>>>> Is it because in an aggregation you update the same value over
>>>>>>>>>> and over again and you do not care about the old values?
>>>>>>>>>> -- Lars
>>>>>>>>>> On Sat, Feb 22, 2014 at 12:07 PM, James Taylor <
>>>>>>>>>>> wrote:
>>>>>>>>>>> (Lars - can you give advice on how to do the item in bold below)?
>>>>>>>>>>> Marcel, one thing I was thinking about was that in using this
>>>>>>>>>>> new algorithm, we'll end up essentially re-writing the entire
>>>>>>>>>>> region's worth of data, even in the case where there are only a few
>>>>>>>>>>> distinct values. That's because we'd be doing a Put for each row
>>>>>>>>>>> that we process while scanning. One way to fix this would be to keep
>>>>>>>>>>> our own batch of Puts, and then instead of doing an hregion.put()
>>>>>>>>>>> each time, we'd do an hregion.batchMutate() when our batch becomes
>>>>>>>>>>> "big enough". Then we could check if a group by key is in our batch
>>>>>>>>>>> and aggregate them together (like we do today). That way, we'd only
>>>>>>>>>>> be writing the distinct values.
>>>>>>>>>>> *Another, more HBase-y way to do this would be to continue doing
>>>>>>>>>>> the hregion.put(), but for the lookup, we'd check the memstore and if
>>>>>>>>>>> found, replace that memstore kv with the aggregated kv. I'm not sure
>>>>>>>>>>> how to do it this way, but Lars would know.*
>>>>>>>>>>> The only problem with the former way is that we'd have our own
>>>>>>>>>>> batch and then they'd continue to be held in memory in the memstore
>>>>>>>>>>> after we do our hregion.batchMutate(), so we'd be double-buffering.
>>>>>>>>>>> Thanks,
>>>>>>>>>>> James
>>>>>>>>>>> On Tue, Feb 4, 2014 at 10:25 AM, Lars Hofhansl
>>>>>>>>>>>> wrote:
>>>>>>>>>>>> This sounds right.
>>>>>>>>>>>> The only extra thing you need to do is creating a local
>>>>>>>>>>>> filesystem.
>>>>>>>>>>>> Something like FileSystem.get("file://<your temp location>",
>>>>>>>>>>>> new Configuration());
>>>>>>>>>>>> Might need to tweak the configuration, otherwise it'll pick up
>>>>>>>>>>>> the HBase default.
>>>>>>>>>>>> Then - after looking a bit more - the best API to use is (in
>>>>>>>>>>>> HRegion):
>>>>>>>>>>>>   public static HRegion newHRegion(Path tableDir, HLog log,
>>>>>>>>>>>>       FileSystem fs, Configuration conf, HRegionInfo regionInfo,
>>>>>>>>>>>>       HTableDescriptor htd, RegionServerServices rsServices) {
>>>>>>>>>>>> It can be called something like this:
>>>>>>>>>>>>   HRegion r = HRegion.newHRegion(tableDir, null, fs, conf, hri, htd, null);
>>>>>>>>>>>>   r.initialize(null);
>>>>>>>>>>>> An HRegionInfo can be created simply with
>>>>>>>>>>>> HRegionInfo(byte[] tableName). This will set both start and end key
>>>>>>>>>>>> to null, which means all keys fall in this region.
>>>>>>>>>>>> Let me know how this works. If there's anything simplifying we
>>>>>>>>>>>> can put into HBase that would be good to know as well, I can make
>>>>>>>>>>>> that happen.
>>>>>>>>>>>> -- Lars
>>>>>>>>>>>> On Sat, Feb 1, 2014 at 7:43 PM, James Taylor
>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>> Hey Marcel,
>>>>>>>>>>>>> Wow, this is great, and good timing as I spoke with Lars
>>>>>>>>>>>>> in-depth about this idea, and he liked it. He gave some good tips on
>>>>>>>>>>>>> the APIs we can use to ensure that the spilled data stays local
>>>>>>>>>>>>> (*Lars - can you confirm I got this right?*).
>>>>>>>>>>>>> First, the trick to get a scan to show all versions is to use
>>>>>>>>>>>>> scan.setRaw(true) before getting the Scanner.
>>>>>>>>>>>>> Second, would you mind coding against the mirror on Github of
>>>>>>>>>>>>> our Apache git here (as we've switched to Apache now):
>>>>>>>>>>>>> You can send me a pull request against this repo.
>>>>>>>>>>>>> The tips that Lars gave me:
>>>>>>>>>>>>> - create your own HRegion which ensures that things stay local
>>>>>>>>>>>>> by using the static method: HRegion.openHRegion(Path
>>>>>>>>>>>>> final HRegionInfo info, final HTableDescriptor htd, final HLog wal,
>>>>>>>>>>>>> final Configuration conf)
>>>>>>>>>>>>> There are a bunch of static constructors, so if the above
>>>>>>>>>>>>> doesn't work, we can ask Lars which one is the "right" one. The other
>>>>>>>>>>>>> one that looked promising to me was this
>>>>>>>>>>>>>   public static HRegion createHRegion(final HRegionInfo info,
>>>>>>>>>>>>>       final Path rootDir, final Configuration conf,
>>>>>>>>>>>>>       final HTableDescriptor hTableDescriptor, final HLog hlog,
>>>>>>>>>>>>>       final boolean initialize, final boolean ignoreHLog)
>>>>>>>>>>>>> with ignoreHLog=true, hlog=null, not sure about initialize,
>>>>>>>>>>>>> but probably true
>>>>>>>>>>>>> You can create an HRegionInfo like this:
>>>>>>>>>>>>>     new HRegionInfo(spillableTableNameAsBytes, null, null, false);
>>>>>>>>>>>>> For HLog, just use null
>>>>>>>>>>>>> For HTableDescriptor, use
>>>>>>>>>>>>> HTableDescriptor(spillableTableNameAsBytes) and then do an
>>>>>>>>>>>>> hTableDesc.addFamily(new
>>>>>>>>>>>>> HColumnDescriptor(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES))
>>>>>>>>>>>>> It doesn't really matter what you call the family, but you
>>>>>>>>>>>>> should call
>>>>>>>>>>>>> familyDesc.setMaxVersions(Integer.MAX_VALUE); // as you want to keep all versions
>>>>>>>>>>>>> Also do a
>>>>>>>>>>>>> tableDesc.setMaxFileSize(Long.MAX_VALUE); // as we never want to trigger a split
>>>>>>>>>>>>> And finally, an important one that controls how much each
>>>>>>>>>>>>> spillable group will hold in memory is:
>>>>>>>>>>>>> tableDesc.setMemStoreFlushSize(long memstoreFlushSize)
>>>>>>>>>>>>> We should make that configurable from
>>>>>>>>>>>>> - then use the hregion.put(put, false) to put the data. I
>>>>>>>>>>>>> don't think there's any advantage to batching here, since it's all
>>>>>>>>>>>>> local.
>>>>>>>>>>>>> One design decision/trade-off that we can worry about later is
>>>>>>>>>>>>> whether we want to spin up a single HRegion which would be shared
>>>>>>>>>>>>> across all regions spilling to a given region server, versus spinning
>>>>>>>>>>>>> up a new one each time. The problem with spinning up a new one each
>>>>>>>>>>>>> time is that each one will have a limited amount of memory versus a
>>>>>>>>>>>>> single region-server-wide global one that could have more memory
>>>>>>>>>>>>> allocated to it.
>>>>>>>>>>>>> The other issue is whether we want to always go through these
>>>>>>>>>>>>> APIs, or use the in-memory GroupByCache until we run into a memory
>>>>>>>>>>>>> issue and fall back to these. These HRegion APIs will use more
>>>>>>>>>>>>> memory, as we'll keep all the duplicates until the end and only then
>>>>>>>>>>>>> condense them down.
>>>>>>>>>>>>> Let me know if you have questions.
>>>>>>>>>>>>> Thanks!
>>>>>>>>>>>>> James
>>>>>>>>>>>>> On Sat, Feb 1, 2014 at 5:44 PM, Marcel Kutsch <
>>>>>>>>>>>>>> wrote:
>>>>>>>>>>>>>> Hi James,
>>>>>>>>>>>>>> I hacked up your proposed solution
>>>>>>>>>>>>>> When you get a chance, could you please take a quick look at
>>>>>>>>>>>>>> the attached file. I must be missing something, the scanner only
>>>>>>>>>>>>>> returns the latest version of the row and not all of them. Maybe
>>>>>>>>>>>>>> you see something obvious.
>>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>>> Marcel
>>>>>>>>>>>>>> ----------------------------------------------------------
>>>>>>>>>>>>>> Marcel,
>>>>>>>>>>>>>> I think it's worth trying the HBase-based spill approach.
>>>>>>>>>>>>>> Should be pretty easy. Here's what you need to do:
>>>>>>>>>>>>>> 1) Pre-create a new Phoenix table
>>>>>>>>>>>>>> ConnectionQueryServicesImpl.init(String url, Properties props). This
>>>>>>>>>>>>>> ensures that the spill table will always be there. Just add another
>>>>>>>>>>>>>> try/catch block after the one for
>>>>>>>>>>>>>> that issues the following:
>>>>>>>>>>>>>> KEY) " +
>>>>>>>>>>>>>>         HConstants.VERSIONS + "=" + Integer.MAX_VALUE + "," +
>>>>>>>>>>>>>>         HColumnDescriptor.DATA_BLOCK_ENCODING + "=" + "NONE"
>>>>>>>>>>>>>> We might also want to play around with the
>>>>>>>>>>>>>> HTableDescriptor.MEMSTORE_FLUSHSIZE (the size before the in-memory
>>>>>>>>>>>>>> cache gets flushed to disk) and HTableDescriptor.MAX_FILESIZE (how
>>>>>>>>>>>>>> big a file can get before it's split).
>>>>>>>>>>>>>> 2) From your co-processor, you'd get your hands on an HTable
>>>>>>>>>>>>>>         RegionCoprocessorEnvironment env = c.getEnvironment();
>>>>>>>>>>>>>>         HTableInterface htable =
>>>>>>>>>>>>>>             env.getTable(Bytes.toBytes("SYSTEM.SPILLABLE_CACHE"));
>>>>>>>>>>>>>> 3) Just use the regular HBase API to spill the cache:
>>>>>>>>>>>>>>         long ts = 1;
>>>>>>>>>>>>>>         List<Mutation> puts =
>>>>>>>>>>>>>>             Lists.newArrayListWithExpectedSize(BATCH_SIZE);
>>>>>>>>>>>>>>         // Create a Put and add the key and value that you
>>>>>>>>>>>>>>         // used in your SpillableGroupByCache
>>>>>>>>>>>>>>         Put put = new Put(key, ts++); // Increment timestamp for every Put
>>>>>>>>>>>>>>         put.setWriteToWAL(false);
>>>>>>>>>>>>>>         put.add(QueryConstants.DEFAULT_COLUMN_FAMILY_BYTES,
>>>>>>>>>>>>>>             QueryConstants.EMPTY_COLUMN_BYTES,
>>>>>>>>>>>>>>         puts.add(put);
>>>>>>>>>>>>>>         ....
>>>>>>>>>>>>>>         // Once there are enough in the batch, call htable.batch
>>>>>>>>>>>>>>         // Not sure how to prevent these puts from going out
>>>>>>>>>>>>>>         // over the wire
>>>>>>>>>>>>>>         htable.batch(puts);
>>>>>>>>>>>>>> 4) To access, just implement your own RegionScanner that
>>>>>>>>>>>>>> wraps a ResultScanner and use this to implement the
>>>>>>>>>>>>>> GroupByCache.getScanner() method.
>>>>>>>>>>>>>>         Scan scan = new Scan();
>>>>>>>>>>>>>>         scan.setRaw(true);
>>>>>>>>>>>>>>         // We may need to "preallocate" a chunk of timestamps
>>>>>>>>>>>>>>         // from an Atomic long so that multiple, simultaneous
>>>>>>>>>>>>>>         // spillages don't tromp on each other. We can
>>>>>>>>>>>>>>         // do this after perf testing,
>>>>>>>>>>>>>>         scan.setTimeRange(new TimeRange(minTs,
>>>>>>>>>>>>>>         // If we can keep the data local, this scanner will
>>>>>>>>>>>>>>         // bypass the RPC and deserialization
>>>>>>>>>>>>>>         // hit we'd otherwise take.
>>>>>>>>>>>>>>         ResultScanner scanner = htable.getScanner(scan);
>>>>>>>>>>>>>> Take a look at
>>>>>>>>>>>>>> GroupedAggregateRegionObserver.scanOrdered(), as this is essentially
>>>>>>>>>>>>>> how you want to implement your RegionScanner. Think of this scanner
>>>>>>>>>>>>>> above as the RegionScanner argument to the scanOrdered() method. You
>>>>>>>>>>>>>> basically want to aggregate together all rows with the same row key
>>>>>>>>>>>>>> and add this to the List<KeyValue> results argument for the next
>>>>>>>>>>>>>> method of your RegionScanner impl:
>>>>>>>>>>>>>>            @Override
>>>>>>>>>>>>>>             public boolean next(List<KeyValue>
>>>>>>>>>>>>>> throws IOException
>>>>>>>>>>>>>> Let me know what you think. I'll ask Lars in the meantime if
>>>>>>>>>>>>>> there's a way to ensure keeping the data local for a table. One way
>>>>>>>>>>>>>> we could do it ourselves is to pre-split
>>>>>>>>>>>>>> then send over from the client a byte[] startKey that we know lives
>>>>>>>>>>>>>> on the particular region server on which we'd be doing the spilling
>>>>>>>>>>>>>> (or perhaps we can dynamically calculate before spilling). Then,
>>>>>>>>>>>>>> we'd just prepend this startKey to our key and the data would
>>>>>>>>>>>>>> essentially be pegged to the region server on which we're working.
>>>>>>>>>>>>>> We can make it fixed length, and then just skip those bytes with a
>>>>>>>>>>>>>> fixed offset for the KeyValue we send back in (4).
>>>>>>>>>>>>>> Thanks!
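
The scanOrdered()-style merge described in step (4) above, where all rows
with the same row key are aggregated together as a key-ordered scan is
consumed, can be sketched in plain Java. This is only a minimal illustration
under stated assumptions, not Phoenix or HBase code: the class
OrderedAggregationSketch, its row() helper, the String keys, and the SUM-style
long aggregate are all hypothetical stand-ins for the KeyValues a raw scan
would return in row-key order.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: none of these names come from Phoenix or HBase.
final class OrderedAggregationSketch {

    /** Builds one (row key, partial aggregate) pair; stands in for a KeyValue. */
    static Map.Entry<String, Long> row(String key, long partial) {
        return new SimpleEntry<String, Long>(key, partial);
    }

    /**
     * Collapses a key-sorted list of partial aggregates into one entry per key.
     * Because the input is sorted, equal keys are adjacent, so a single pass
     * with one running total is enough (SUM is an assumed aggregate).
     */
    static List<Map.Entry<String, Long>> aggregateSorted(List<Map.Entry<String, Long>> sortedRows) {
        List<Map.Entry<String, Long>> results = new ArrayList<>();
        String currentKey = null;
        long runningTotal = 0L;
        for (Map.Entry<String, Long> r : sortedRows) {
            if (currentKey != null && !currentKey.equals(r.getKey())) {
                // Key changed: emit the finished group and start a new one.
                results.add(row(currentKey, runningTotal));
                runningTotal = 0L;
            }
            currentKey = r.getKey();
            runningTotal += r.getValue();
        }
        if (currentKey != null) {
            results.add(row(currentKey, runningTotal)); // emit the last group
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Long>> spilled =
            Arrays.asList(row("a", 2L), row("a", 5L), row("b", 1L));
        System.out.println(aggregateSorted(spilled)); // [a=7, b=1]
    }
}
```

The single pass relies on the scan returning every version of a row key next
to each other, which is what the thread expects from scan.setRaw(true)
combined with HBase's own merge sort across memstore and flushed files.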

> Spool GROUP BY to disk using HBase itself
> -----------------------------------------
>                 Key: PHOENIX-1217
>                 URL:
>             Project: Phoenix
>          Issue Type: Bug
>            Reporter: James Taylor
> We have an implementation in place now to spill a GROUP BY to disk on the server side
> if it gets too big to fit into memory. An alternate, simpler implementation would be to use
> HBase itself during this spooling instead.
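
The "keep our own batch of Puts" idea from the thread above (combine rows
that share a group-by key in memory, and only write the batch out once it is
"big enough") might look roughly like the following. This is a sketch under
stated assumptions rather than the Phoenix implementation: BatchingSpillSketch
and SpillSink are hypothetical names, SpillSink stands in for whatever
receives a full batch (the thread suggests hregion.batchMutate()), keys are
plain Strings, and the aggregate is assumed to be a SUM over longs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: none of these names come from Phoenix or HBase.
final class BatchingSpillSketch {

    /** Stand-in for the place a full batch is written to. */
    interface SpillSink {
        void writeBatch(Map<String, Long> batch);
    }

    private final int maxDistinctKeys;
    private final SpillSink sink;
    private final Map<String, Long> batch = new TreeMap<>();

    BatchingSpillSketch(int maxDistinctKeys, SpillSink sink) {
        this.maxDistinctKeys = maxDistinctKeys;
        this.sink = sink;
    }

    /** Folds one scanned row into the batch, spilling first if a new key would not fit. */
    void add(String groupKey, long value) {
        if (!batch.containsKey(groupKey) && batch.size() >= maxDistinctKeys) {
            flush();
        }
        batch.merge(groupKey, value, Long::sum); // SUM is an assumed aggregate
    }

    /** Writes whatever is buffered to the sink and starts an empty batch. */
    void flush() {
        if (!batch.isEmpty()) {
            sink.writeBatch(new TreeMap<>(batch)); // copy, since the batch is reused
            batch.clear();
        }
    }

    public static void main(String[] args) {
        List<Map<String, Long>> written = new ArrayList<>();
        BatchingSpillSketch spill = new BatchingSpillSketch(2, written::add);
        String[] scannedKeys = {"a", "b", "a", "c", "a"};
        for (String key : scannedKeys) {
            spill.add(key, 1L);
        }
        spill.flush();
        System.out.println(written); // [{a=2, b=1}, {a=1, c=1}]
    }
}
```

Five scanned rows become four written entries here, and with few distinct
values the batch would never spill at all. The key "a" still ends up in two
batches, which is why the thread keeps a final aggregation pass over the
spilled data.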

This message was sent by Atlassian JIRA
