drill-user mailing list archives

From Paul Rogers <par0...@yahoo.com.INVALID>
Subject Re: Embedding Drill as a distributed query engine
Date Wed, 22 Jan 2020 06:08:43 GMT
Hi Benjamin,

Your comments all make perfect sense. Using Spark is a great idea for long-running jobs, or
those that need code, such as ML and so on. Drill works best for tasks that can be expressed
in SQL. As you know, Spark needs to start a new JVM for each query so it can host job-specific
byte codes. Drill, being SQL based, just compiles SQL to Java and executes all queries in
the same JVM. Queries thus start faster and there is less overhead, which is what you want
if you run many short-lived SQL queries. (Where short-lived means roughly "less than a minute.")
In short, Drill and Spark solve different problems; if you have both sets of problems, you'll
need both tools at your disposal.

A bit more about Drill. Drill does use Calcite. But, Drill is not a simple data layer on top
of Calcite. Drill is a fully distributed, general-purpose big data engine, like Presto, Spark
or Impala. Both Drill and Spark create multi-level DAGs.

When possible, aggregation will be done locally, but often that is not possible. Consider
a simple sum of products sold, broken down by product:

SELECT sum(quantity) FROM prodSales
GROUP BY prodCode

Although the first-level aggregation can be done locally on each node, roll-up aggregation
must be done by bringing like data together. Drill does this with hash partitioning: if you
have N Drillbits, data from each of the Drillbits will be exchanged to the other N, with 1/N
going from each sender to each receiver.

That way, aggregation is also distributed; there is no single "reducer" node. After aggregation
is complete, all N Drillbits send their totals to the Foreman and then on to the client. So,
a question would be if your DB nodes can handle the load of the exchanged data? Or, is all
this shuffling better left to dedicated query nodes?

The above shows that Drill makes generous use of RPC to exchange data. Drill's RPC is of binary
value vectors, and so is quite efficient. You mentioned your API is not performant. I wonder,
are you using a compact binary format, or a more classic JSON-based REST API? In my experience,
REST does not perform very well in this use case. The best RPC solution would be to copy binary
data into a buffer, send it over the wire to Drill, and copy that binary data directly into

The next issue is data volume. If your queries are selective, you'll ship lots of data to
Drill, only for Drill to discard it. A better solution is to implement "filter push-down"
to reduce the amount of data read from your DB. Do you have indexes? Shards? Key/value pairs?
In any of those cases, you can write code in Drill to push predicates to your DB, in the form
your DB needs, so you reduce the volume of data sent to Drill (or Spark or whatever.)
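
As a rough sketch of what filter push-down buys you, here is a small Python example. Everything in it is hypothetical (the ShardScan class, the predicate tuples, the sample rows); it assumes a DB shard that keeps rows sorted by key and can serve a key-range scan. Predicates on the key are handed to the DB, and whatever the DB cannot evaluate stays behind as a residual filter in the query engine.

```python
import operator
from bisect import bisect_left, bisect_right

OPS = {"=": operator.eq, ">=": operator.ge, "<=": operator.le}


class ShardScan:
    """Hypothetical DB-side scan over rows kept sorted by key."""

    def __init__(self, rows):
        self.rows = sorted(rows, key=lambda r: r["key"])
        self.keys = [r["key"] for r in self.rows]

    def scan(self, key_min=None, key_max=None):
        # Binary search on the sorted keys instead of reading every row.
        lo = 0 if key_min is None else bisect_left(self.keys, key_min)
        hi = len(self.keys) if key_max is None else bisect_right(self.keys, key_max)
        return self.rows[lo:hi]


def split_predicates(predicates):
    """Separate predicates the DB can evaluate (key range) from the rest."""
    pushed, residual = {}, []
    for column, op, value in predicates:
        if column == "key" and op == ">=":
            pushed["key_min"] = max(value, pushed.get("key_min", value))
        elif column == "key" and op == "<=":
            pushed["key_max"] = min(value, pushed.get("key_max", value))
        else:
            residual.append((column, op, value))
    return pushed, residual


def run_query(shard, predicates):
    pushed, residual = split_predicates(predicates)
    shipped = shard.scan(**pushed)  # only these rows leave the DB
    kept = [
        row for row in shipped
        if all(OPS[op](row[col], val) for col, op, val in residual)
    ]
    return shipped, kept


rows = [{"key": k, "region": "east" if k % 2 == 0 else "west"} for k in range(1, 11)]
shard = ShardScan(rows)
shipped, kept = run_query(
    shard, [("key", ">=", 4), ("key", "<=", 7), ("region", "=", "east")]
)
```

Here 4 of the 10 rows are shipped rather than all 10, and the engine applies the remaining region predicate to those 4.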

The same is true of projection push-down; only ship off your DB node the columns that Drill
wants. Many "NoSQL" databases offer some form of this "data access" API; perhaps your DB does

Apache Arrow is trying to do something similar. We've discussed having an Arrow integration
in Drill, at least for reading data. Arrow might provide a useful filtering mechanism. But,
unless your data is stored columnar, you'll pay the overhead of copying data to Arrow. On
the Drill side, a copy into Drill vectors is needed whether the data arrives in Arrow or
some other format.

A clever (though non-trivial) solution would be to load data in your DB into Drill vectors,
use Drill RPC to ship that to a Drillbit, where Drill could use the data without the need
for a copy. (This is what Arrow promises, but Drill is not based on Arrow.) That way, you
benefit from Drill's already-built data transfer mechanism.

You also mentioned that context switches are costly. Tricky to debug that via e-mail. Are
you sending only one row at a time perhaps? If so, you really want to send decent-sized batches:
1K or 4K rows, up to some reasonable message size such as 1 MB. Since Drill (and presumably
your DB) are multi-threaded, there will be thread-switches even if they are in the same process.
Drill uses large record batches to minimize its own thread switches. Presto, Impala, Spark
and Hive do the same.
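
To make the batching suggestion concrete, here is a minimal Python sketch that groups rows into batches capped by both row count and message size. The function name and defaults are assumptions for illustration (4K rows, 1 MB), and rows are modeled as already-encoded byte strings; this is not how Drill builds its record batches.

```python
def batch_rows(rows, max_rows=4096, max_bytes=1024 * 1024):
    """Yield lists of rows, each capped by row count and total byte size."""
    batch, batch_bytes = [], 0
    for row in rows:
        row_bytes = len(row)
        # Flush before adding a row that would exceed either limit.
        # A single oversized row still goes out, alone in its own batch.
        if batch and (len(batch) >= max_rows or batch_bytes + row_bytes > max_bytes):
            yield batch
            batch, batch_bytes = [], 0
        batch.append(row)
        batch_bytes += row_bytes
    if batch:
        yield batch


# Small rows: the row-count limit decides the batch boundaries.
small = [b"x" * 100 for _ in range(10_000)]
small_sizes = [len(b) for b in batch_rows(small)]

# Larger rows: the 1 MB size limit kicks in first.
large = [b"x" * 1000 for _ in range(10_000)]
large_sizes = [len(b) for b in batch_rows(large)]
```

With 100-byte rows, 10,000 rows go out as 3 messages instead of 10,000; with 1000-byte rows the size cap produces 10 messages.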

So, maybe two options to consider:

1. A two-part solution in which your DB does the first-level projection/selection filtering
and Drill does the remaining heavy lifting (joins, aggregations, etc.)

2. A library-based solution. I believe that the Calcite project does, in fact, have a simple
data engine available as part of the project. I recall seeing some demo that reads CSV files
(check out the Docs page). If you only need simple queries, and want to use Calcite, maybe
this is an alternative. It won't scale like Drill (or Spark), but depending on your use case,
it might be worth a look; if only to contrast with Drill.

- Paul


    On Tuesday, January 21, 2020, 5:58:05 PM PST, Benjamin Schaff <benjamin.schaff@reactivecore.com> wrote:
 Thank you all for your quick answers; that's amazing responsiveness.

Let me address things as best I can, since it's a bit complex, so you
can provide me with some feedback.

The current SQL engine is based on Spark SQL and is embedded a little bit
better than a custom datasource, so I am fully aware of memory constraints
and everything related to it.

The issue I have with that model is the context switch between Spark's
internal format and our RPC layer, which is really costly for a still unclear
reason. Using Calcite directly on a node to query just one partition gave
us really good performance with our RPC endpoint, so I wanted to embed Drill
in the same JVM (which you can't do with Spark because the executors are
forked on the fly) to avoid that context switching. If nobody thinks
it's wise, feasible or easy, I guess I could try the same integration as
with Spark but also providing a shared memory API.

I also wanted to leverage embedded mode in the hope that partial
aggregations would be executed close to the data avoiding a lot of

What are your thoughts?

Thanks in advance.

On Tue, Jan 21, 2020 at 8:10 PM, Paul Rogers <par0328@yahoo.com.invalid> wrote:

> Hi Benjamin,
> Very cool project! Drill works well on top of custom data sources.
> That said, I suspect that actually running Drill inside your process will
> lead to a large amount of complexity. Your comment focuses on code issues.
> However, there are larger concerns. Although we think of Drill as a simple
> single-threaded, single node tool (when run in SqlLine or on a Mac), Drill
> is designed to be fully distributed.
> As queries get larger, you will find that Drill itself uses large amounts
> of memory and CPU to run a query quickly. (Imagine a join or sort of
> billions of rows from several tables.) Drill has its own memory management
> system to handle the large blocks of memory needed. Your DB also needs
> memory. You'd need a way to unify Drill's memory management with your own
> -- a daunting task.
> Grinding through billions of rows is CPU intensive. Drill manages its own
> threads and makes very liberal use of CPU. Your DB engine likely also has a
> threading model. Again, integrating the two is difficult. We could go on.
> In short, although Drill works well as a query engine on top of a custom
> data source, Drill itself is not designed to be a library included into
> your app process; it is designed to run as its own distributed set of
> processes running alongside your process.
> We could, of course, change the design, but that would be a bit of a big
> project because of the above issues. Might be interesting to think how
> you'd embed a distributed framework as a library in some host process. Not
> sure I've ever seen this done for any tool. (If anyone knows of an example,
> please let us know.)
> I wonder if there is a better solution. Run Drill alongside your DB on the
> same nodes. Have Drill then obtain data from your DB via an API. The quick
> & dirty solution is to use an RPC API. You can get fancy and use shared
> memory. A side benefit is that other tools can also use the API. For
> example, if you find you need Spark integration, it is easier to provide.
> (You can't, of course, run Spark in your DB process.)
> In this case, an "embedded solution" means that Drill is embedded in your
> app cluster (like ZK), not that it is embedded in your app process.
> In this way, you can tune Drill's memory and CPU usage separately from
> that of your engine, making the problem tractable. This model is, in fact,
> very similar to the traditional HDFS model in which both Drill and HDFS run
> on the same nodes. It is also similar to what MapR did with the MapR DB
> integration.
> Further, by separating the two, you can run Drill on its own nodes if you
> find your queries are getting larger and more expensive. That is, you can
> scale out by separating compute (Drill) from storage (your DB), allowing
> each to scale independently.
> And, of course, a failure in one engine (Drill or DB) won't take down the
> other if the two are in separate processes.
> In either case, your storage plugin needs to compute data locality. If
> your DB is distributed, then perhaps it has some scheme for distributing
> data: hash partitioning, range partitioning, or whatever. Somehow, if I
> have key 'x', I know to go to node Y to get that value. For example, in
> HDFS, Drill can distribute block scans to the node(s) with the blocks.
> Or, maybe data is randomly distributed, so that every scan must run
> against every DB node; in which case if you have N nodes, you'll run N
> scans and each will find whatever it happens to contain.
> If your DB has N nodes, then you need to distribute work to those nodes by
> telling Drill that the max parallelization (reported by the group scan) is
> N. Then, Drill will ask you for the SubScan for each of the N scans, and
> you can allocate work to those nodes. Either by subsetting the scan (as in
> HDFS) or just running the same scan everywhere.
> If you go with the two-process model, then your storage plugin can use
> soft affinity: run the scan on the node that has your DB, else run it on
> any node and use an RPC to obtain the data. This is how Drill works if it
> runs on a subset of HDFS nodes.
> You also asked about the Foreman. At present, Drill assumes nodes are
> homogeneous: all nodes evenly share work, including the work of the
> Foreman. Impala, for example, has added a feature to dedicate some nodes to
> be only coordinators (the equivalent of Drill's Foreman). Drill does not
> yet have that feature.
> Without the homogeneity assumption, Drill would need some kind of work
> scheduler to know to give less work to the Foreman + Drillbit node and more
> work to the Drillbit-only nodes. Having Foreman-only nodes would keep
> things simpler. In your case, such a Foreman would have to reside on a node
> other than one of your DB nodes to keep the DB nodes symmetrical.
> The above is a high-level survey of the challenges. We'd be happy to
> discuss specific issues as you refine your design.
> Thanks,
> - Paul
>    On Tuesday, January 21, 2020, 3:00:21 PM PST, Benjamin Schaff <
> benjamin.schaff@reactivecore.com> wrote:
>  Hi everyone,
> I would like to see if you could provide some recommendations/help around
> integrating Apache Drill as a distributed SQL engine in a custom database.
> Maybe I am going about it the wrong way so any feedback is appreciated.
> What I would like to achieve is to embed drillbits into my
> database nodes. It's a distributed database written mostly in Scala, so it's
> running inside the JVM. As you would expect, each storage node holds a
> partition of the data and I would like for each SubScan to be routed to the
> drillbit instance embedded within the database node.
> At this point, drillbits are running and communicating properly with ZK (I am
> using ZooKeeper for the database also). I can connect to the plugin I
> created using sqlline and I can list schemas and tables. So basically, all
> the metadata part is done and working.
> I managed to build up the partition work and affinity using the distributed
> metadata off the database and I am stuck in the following situation.
> If I override the "DistributionAffinity getDistributionAffinity()" method
> to put it to "HARD", then I end up with having the following error:
> "IllegalArgumentException: Sender fragment endpoint list should not be
> empty", and the "applyAssignments" method of the GroupScan receives an
> empty list of endpoints.
> If I don't override it, then nodes without "local access" get some work
> scheduled.
> I was wondering if there was a way to exclude drillbits from becoming a
> foreman.
> Thanks in advance for any guidance.
> --
> *This e-mail and any
> attachments may contain confidential information and
> is intended for use solely
> by the addressee(s).  If you are not the
> intended recipient of this e-mail, please be aware that any dissemination,
> distribution, copying, or other use of the e-mail in whole or in part, is
> strictly prohibited.  If you have
> received this e-mail in error, please
> notify the sender and permanently delete
> the original and all copies of the
> e-mail, attachments, and any printouts. * **
