spark-dev mailing list archives

From Marcin Tustin <mtus...@handybook.com>
Subject Re: [Spark-SQL] Reduce Shuffle Data by pushing filter toward storage
Date Thu, 21 Apr 2016 18:10:29 GMT
I think that's an important result. Could you format your email to split
out your parts a little more? It all runs together for me in gmail, so it's
hard to follow, and I very much would like to.

On Thu, Apr 21, 2016 at 2:07 PM, atootoonchian <ali@levyx.com> wrote:

> The SQL query planner can be given the intelligence to push filter
> predicates down toward the storage layer. If we optimize the query planner
> so that I/O to storage is reduced at the cost of running extra filters
> (i.e., compute), this is desirable whenever the system is I/O bound. An
> example from the TPC-H benchmark proves the point:
>
> Let’s look at query q19 of the TPC-H benchmark.
> select
>     sum(l_extendedprice* (1 - l_discount)) as revenue
> from lineitem, part
> where
>       ( p_partkey = l_partkey
>         and p_brand = 'Brand#12'
>         and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>         and l_quantity >= 1 and l_quantity <= 1 + 10
>         and p_size between 1 and 5
>         and l_shipmode in ('AIR', 'AIR REG')
>         and l_shipinstruct = 'DELIVER IN PERSON')
>       or
>       ( p_partkey = l_partkey
>         and p_brand = 'Brand#23'
>         and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>         and l_quantity >= 10 and l_quantity <= 10 + 10
>         and p_size between 1 and 10
>         and l_shipmode in ('AIR', 'AIR REG')
>         and l_shipinstruct = 'DELIVER IN PERSON')
>       or
>       ( p_partkey = l_partkey
>         and p_brand = 'Brand#34'
>         and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>         and l_quantity >= 20 and l_quantity <= 20 + 10
>         and p_size between 1 and 15
>         and l_shipmode in ('AIR', 'AIR REG')
>         and l_shipinstruct = 'DELIVER IN PERSON')
>
> The latest version of Spark produces the following plan (simplified here
> for readability) to execute q19.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice,l_discount]
>     Join Inner, Some(((p_partkey = l_partkey) &&
> ((((((
>    (p_brand = Brand#12) &&
>     p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
>    (l_quantity >= 1.0)) && (l_quantity <= 11.0)) &&
>    (p_size <= 5)) ||
> (((((p_brand = Brand#23) &&
>      p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&
>     (l_quantity >= 10.0)) && (l_quantity <= 20.0)) &&
>     (p_size <= 10))) ||
> (((((p_brand = Brand#34) &&
>      p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
>     (l_quantity >= 20.0)) && (l_quantity <= 30.0)) &&
>     (p_size <= 15)))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter ((isnotnull(l_partkey) &&
>                 (isnotnull(l_shipinstruct) &&
>                 (l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON))))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
> l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
> l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode,
> l_comment], MapPartitionsRDD[316]
>       Project [p_partkey, p_brand, p_size, p_container]
>         Filter ((isnotnull(p_partkey) &&
>     (isnotnull(p_size) &&
>     (cast(cast(p_size as decimal(20,0)) as int) >= 1)))
>           LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type, p_size,
> p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
>
> As you can see, only three filter predicates are pushed down before the
> join is executed:
>   l_shipmode IN (AIR,AIR REG)
>   l_shipinstruct = DELIVER IN PERSON
>   (cast(cast(p_size as decimal(20,0)) as int) >= 1)
>
> And the following filters are applied during the join:
>   p_brand = Brand#12
>   p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
>   l_quantity >= 1.0 && l_quantity <= 11.0
>   p_size <= 5
>   p_brand = Brand#23
>   p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
>   l_quantity >= 10.0 && l_quantity <= 20.0
>   p_size <= 10
>   p_brand = Brand#34
>   p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
>   l_quantity >= 20.0 && l_quantity <= 30.0
>   p_size <= 15
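
The rewrite below works because a predicate can be pushed below the join
only if it is implied by every OR branch. A minimal Python sketch of
computing such a pushable "envelope" for the l_quantity ranges (an
illustrative model, not Spark code; names are made up):

```python
# Each OR branch of q19 constrains l_quantity to a closed interval.
# Only a condition implied by *all* branches may be pushed below the
# join, so we push a conservative envelope: the smallest interval
# covering every branch. Rows inside the envelope but outside their
# branch's range must be re-filtered at the join.
branches = [(1, 1 + 10), (10, 10 + 10), (20, 20 + 10)]

def envelope(intervals):
    """Smallest single interval covering all the given intervals."""
    return (min(lo for lo, _ in intervals),
            max(hi for _, hi in intervals))

print(envelope(branches))  # -> (1, 30), i.e. l_quantity between 1 and 30
```

This matches the `l_quantity >= 1 and l_quantity <= 30` pre-filter in the
rewritten lineitem query.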
>
> Let’s look at the following sequence of SQL commands, which produces the
> same result.
> val partDfFilter = sqlContext.sql("""
>         |select p_brand, p_partkey from part
>         |where
>         | (p_brand = 'Brand#12'
>         |   and p_container in ('SM CASE', 'SM BOX', 'SM PACK', 'SM PKG')
>         |   and p_size between 1 and 5)
>         | or
>         | (p_brand = 'Brand#23'
>         |   and p_container in ('MED BAG', 'MED BOX', 'MED PKG', 'MED PACK')
>         |   and p_size between 1 and 10)
>         | or
>         | (p_brand = 'Brand#34'
>         |   and p_container in ('LG CASE', 'LG BOX', 'LG PACK', 'LG PKG')
>         |   and p_size between 1 and 15)
>        """.stripMargin)
>
> val itemLineDfFilter = sqlContext.sql("""
>         |select
>         | l_partkey, l_quantity, l_extendedprice, l_discount from lineitem
>         |where
>         | (l_quantity >= 1 and l_quantity <= 30
>         |   and l_shipmode in ('AIR', 'AIR REG')
>         |   and l_shipinstruct = 'DELIVER IN PERSON')
>       """.stripMargin)
>
> partDfFilter.registerTempTable("partFilter")
> itemLineDfFilter.registerTempTable("lineitemFilter")
>
> var q19Query = """
>         |select
>         | sum(l_extendedprice* (1 - l_discount)) as revenue
>         |from
>         | lineitemFilter,
>         | partFilter
>         |where
>         | (p_partkey = l_partkey
>         |   and p_brand = 'Brand#12'
>         |   and l_quantity >= 1 and l_quantity <= 1 + 10)
>         | or
>         | ( p_partkey = l_partkey
>         |   and p_brand = 'Brand#23'
>         |   and l_quantity >= 10 and l_quantity <= 10 + 10)
>         | or
>         | ( p_partkey = l_partkey
>         |   and p_brand = 'Brand#34'
>         |   and l_quantity >= 20 and l_quantity <= 20 + 10)
>       """.stripMargin
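
To sanity-check that the split predicates compose back to the original
WHERE clause, here is a small Python model (illustrative only, not Spark
code) that compares the original condition against "pre-filters AND
join-filter" over a grid of sample rows:

```python
# Per-brand constants taken from q19 above.
CONTAINERS = {
    "Brand#12": {"SM CASE", "SM BOX", "SM PACK", "SM PKG"},
    "Brand#23": {"MED BAG", "MED BOX", "MED PKG", "MED PACK"},
    "Brand#34": {"LG CASE", "LG BOX", "LG PACK", "LG PKG"},
}
QTY = {"Brand#12": (1, 11), "Brand#23": (10, 20), "Brand#34": (20, 30)}
SIZE_MAX = {"Brand#12": 5, "Brand#23": 10, "Brand#34": 15}

def ship_ok(l):
    return (l["shipmode"] in ("AIR", "AIR REG")
            and l["shipinstruct"] == "DELIVER IN PERSON")

def original(p, l):
    # The full q19 disjunction (join key equality omitted on both sides).
    return any(p["brand"] == b and p["container"] in CONTAINERS[b]
               and QTY[b][0] <= l["qty"] <= QTY[b][1]
               and 1 <= p["size"] <= SIZE_MAX[b] and ship_ok(l)
               for b in CONTAINERS)

def part_prefilter(p):      # the partFilter subquery
    return any(p["brand"] == b and p["container"] in CONTAINERS[b]
               and 1 <= p["size"] <= SIZE_MAX[b] for b in CONTAINERS)

def lineitem_prefilter(l):  # the lineitemFilter subquery
    return 1 <= l["qty"] <= 30 and ship_ok(l)

def join_filter(p, l):      # the residual predicate in the new q19
    return any(p["brand"] == b and QTY[b][0] <= l["qty"] <= QTY[b][1]
               for b in CONTAINERS)

def rewritten(p, l):
    return part_prefilter(p) and lineitem_prefilter(l) and join_filter(p, l)

# Exhaustively compare over a small grid of part/lineitem values.
parts = [{"brand": b, "container": c, "size": s}
         for b in ["Brand#12", "Brand#34", "Brand#99"]
         for c in ["SM BOX", "LG PKG", "JUMBO JAR"]
         for s in [0, 3, 12, 40]]
lines = [{"qty": q, "shipmode": m, "shipinstruct": i}
         for q in [0, 5, 15, 25, 35]
         for m in ["AIR", "RAIL"]
         for i in ["DELIVER IN PERSON", "NONE"]]
assert all(original(p, l) == rewritten(p, l) for p in parts for l in lines)
print("equivalent on all", len(parts) * len(lines), "combinations")
```

The distinct brand values are what make the decomposition lossless: a row
pair can only satisfy the join filter for the same branch whose part and
lineitem conditions it already passed.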
>
> The following plan shows how Spark will execute the new q19 query.
> Aggregate [(sum(cast((l_extendedprice * (1.0 - l_discount))
>   Project [l_extendedprice,l_discount]
>     Join Inner, Some(((p_partkey = l_partkey) &&
> (((((p_brand = Brand#12) &&
>     (l_quantity >= 1.0)) && (l_quantity <= 11.0)) ||
>   (((p_brand = Brand#23) &&
>     (l_quantity >= 10.0)) && (l_quantity <= 20.0))) ||
>   (((p_brand = Brand#34) &&
>     (l_quantity >= 20.0)) && (l_quantity <= 30.0)))))
>       Project [l_partkey, l_quantity, l_extendedprice, l_discount]
>         Filter ((isnotnull(l_partkey) &&
>                ((isnotnull(l_shipinstruct) &&
>                  isnotnull(l_quantity)) &&
>               (((cast(l_quantity as float) >= 1.0) &&
>                 (cast(l_quantity as float) <= 30.0)) &&
>                 (l_shipmode IN (AIR,AIR REG) &&
>                 (l_shipinstruct = DELIVER IN PERSON)))))
>           LogicalRDD [l_orderkey, l_partkey, l_suppkey, l_linenumber,
> l_quantity, l_extendedprice, l_discount, l_tax, l_returnflag, l_linestatus,
> l_shipdate, l_commitdate, l_receiptdate, l_shipinstruct, l_shipmode,
> l_comment], MapPartitionsRDD[316]
>             Project [p_partkey, p_brand, p_size, p_container]
>               Filter ((isnotnull(p_partkey) &&
>                 isnotnull(cast(cast(p_partkey as decimal(20,0)) as int)))
> &&
> (isnotnull(p_size) &&
>             ((cast(cast(p_size as decimal(20,0)) as int) >= 1) &&
>             (((((p_brand = Brand#12) &&
>                  p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)) &&
>                      (cast(cast(p_size as decimal(20,0)) as int) <= 5)) ||
>   (((p_brand = Brand#23) &&
>      p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)) &&
>      (cast(cast(p_size as decimal(20,0)) as int) <= 10))) ||
>   (((p_brand = Brand#34) &&
>      p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)) &&
>      (cast(cast(p_size as decimal(20,0)) as int) <= 15))))))
>                   LogicalRDD [p_partkey, p_name, p_mfgr, p_brand, p_type,
> p_size, p_container, p_retailprice, p_comment], MapPartitionsRDD[314]
>
> With the new approach, all filter predicates are pushed down below the
> join:
>   l_shipmode IN (AIR,AIR REG)
>   l_shipinstruct = DELIVER IN PERSON
>   cast(cast(p_size as decimal(20,0)) as int) >= 1
>   p_brand = Brand#12
>   p_container IN (SM CASE,SM BOX,SM PACK,SM PKG)
>   l_quantity >= 1.0 && l_quantity <= 11.0
>   p_size <= 5
>   p_brand = Brand#23
>   p_container IN (MED BAG,MED BOX,MED PKG,MED PACK)
>   l_quantity >= 10.0 && l_quantity <= 20.0
>   p_size <= 10
>   p_brand = Brand#34
>   p_container IN (LG CASE,LG BOX,LG PACK,LG PKG)
>   l_quantity >= 20.0 && l_quantity <= 30.0
>   p_size <= 15
>
> But some filter predicates still need to be evaluated during the join to
> distinguish the different sets of items; in other words, some filters are
> re-evaluated:
>   p_brand = Brand#12
>   l_quantity >= 1.0 && l_quantity <= 11.0
>   p_brand = Brand#23
>   l_quantity >= 10.0 && l_quantity <= 20.0
>   p_brand = Brand#34
>   l_quantity >= 20.0 && l_quantity <= 30.0
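
These predicates must be re-checked because the pushed-down filters are
supersets of each branch: a row can survive the pre-join envelope yet fail
its branch condition. A small sketch with a hypothetical row (not Spark
code):

```python
# Hypothetical row: passes the pushed envelope filter (1 <= qty <= 30)
# but fails the Brand#23 branch, whose quantity range is [10, 20].
row = {"p_brand": "Brand#23", "l_quantity": 5.0}

passes_envelope = 1.0 <= row["l_quantity"] <= 30.0
passes_branch = (row["p_brand"] == "Brand#23"
                 and 10.0 <= row["l_quantity"] <= 20.0)

print(passes_envelope, passes_branch)  # -> True False
```

So the join must re-evaluate the brand-specific ranges to drop such rows.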
>
> Our main goal in pushing filters down as far as possible is to minimize
> I/O and maximize processor utilization. So let’s compare the original and
> modified q19 from an I/O point of view.
>
>
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
> | TPCH   | Stage |                  Q19                   |              Q19 modified              |
> | Scale  |       +---------+--------------+---------------+---------+--------------+---------------+
> | Factor |       | Input   | Shuffle Read | Shuffle Write | Input   | Shuffle Read | Shuffle Write |
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
> | 1      | 1     | 724 MB  |              | 4.2 MB        | 724 MB  |              | 2.7 MB        |
> | 1      | 2     | 23.0 MB |              | 4.0 MB        | 23.0 MB |              | 22.9 KB       |
> | 1      | 3     |         | 8.2 MB       | 11.0 KB       |         | 2.7 MB       | 11.0 KB       |
> | 1      | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> | 10     | 1     | 7.2 GB  |              | 43.5 MB       | 7.2 GB  |              | 28.0 MB       |
> | 10     | 2     | 232 MB  |              | 39.1 MB       | 232 MB  |              | 146.2 KB      |
> | 10     | 3     |         | 82.5 MB      | 11.0 KB       |         | 28.1 MB      | 11.0 KB       |
> | 10     | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> | 100    | 1     | 74.1 GB |              | 448 MB        | 74.1 GB |              | 266 MB        |
> | 100    | 2     | 2.3 GB  |              | 385 MB        | 2.3 GB  |              | 1570 KB       |
> | 100    | 3     |         | 834 MB       | 11.0 KB       |         | 288 MB       | 11.0 KB       |
> | 100    | 4     |         | 11.0 KB      |               |         | 11.0 KB      |               |
> +--------+-------+---------+--------------+---------------+---------+--------------+---------------+
>
> The resulting shuffle read/write reduction rate for each scale factor is
> shown in the following table.
>
> +-------------------+------------------+---------------------------+------+
> | TPCH Scale Factor | Q19 Shuffle Data | Q19 Modified Shuffle Data | Rate |
> +-------------------+------------------+---------------------------+------+
> | 1                 | 8.211 MB         | 2.733 MB                  | 3.00 |
> | 10                | 82.611 MB        | 28.157 MB                 | 2.93 |
> | 100               | 834.311 MB       | 288.081 MB                | 2.89 |
> +-------------------+------------------+---------------------------+------+
> So, as you can see, shuffle read and write amplification can be reduced by
> a factor of about 3 if we push more intelligence toward storage.
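
The rates follow directly from the per-stage shuffle totals; a quick
arithmetic check (numbers taken from the tables above):

```python
# Total shuffle data per query (MB), summed across stages, per the
# measurements above (e.g. 4.2 + 4.0 + 0.011 = 8.211 MB at scale 1).
q19     = {1: 8.211, 10: 82.611, 100: 834.311}   # original
q19_mod = {1: 2.733, 10: 28.157, 100: 288.081}   # modified

for sf in (1, 10, 100):
    rate = q19[sf] / q19_mod[sf]
    # Reported rates: 3.00, 2.93, 2.89 respectively.
    print(sf, round(rate, 2))
```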
>
> --
> View this message in context:
> http://apache-spark-developers-list.1001551.n3.nabble.com/Spark-SQL-Reduce-Shuffle-Data-by-pushing-filter-toward-storage-tp17297.html
> Sent from the Apache Spark Developers List mailing list archive at
> Nabble.com.
>
> ---------------------------------------------------------------------
> To unsubscribe, e-mail: dev-unsubscribe@spark.apache.org
> For additional commands, e-mail: dev-help@spark.apache.org
>
>


