drill-user mailing list archives

From Paul Rogers <par0...@yahoo.com.INVALID>
Subject Re: Excessive Memory Use in Parquet Files (From Drill Slack Channel)
Date Tue, 24 Mar 2020 23:04:04 GMT
Hi Charles,
Thanks for forwarding this. Looks like Idan found the right answer. Still, I repeated the
analysis and have some suggestions.

Looked at the code mentioned in the message chain. This is a place where our error handling
could use work:

  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }

Some default allocation failed, but we preserve none of the relevant information: nothing
about the kind of vector, nothing about the cause of the failure. There are dozens of implementations
of allocateNewSafe(); it is impossible to determine which was called.

A typical implementation:
  public boolean allocateNewSafe() {
    long curAllocationSize = ...
    try {
      allocateBytes(curAllocationSize);
    } catch (DrillRuntimeException ex) {
      // The cause of the failure is discarded here.
      return false;
    }
    return true;
  }


We catch the exception, then ignore it. Sigh... We can fix all this, but it does not help
with this specific issue. See DRILL-7658.
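
To illustrate the kind of information that gets lost, here is a minimal sketch of an
allocation path that keeps the cause and the vector identity. Everything in it is
hypothetical (SketchVector, AllocationFailedException, the simulated allocateBytes()); it
is not Drill's code and not necessarily what DRILL-7658 will do.

```java
// Hypothetical stand-ins for illustration only; these are NOT Drill classes.
class AllocationFailedException extends RuntimeException {
  AllocationFailedException(String message, Throwable cause) {
    super(message, cause);
  }
}

class SketchVector {
  private final String name;
  private final long requestedBytes;
  private final long availableBytes;

  SketchVector(String name, long requestedBytes, long availableBytes) {
    this.name = name;
    this.requestedBytes = requestedBytes;
    this.availableBytes = availableBytes;
  }

  // Simulated low-level allocation: fails when the request exceeds the budget.
  private void allocateBytes(long size) {
    if (size > availableBytes) {
      throw new IllegalStateException(
          "requested " + size + " bytes, only " + availableBytes + " available");
    }
  }

  // Rather than returning false, wrap the failure with the vector's name and
  // type, and keep the original exception as the cause.
  void allocateNew() {
    try {
      allocateBytes(requestedBytes);
    } catch (IllegalStateException ex) {
      throw new AllocationFailedException(
          "Allocation failed for vector '" + name + "' ("
              + getClass().getSimpleName() + ")", ex);
    }
  }
}

class AllocationDemo {
  public static void main(String[] args) {
    try {
      new SketchVector("payload", 4096, 1024).allocateNew();
    } catch (AllocationFailedException ex) {
      System.out.println(ex.getMessage());
      System.out.println("cause: " + ex.getCause().getMessage());
    }
  }
}
```

With something along these lines, the stack trace would name the vector and carry the
underlying reason instead of "OutOfMemoryException: null".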

As it turns out, most of the implementations are in the generated vector classes. These classes,
oddly, have their own redundant copy of allocateNewSafe(). Since we don't see those methods
on the stack, we can quickly narrow down the candidates to:

* AbstractMapVector (any map vector)
* A few others that won't occur for Parquet


Given this, it means the allocation is failing when allocating a map. Idan mentions "one column
is an array of a single element comprised of 15 columns". We can presume that the "element"
is actually a map, and that the map has 15 columns.

So, looks like the map allocation failed during a partition sender (the next element on the
stack). The partition sender takes incoming batches (presumably from the scan, though the stack
trace does not say because we're at the root of the DAG), and splits them by key to destination
nodes.

Idan mentions the query runs on a single machine. So, the partitions are only to threads on
that same machine. Idan mentions a 16-core machine. Since Drill parallelizes queries to 70%
of the cores, we may be running 11 threads, so each partition sender tries to buffer data
for 11 receivers. Each will buffer three batches of data for a total of 33 batches.

Next we need to know how many records are in each batch. Seems we have two default values,
defined in drill-override.conf:


    store.parquet.flat.batch.num_records: 32767,
    store.parquet.complex.batch.num_records: 4000,


If we think the record has a map, then perhaps Parquet chose the "complex" count of 4000
records? I think this can be checked by looking at the query profile which, if I recall, should
be produced even for a failed query.

So, let's guess 4000 records * 33 buffered batches = 130K records. We don't know the size
of each, however. (And, note that Idan said that he artificially increased parallelism, so
the buffering need is greater than the above back-of-the-envelope calcs.)
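
The estimate above can be written out as a small calculation. This is only a sketch of the
arithmetic in this message: the class and method names are made up, and rounding the thread
count down is an assumption chosen to match the "11 threads" guess, not a statement about
how Drill actually rounds.

```java
// Back-of-the-envelope buffering estimate; names are illustrative, not Drill APIs.
class BufferEstimate {

  // Threads used if queries are parallelized to a fraction of the cores
  // (rounding down is an assumption).
  static int threads(int cores, double fraction) {
    return (int) Math.floor(cores * fraction);
  }

  // Batches buffered by one partition sender across all receivers.
  static int bufferedBatches(int receivers, int batchesPerReceiver) {
    return receivers * batchesPerReceiver;
  }

  // Records held in those buffered batches.
  static long bufferedRecords(int batches, int recordsPerBatch) {
    return (long) batches * recordsPerBatch;
  }

  public static void main(String[] args) {
    int receivers = threads(16, 0.70);                // 16-core machine, 70% of cores
    int batches = bufferedBatches(receivers, 3);      // three batches per receiver
    long records = bufferedRecords(batches, 4000);    // "complex" Parquet batch size
    System.out.println(receivers + " receivers, " + batches + " batches, "
        + records + " records");
  }
}
```

Plugging in a larger receiver count, as with Idan's increased parallelism, raises the
batch and record totals proportionally.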


We do know the size of the data: 15 files of 150K each. Let's assume that is compressed. So,
if all files are in memory, that would be 15 * 150K * 10:1 compression ratio = 22 MB, which
is tiny. So, it is unlikely that Drill is actually buffering all 33 batches. This tells us
that something else is going wrong: we are not actually running out of memory for data. Just
as Idan suggested, we are exhausting memory for some other reason.
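
For scale, the same numbers can be compared against the 8 GB of direct memory Idan
configured. Again, this is just a sketch of the arithmetic: the names are invented, "150K"
is taken as 150,000 bytes, and the 10:1 compression ratio is the guess used above.

```java
// Rough data-size estimate compared with the direct memory limit; illustrative only.
class DataSizeEstimate {

  // Uncompressed size if every file expands by the given ratio.
  static long uncompressedBytes(int files, long bytesPerFile, int compressionRatio) {
    return files * bytesPerFile * compressionRatio;
  }

  // Fraction of the direct memory limit the data would occupy.
  static double fractionOfLimit(long dataBytes, long limitBytes) {
    return (double) dataBytes / limitBytes;
  }

  public static void main(String[] args) {
    long data = uncompressedBytes(15, 150_000L, 10);   // 15 files, 150K each, 10:1
    long directLimit = 8L * 1024 * 1024 * 1024;        // 8 GB of direct memory
    System.out.printf("data: %.1f MB, %.2f%% of direct memory%n",
        data / 1_000_000.0, 100.0 * fractionOfLimit(data, directLimit));
  }
}
```

Even with a much worse compression ratio, the data itself stays far below the limit,
which is why the failure has to come from somewhere other than the data volume.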


Reading further it looks like Idan found his own solution. He increased parallelism to the
point where the internal buffering of each Parquet reader used up all available memory. This
is probably a bug, but Parquet is a fiendishly complex beast. Over time, people threw all
kinds of parallel readers, buffering and other things at it to beat Impala in TPC benchmarks.

Since a query that finishes is faster than a highly-tuned query that crashes, I'd recommend
throttling the slice count back. You really only need as many as there are cores. In fact,
you need fewer. Unlike other readers, Parquet launches a bunch of its own parallel readers,
so each single Parquet reader will have many parallel column readers (I don't recall the
number), each aggressively buffering everything it can.

Since the data is small, there is no need for such heroics: Drill can read 20+ meg of data
quite quickly, even with a few threads. So, try that first and see if that works.

Once the query works, study the query profile to determine the memory budget and CPU usage.
Tune from there, keeping memory well within the available bounds.


Thanks,
- Paul

 

    On Tuesday, March 24, 2020, 11:46:47 AM PDT, Charles Givre <cgivre@gmail.com> wrote:
 
 
 
Idan Sheinberg  8:21 AM
Hi  there
I'm trying to run a simple offset query (ORDER BY timestamp LIMIT 500 OFFSET 1000) against rather
complex parquet files (say 4 columns, one being an array currently consisting of a single
element comprised of 15 columns)
All files share the same Schema, of course.
 User Error Occurred: One or more nodes ran out of memory while executing the query. (null)
org.apache.drill.common.exceptions.UserException: RESOURCE ERROR: One or more nodes ran out
of memory while executing the query.
null
[Error Id: 67b61fc9-320f-47a1-8718-813843a10ecc ]
    at org.apache.drill.common.exceptions.UserException$Builder.build(UserException.java:657)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:338)
    at org.apache.drill.common.SelfCleaningRunnable.run(SelfCleaningRunnable.java:38)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.drill.exec.exception.OutOfMemoryException: null
    at org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew(AbstractContainerVector.java:59)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.allocateOutgoingRecordBatch(PartitionerTemplate.java:380)
    at org.apache.drill.exec.test.generated.PartitionerGen5$OutgoingRecordBatch.initializeBatch(PartitionerTemplate.java:400)
    at org.apache.drill.exec.test.generated.PartitionerGen5.setup(PartitionerTemplate.java:126)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createClassInstances(PartitionSenderRootExec.java:263)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.createPartitioner(PartitionSenderRootExec.java:218)
    at org.apache.drill.exec.physical.impl.partitionsender.PartitionSenderRootExec.innerNext(PartitionSenderRootExec.java:188)
    at org.apache.drill.exec.physical.impl.BaseRootExec.next(BaseRootExec.java:93)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:323)
    at org.apache.drill.exec.work.fragment.FragmentExecutor$1.run(FragmentExecutor.java:310)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:422)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
    at org.apache.drill.exec.work.fragment.FragmentExecutor.run(FragmentExecutor.java:310)
    ... 4 common frames omitted
Now, I'm running this query from a 16 core, 32GB RAM machine, with heap sized at 20GB, Eden
sized at 16GB (added manually to JAVA_OPTS) and direct memory sized at 8 GB.
By querying sys.memory I can confirm all limits apply. At no point throughout the query am
I nearing the memory limit of the HEAP/DIRECT or the OS itself





8:25
However, due to the way org.apache.drill.exec.vector.complex.AbstractContainerVector.allocateNew
is implemented
8:27
@Override
  public void allocateNew() throws OutOfMemoryException {
    if (!allocateNewSafe()) {
      throw new OutOfMemoryException();
    }
  }
8:27
The actual exception/error is swallowed, and I have no idea what's the cause of the failure
8:28
The data-set itself consists of say 15 parquet files, each one weighing at about 100kb
8:30
but as mentioned earlier, the parquet files are a bit more complex than the usual.
8:32
@cgivre @Vova Vysotskyi is there anything I can do or tweak to make this error go away?

cgivre  8:40 AM
Hmm...
8:40
This may be a bug.  Can you create an issue on our JIRA board?

Idan Sheinberg  8:43 AM
Sure
8:43
I'll get to it

cgivre  8:44 AM
I'd like for Paul Rogers to see this as I think he was the author of some of this.

Idan Sheinberg  8:44 AM
Hmm. I'll keep that in mind

cgivre  8:47 AM
We've been refactoring some of the complex readers as well, so it's possible that caused
this, but I'm not really sure.
8:47
What version of Drill?

cgivre  9:11 AM
This kind of info is super helpful as we're trying to work out all these details.
9:11
Reading schemas on the fly is not trivial, so when we find issues, we do like to resolve them

Idan Sheinberg  9:16 AM
This is drill 0.18 -SNAPSHOT as of last month
9:16
Ummmm
9:16
I do think I managed to resolve the issue however
9:16
I'm going to run some additional tests and let you know

cgivre  9:16 AM
What did you do?
9:17
You might want to rebase with today's build as well

Idan Sheinberg  9:21 AM
I'll come back with the details in a few moments

cgivre  9:38 AM
Thx

Idan Sheinberg  9:50 AM
Ok. See it seems as though it's a combination of a few things.
The data-set in question is still small (as mentioned before), but we are setting planner.slice_target 
to an extremely low value in order to trigger parallelism and speed up parquet parsing by
using multiple fragments.
We have 16 cores, 32 GB (C5.4xlarge on AWS) but we set planner.width.max_per_node  to further
increase parallelism. It seems as though each fragment is handling parquet parsing on its
own, and somehow incurs a great burden on
the direct memory buffer pool, as I do see 16GB peaks of direct memory usage after lowering
the planner.width.max_per_node to 16 (our available cores).
The query planner itself reports the HASH_PARTITION_SENDER as the largest phase with 1-2 GB
of memory utilization
Seeing such an impact (16 GB of direct memory) for 1K items spread across 15 files, even with
a very complex parquet schema, seems unreasonable to me  