drill-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rahul challapalli <challapallira...@gmail.com>
Subject Re: Pushing down Joins, Aggregates and filters, and data distribution questions
Date Thu, 01 Jun 2017 20:32:22 GMT
I would first recommend you spend some time reading the execution flow
inside drill [1]. Try to understand specifically what major/minor fragments
are and that different major fragments can have different levels of
parallelism.

Let us take a simple query which runs on a 2 node cluster

select * from employee where salary > 100000;

Now how do we control parallelism for the above query? Unfortunately the
generic answer is not a simple one. But since I conveniently took a simple
query with a single major fragment, lets make an effort to understand this.
There are 3 variables which control the parallelism

1. No of cores available
2. planner.width.max_per_node : Maximum number of minor fragments within a
major fragment per node
3. Parallelism supported by the scan for the particular storage plugin
involved

Lets try to understand the last parameter which is of interest to storage
plugin developers. Like you hinted, the number of sub-scans determines the
parallelism of the above query in the absence of the first 2 variables. But
how many subscan's can exist? This unfortunately depends on how you can
split the data (by respecting the row boundaries) and is dependent on the
storage format. Hypothetically, lets say you have a file which is composed
of 100 parts and each part contains few records and you know that a single
record is not split across multiple parts. Now with this setup, the storage
plugin simply has to get the number of parts present in the data and
instantiate that many subscans.

So in the above simplistic setup the max parallelization that can be
achieved for the major fragment (and in effect the whole query) is
determined by the number of parts present in the data which is 100. Now if
you do not set (2), the default max parallelization limit is 70% of the
number of cores available. If (2) is set by the user, that determines the
max threads that can be used per node. So for our example, the max
parallelization that can be supported is MIN(100,
planner.width.max_per_node). So if the user has planner.width.max_per_node
set to 30, then we end up with a total of 60 threads (on 2 nodes combined)
which need to run 100 minor fragments

With this understanding lets move to the next related topic which is
"Assignment". Now we have 60 threads (across 2 nodes) and 100 minor
fragments. So how do you assign minor fragments to specific nodes? This is
determined by the affinity that a particular node has for handling a
particular subscan. This can be controlled by the storage plugin by using
the "public List<EndpointAffinity> getOperatorAffinity()" method in the
GroupScan class.

Now to your questions

1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
   handled by a single *Scan* operator ? So whenever I have *n* *SubScan*s,
   I'll have *n* Scan operators distributed among Drill's cluster ?

I am not sure if I even understood your question correctly. Each minor
fragment gets executed in a single thread. In my example, each minor
fragment executes one subscan, followed by project, filter etc. Read [1] to
understand more about this.

2. How can I control the amount of any type of physical operators per
   Drill cluster or node ? For instance, what if I want to have less
   *Filter* operators or more *Scan* operators, how can I do that ?

I am not sure if we can control parallelism at the operator level within a
major fragment.

[1] https://drill.apache.org/docs/drill-query-execution/


On Thu, Jun 1, 2017 at 5:17 AM, Muhammad Gelbana <m.gelbana@gmail.com>
wrote:
>
> First of all, I was very happy to at last attend the hangouts meeting,
I've
> been trying to do so for quite sometime.
>
> I know I confused most of you during the meeting but that's because my
> requirements aren't crystal clear at the moment and I'm still learning
what
> Drill can do. Hopefully I learn enough so I would be confident about the
> options I have when I need to make implementation decisions.
>
> Now to the point, and let me restate my case..
>
> We have a proprietary datasource that can perform limits, aggregations,
> filters and joins very fast. This datasource can handle SQL queries but
not
> all possible SQL syntax. I've been successful, so far, to pushdown joins,
> filters and limits, but I'm still struggling with aggregates. I've sent an
> email about aggregates to Calcite's mailing list.
>
> The amount of data this datasource may be required to process can be
> billions of records and 100s of GBs of data. So we are looking forward to
> distribute this data among multiple servers to overcome storage
limitations
> and maximize throughput.
>
> This distribution can be just duplicating the data to maximize throughput,
> so each server will have the same set of data, *or* records may be
> distributed among different servers, without duplication among these
> servers because a single server may not be able to hold all the data. So
> some tables may be duplicated and some tables may be distributed among
> servers. Let's assume that the distribution details of each table is
> available for the plugin.
>
> Now I understand that for Drill to implement a query, it supports a
> set of physical
> operators <https://drill.apache.org/docs/physical-operators/>. These
> operators logic\code is generated at runtime and it's distributed among a
> Drill cluster to be compiled and executed.
>
> So to scan a table distributed\duplicated among 3 servers, I may want to
> configure Drill to execute *SELECT * FROM TABLE* by running the same query
> with an extra filter (to read\scan a specific portion of the table if the
> table is duplicated, to maximize throughput) or by running the query
> without modifications but having Drill run the query multiple times, once
> against each server. I assume this can be done by having the table's
> *GroupScan* return 3 different *SubScan*s when the
> *GroupScan.getSpecificScan(int)* is called multiple times, with different
> parameters of course.
>
> These different parameters can be controlled by the output of
> *GroupScan.getMinParallelizationWidth()* and
> *GroupScan.getMaxParallelizationWidth()*, correct ?
>
> Please correct me if I'm wrong about anything.
>
> Now assuming what I said is correct, I have a couple of questions:
>
>    1. If I have multiple *SubScan*s to be executed, will each *SubScan* be
>    handled by a single *Scan* operator ? So whenever I have *n*
*SubScan*s,
>    I'll have *n* Scan operators distributed among Drill's cluster ?
>    2. How can I control the amount of any type of physical operators per
>    Drill cluster or node ? For instance, what if I want to have less
>    *Filter* operators or more *Scan* operators, how can I do that ?
>
> I'm aware that my distribution goal may not be as simple as I may have
made
> it sound.
>
> Pardon me for the huge email and thanks a lot for your time.
>
> *---------------------*
> *Muhammad Gelbana*
> http://www.linkedin.com/in/mgelbana

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message