spark-issues mailing list archives

From "Takeshi Yamamuro (JIRA)" <j...@apache.org>
Subject [jira] [Comment Edited] (SPARK-25196) Analyze column statistics in cached query
Date Thu, 23 Aug 2018 02:29:00 GMT

    [ https://issues.apache.org/jira/browse/SPARK-25196?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16589604#comment-16589604 ]

Takeshi Yamamuro edited comment on SPARK-25196 at 8/23/18 2:28 AM:
-------------------------------------------------------------------

Yea, sure. I'll make a PR and continue the discussion there after the branch-2.4 cut. Thanks.


was (Author: maropu):
Yea, sure. So, I'll make a PR after the branch-2.4 cut. Thanks.

> Analyze column statistics in cached query
> -----------------------------------------
>
>                 Key: SPARK-25196
>                 URL: https://issues.apache.org/jira/browse/SPARK-25196
>             Project: Spark
>          Issue Type: New Feature
>          Components: SQL
>    Affects Versions: 2.3.1
>            Reporter: Takeshi Yamamuro
>            Priority: Minor
>
> In common use cases, users read catalog table data, join/aggregate it, and then cache the result for later reuse. Since column statistics can currently be computed only for catalog tables via ANALYZE commands, the optimizer must rely on missing or inaccurate column statistics for cached data. So, it would be nice if Spark could analyze cached data and hold temporary column statistics for InMemoryRelation.
> For example, we might be able to add a new API (e.g., analyzeColumnCacheQuery) in CacheManager to do so;
>  POC: [https://github.com/apache/spark/compare/master...maropu:AnalyzeCacheQuery]
> {code:java}
> scala> sql("SET spark.sql.cbo.enabled=true")
> scala> sql("SET spark.sql.statistics.histogram.enabled=true")
> scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
> scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
> scala> val cacheManager = spark.sharedState.cacheManager
> scala> def printColumnStats(data: org.apache.spark.sql.DataFrame) = {
>      |   data.queryExecution.optimizedPlan.stats.attributeStats.foreach {
>      |     case (k, v) => println(s"[$k]: $v")
>      |   }
>      | }
> scala> def df() = spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2"))
> // Prints column statistics in catalog table `t`
> scala> printColumnStats(spark.table("t"))
> [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
> [c1#7074]: ColumnStat(Some(997),Some(5.958619423369615E-4),Some(0.9988009488973438),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@4ef69c53)))
> [c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@7cbaf548)))
> // Prints column statistics on query result `df`
> scala> printColumnStats(df())
> [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@209c0be5)))
> // Prints column statistics on cached data of `df`
> scala> printColumnStats(df().cache)
> <No Column Statistics>
> // A new API described above
> scala> cacheManager.analyzeColumnCacheQuery(df(), "v1" :: "v2" :: Nil)
>
> // Then, prints again
> scala> printColumnStats(df())
> [v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
> [v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
> scala> cacheManager.analyzeColumnCacheQuery(df(), "c0" :: Nil)
> scala> printColumnStats(df())
> [v1#7101L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@e2ff893)))
> [v2#7103L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@1498a4d)))
> [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;@626bcfc8)))
> {code}
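For readers unfamiliar with what a ColumnStat in the transcript above carries, the core per-column quantities (distinct count, min, max, null count) can be sketched without any Spark dependency. This is a minimal, self-contained illustration of what an ANALYZE-style pass computes; `SimpleColumnStat` and `of` are hypothetical names for this sketch, not Spark's actual API:

```scala
// Hypothetical, Spark-free sketch of the per-column quantities an
// ANALYZE-style pass computes: distinct count, min, max, and null count.
case class SimpleColumnStat(
    distinctCount: Long,
    min: Option[Long],
    max: Option[Long],
    nullCount: Long)

object SimpleColumnStat {
  // Compute the stats over an in-memory column; None models SQL NULL.
  def of(column: Seq[Option[Long]]): SimpleColumnStat = {
    val defined = column.flatten
    SimpleColumnStat(
      distinctCount = defined.distinct.size.toLong,
      min = if (defined.isEmpty) None else Some(defined.min),
      max = if (defined.isEmpty) None else Some(defined.max),
      nullCount = (column.size - defined.size).toLong)
  }
}
```

For example, `SimpleColumnStat.of(Seq(Some(1L), Some(3L), None, Some(3L)))` yields a distinct count of 2, min 1, max 3, and one null. Spark's real ColumnStat additionally tracks average/max value length and an optional histogram, as visible in the transcript output.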



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

