spark-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Eric Liang <...@databricks.com>
Subject Re: Scaling partitioned Hive table support
Date Mon, 08 Aug 2016 19:51:05 GMT
I like the former approach -- it seems more generally applicable to other
catalogs and IIUC would let you defer pruning until execution time. Pruning
is work that should be done by the catalog anyways, as is the case when
querying over an (unconverted) hive table.

You might also want to look at https://github.com/apache/spark/pull/14241 ,
which refactors some of the file scan execution to defer pruning.

On Mon, Aug 8, 2016, 11:53 AM Michael Allman <michael@videoamp.com> wrote:

> Hello,
>
> I'd like to propose a modification in the way Hive table partition
> metadata are loaded and cached. Currently, when a user reads from a
> partitioned Hive table whose metadata are not cached (and for which Hive
> table conversion is enabled and supported), all partition metadata is
> fetched from the metastore:
>
>
> https://github.com/apache/spark/blob/5effc016c893ce917d535cc1b5026d8e4c846721/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveMetastoreCatalog.scala#L252-L260
>
> This is highly inefficient in some scenarios. In the most extreme case, a
> user starts a new Spark app, runs a query which reads from a single
> partition in a table with a large number of partitions and terminates their
> app. All partition metadata are loaded and their files' schema are merged,
> but only a single partition is read. Instead, I propose we load and cache
> partition metadata on-demand, as needed to build query plans.
>
> We've long encountered this performance problem at VideoAmp and have taken
> different approaches to address it. In addition to the load time, we've
> found that loading all of a table's partition metadata can require a
> significant amount of JVM heap space. Our largest tables OOM our Spark
> drivers unless we allocate several GB of heap space.
>
> Certainly one could argue that our situation is pathological and rare, and
> that the problem in our scenario is with the design of our tables—not
> Spark. However, even in tables with more modest numbers of partitions,
> loading only the necessary partition metadata and file schema can
> significantly reduce the query planning time, and is definitely more memory
> efficient.
>
> I've written POCs for a couple of different implementation approaches.
> Though incomplete, both have been successful in their basic goal. The first
> extends `org.apache.spark.sql.catalyst.catalog.ExternalCatalog` and as such
> is more general. It requires some new abstractions and refactoring of
> `HadoopFsRelation` and `FileCatalog`, among others. It places a greater
> burden on other implementations of `ExternalCatalog`. Currently the only
> other implementation of `ExternalCatalog` is `InMemoryCatalog`, and my code
> throws an `UnsupportedOperationException` on that implementation.
>
> The other approach is simpler and only touches code in the codebase's
> `hive` project. Basically, conversion of `MetastoreRelation` to
> `HadoopFsRelation` is deferred to physical planning when the metastore
> relation is partitioned. During physical planning, the partition pruning
> filters in a logical query plan are used to select the required partition
> metadata and a `HadoopFsRelation` is built from those. The new logical plan
> is then re-injected into the planner.
>
> I'd like to get the community's thoughts on my proposal and implementation
> approaches.
>
> Thanks!
>
> Michael
>

Mime
View raw message