spark-dev mailing list archives

From Gengliang Wang <>
Subject [OSS DIGEST] The major changes of Apache Spark from Feb 26 to Mar 10
Date Tue, 31 Mar 2020 05:26:20 GMT
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team.
For each API/configuration/behavior change, there will be an *[API]* tag in
the title.

Mark running map stages of finished job as finished, and cancel running
tasks (+59, -12)

When a job finishes, its running (re-submitted) map stages should be marked
as finished if they are not used by other jobs, and the running tasks of
these stages should be cancelled. The ListenerBus should also be notified;
otherwise, these map stages stay on the "Active Stages" page of the web UI
forever. This PR fixes these issues.

[image: active_stage]

Remove resource coordination support from Standalone (+17, -463)

Resource coordination was mainly designed for the scenario where multiple
workers are launched on the same host. However, that scenario effectively
no longer exists in today's Spark: Spark can now start multiple executors
in a single Worker, whereas at the very beginning it allowed only one
executor per Worker. Launching multiple workers on the same host does not
really help users, so it's not worth bringing in a complicated
implementation with a potentially high maintenance cost for such an
impossible scenario.

Fix Encoder thread-safety bug in createDataset(Seq) (+2, -1)

This PR fixes a thread-safety bug in SparkSession.createDataset(Seq): if
the caller-supplied Encoder is used in multiple threads, then
createDataset's usage of the encoder may lead to incorrect / corrupt
results because the Encoder's internal mutable state will be updated from
multiple threads.
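
The failure mode can be illustrated with a toy model in plain Python. This is not Spark's ExpressionEncoder, and ToyEncoder, create_serializer and create_dataset are hypothetical names. The sketch assumes the fix works by giving each createDataset call its own serializer, so the mutable scratch state is never shared between threads.

```python
import threading


class ToyEncoder:
    """Hypothetical encoder; each serializer it creates keeps a mutable scratch buffer."""

    def create_serializer(self):
        buffer = []  # mutable state owned by this one serializer

        def serialize(record):
            buffer.clear()  # reset the scratch state for this record
            for value in record:
                buffer.append(str(value))
            return ",".join(buffer)  # read the state back

        return serialize


def create_dataset(encoder, rows):
    # Build a fresh serializer per call instead of reusing one shared,
    # stateful serializer across every caller and thread.
    to_row = encoder.create_serializer()
    return [to_row(row) for row in rows]


# Usage: one encoder instance shared by eight threads.
shared_encoder = ToyEncoder()
results = {}


def worker(thread_id):
    rows = [(thread_id, j) for j in range(1000)]
    results[thread_id] = create_dataset(shared_encoder, rows)


threads = [threading.Thread(target=worker, args=(i,)) for i in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

all_correct = all(
    results[i] == [f"{i},{j}" for j in range(1000)] for i in range(8)
)
print(all_correct)  # True
```

Suppose create_serializer were called only once and the resulting function were shared by all threads. The clear/append/join steps of different threads could then interleave and produce mixed-up strings. That is the kind of corruption the PR describes.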

Use its sql type for UDT when checking the type of length (fixed/var) or
mutable (+85, -1)

This patch fixes a bug in UnsafeRow, which does not handle UDTs specially
in isFixedLength and isMutable. These methods don't check the underlying
SQL type of a UDT, and always treat a UDT as variable-length and
non-mutable.

This causes no issue when a UDT represents a complex type. However, when a
UDT represents a type whose SQL type is fixed-length, it opens the door to
correctness issues, because this information sometimes decides how the
value should be handled.

Misclassifying the length type of a UDT can corrupt the value when the row
is passed as input to GenerateUnsafeRowJoiner, which leads to a
correctness issue.
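
Below is a simplified model of the fix in plain Python. The type classes and the set of fixed-length type names are hypothetical stand-ins; Spark's real check also covers cases such as small decimals. Only isFixedLength is modeled here. isMutable would presumably follow the same unwrap-first pattern.

```python
from dataclasses import dataclass
from typing import Union

# Simplified, hypothetical type model; not Spark's DataType hierarchy.
FIXED_LENGTH_TYPE_NAMES = {
    "boolean", "byte", "short", "int", "long",
    "float", "double", "date", "timestamp",
}


@dataclass(frozen=True)
class AtomicType:
    name: str


@dataclass(frozen=True)
class UserDefinedType:
    name: str
    sql_type: "DataType"  # the SQL type the UDT is physically stored as


DataType = Union[AtomicType, UserDefinedType]


def is_fixed_length_before(dt: DataType) -> bool:
    # Pre-fix behavior in this model: a UDT is never unwrapped,
    # so it always looks variable-length.
    return isinstance(dt, AtomicType) and dt.name in FIXED_LENGTH_TYPE_NAMES


def is_fixed_length(dt: DataType) -> bool:
    # Post-fix behavior: look through the UDT to its underlying SQL type.
    if isinstance(dt, UserDefinedType):
        return is_fixed_length(dt.sql_type)
    return isinstance(dt, AtomicType) and dt.name in FIXED_LENGTH_TYPE_NAMES


long_backed_udt = UserDefinedType("MyIdUDT", AtomicType("long"))
string_backed_udt = UserDefinedType("MyTagUDT", AtomicType("string"))
print(is_fixed_length_before(long_backed_udt), is_fixed_length(long_backed_udt))  # False True
print(is_fixed_length(string_backed_udt))  # False
```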

ClassCastException when a generator having nested inner generators (+40, -3)

The query below failed:

scala> sql("select array(array(1, 2), array(3))
20/03/01 13:51:56 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.ClassCastException: scala.collection.mutable.ArrayOps$ofRef
cannot be cast to org.apache.spark.sql.catalyst.util.ArrayData
	at org.apache.spark.sql.catalyst.expressions.ExplodeBase.eval(generators.scala:313)
	at org.apache.spark.sql.execution.GenerateExec.$anonfun$doExecute$8(GenerateExec.scala:108)
	at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
	at scala.collection.Iterator$ConcatIterator.hasNext(Iterator.scala:222)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:458)
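
The kind of condition involved can be pictured as a recursive walk over an expression tree. A query is flagged when some generator has another generator anywhere beneath it, including behind a non-generator wrapper such as array(...). The plain-Python sketch below uses hypothetical names (Expr, explode, array, lit) and is not the actual ExtractGenerator code.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Expr:
    """Hypothetical, minimal expression-tree node."""
    name: str
    children: List["Expr"] = field(default_factory=list)
    is_generator: bool = False


def contains_generator(e: Expr) -> bool:
    return e.is_generator or any(contains_generator(c) for c in e.children)


def has_nested_generator(e: Expr) -> bool:
    """True if any generator has another generator anywhere below it."""
    if e.is_generator and any(contains_generator(c) for c in e.children):
        return True
    return any(has_nested_generator(c) for c in e.children)


# Small helpers to build trees that read like SQL.
def explode(child: Expr) -> Expr:
    return Expr("explode", [child], is_generator=True)


def array(*children: Expr) -> Expr:
    return Expr("array", list(children))


def lit(value) -> Expr:
    return Expr(repr(value))


flat = explode(array(lit(1), lit(2)))
nested = explode(array(explode(array(lit(1), lit(2)))))
print(has_nested_generator(flat), has_nested_generator(nested))  # False True
```

Presumably, detecting the nesting during analysis lets Spark report a clear error for such queries instead of failing at runtime with a ClassCastException.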

This PR modifies the hasNestedGenerator check in ExtractGenerator so that it
correctly catches nested inner generators.

Revert "[SPARK-30808][SQL] Enable Java 8 time API in Thrift server" (+43, -67)

In the Hive module, the server side provides date-time values by simply
calling value.toString, and the client side converts the results back in
HiveBaseResultSet with java.sql.Date(Timestamp).valueOf. Using the Java 8
time APIs would make the client and server inconsistent, so the PR was
reverted.

MapType should be prohibited in hash expressions (+39, -17)

hash() and xxhash64() can no longer be used on elements of MapType. A new
configuration, spark.sql.legacy.allowHashOnMapType, is introduced to allow
users to restore the previous behavior.

When spark.sql.legacy.allowHashOnMapType is set to false:

scala> spark.sql("select hash(map())");
org.apache.spark.sql.AnalysisException: cannot resolve 'hash(map())'
due to data type mismatch: input to function hash cannot contain
elements of MapType; line 1 pos 7;
'Project [unresolvedalias(hash(map(), 42), None)]
+- OneRowRelation

When spark.sql.legacy.allowHashOnMapType is set to true:

scala> spark.sql("set spark.sql.legacy.allowHashOnMapType=true");
res3: org.apache.spark.sql.DataFrame = [key: string, value: string]

scala> spark.sql("select hash(map())").first()
res4: org.apache.spark.sql.Row = [42]
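
Here is a minimal plain-Python sketch of such an analysis-time check. It assumes, as the error message suggests, that a map nested inside an array or struct is rejected too. The tuple-based type encoding and the function names are hypothetical; only the config name comes from the text above.

```python
# Hypothetical, simplified type representation:
#   ("int",), ("string",), ("map", key_type, value_type),
#   ("array", element_type), ("struct", [field_type, ...])


def contains_map_type(dt) -> bool:
    """Recursively look for a map anywhere inside a data type."""
    kind = dt[0]
    if kind == "map":
        return True
    if kind == "array":
        return contains_map_type(dt[1])
    if kind == "struct":
        return any(contains_map_type(f) for f in dt[1])
    return False


def check_hash_inputs(input_types, allow_hash_on_map_type=False):
    """The flag plays the role of spark.sql.legacy.allowHashOnMapType."""
    if allow_hash_on_map_type:
        return
    if any(contains_map_type(t) for t in input_types):
        raise TypeError(
            "input to function hash cannot contain elements of MapType")


map_type = ("map", ("string",), ("int",))
check_hash_inputs([("int",), ("string",)])                  # passes
check_hash_inputs([map_type], allow_hash_on_map_type=True)  # passes (legacy)
try:
    check_hash_inputs([("array", ("struct", [("int",), map_type]))])
except TypeError as err:
    print(err)  # input to function hash cannot contain elements of MapType
```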

Default table provider should be decided by catalog implementations (+84,

When a CREATE TABLE statement does not specify the provider, the decision
is left to the catalog implementation.

PruneHiveTablePartitions should be executed as earlyScanPushDownRules (+21,

Similar to the rule PruneFileSourcePartitions, PruneHiveTablePartitions
should also be executed as part of earlyScanPushDownRules, to eliminate its
impact on the later statistics computation.

Fix an analysis failure in generators with aggregate functions (+34, -0)

SPARK-28782 added support for generators in SQL aggregate expressions, but
a generator (explode) query with aggregate functions in the DataFrame API
still failed.

The root cause is that ExtractGenerator wrongly replaces a project with
aggregate functions before GlobalAggregates replaces it with an aggregate.

To avoid this, this PR adds a condition in ExtractGenerator to ignore
generators that have aggregate functions.

Interval from year-month/date-time string should handle whitespaces (+115,

Spark parses intervals either from multi-unit strings or from
date-time/year-month pattern strings. Previously, the former handled all
whitespace while the latter didn't. After this change, Spark handles
whitespace in both date-time and year-month pattern strings, for example:

select interval '\n-\t10\t 12:34:46.789\t' day to second
-- !query 126 schema
struct<INTERVAL '-10 days -12 hours -34 minutes -46.789 seconds':interval>
-- !query 126 output
-10 days -12 hours -34 minutes -46.789 seconds
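
The following plain-Python sketch shows whitespace-tolerant parsing for the day-to-second pattern. It assumes whitespace may appear around the whole string, between the sign and the day count, and between the day count and the time. parse_day_to_second is a hypothetical helper, and range checks on hours, minutes and seconds are left out.

```python
import re

# [sign] days hh:mm:ss[.fraction], with arbitrary whitespace (\s also
# matches \n and \t) allowed around and between the parts.
_DAY_TO_SECOND = re.compile(
    r"\s*(?P<sign>[+-])?\s*(?P<days>\d+)\s+"
    r"(?P<h>\d{1,2}):(?P<m>\d{1,2}):(?P<s>\d{1,2})(?:\.(?P<frac>\d{1,9}))?\s*"
)

MICROS_PER_SECOND = 1_000_000


def parse_day_to_second(text: str) -> int:
    """Return the interval as signed microseconds."""
    m = _DAY_TO_SECOND.fullmatch(text)
    if m is None:
        raise ValueError(f"invalid day-time interval string: {text!r}")
    seconds = (int(m["days"]) * 86_400 + int(m["h"]) * 3_600
               + int(m["m"]) * 60 + int(m["s"]))
    # Keep at most microsecond precision from the fraction.
    micros = int((m["frac"] or "").ljust(6, "0")[:6])
    total = seconds * MICROS_PER_SECOND + micros
    return -total if m["sign"] == "-" else total


# The input from the test case above.
print(parse_day_to_second("\n-\t10\t 12:34:46.789\t"))  # -909286789000
```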

Disable using commit coordinator with NoopDataSource (+1, -0)

This PR disables using the commit coordinator with NoopDataSource. There is
no need for a coordinator in benchmarks.

Raise exception instead of silent change for new DateFormatter (+269, -96)

Add a new SQL conf spark.sql.legacy.timeParserPolicy:

   - When LEGACY, java.text.SimpleDateFormat is used for formatting and
   parsing dates/timestamps in a locale-sensitive manner, which is the
   approach before Spark 3.0.
   - When set to CORRECTED, classes from java.time.* packages are used for
   the same purpose.
   - The default value is EXCEPTION: a RuntimeException is thrown when the
   two approaches would produce different results.
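
The three policies can be modeled in plain Python. In this hypothetical sketch, legacy_parse and corrected_parse stand in for the SimpleDateFormat-based and java.time-based parsers. The only difference modeled is lenient versus strict digit counts for yyyy-MM-dd, and a result of None stands for a null.

```python
import re
from datetime import date
from typing import Optional


def legacy_parse(text: str) -> Optional[date]:
    # Lenient, SimpleDateFormat-like: month and day may have one or two digits.
    m = re.fullmatch(r"(\d{4})-(\d{1,2})-(\d{1,2})", text)
    return date(int(m[1]), int(m[2]), int(m[3])) if m else None


def corrected_parse(text: str) -> Optional[date]:
    # Strict pattern yyyy-MM-dd, standing in for the java.time-based parser.
    m = re.fullmatch(r"(\d{4})-(\d{2})-(\d{2})", text)
    return date(int(m[1]), int(m[2]), int(m[3])) if m else None


def parse_date(text: str, policy: str = "EXCEPTION") -> Optional[date]:
    if policy == "LEGACY":
        return legacy_parse(text)
    if policy == "CORRECTED":
        return corrected_parse(text)  # None models a null result
    if policy == "EXCEPTION":
        new, old = corrected_parse(text), legacy_parse(text)
        if new != old:
            # Fail loudly instead of silently changing the result.
            raise RuntimeError(
                f"{text!r} parses differently under LEGACY ({old}) "
                f"and CORRECTED ({new}); set the policy explicitly")
        return new
    raise ValueError(f"unknown policy: {policy}")


print(parse_date("2020-02-05"))             # 2020-02-05
print(parse_date("2020-2-5", "LEGACY"))     # 2020-02-05
print(parse_date("2020-2-5", "CORRECTED"))  # None
```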

Fix issues where some V1 commands allow tables that are not fully qualified
(+65, -42)

A few V1 commands, such as REFRESH TABLE, still allow spark_catalog.t
because they run with the parsed table names without trying to load the
tables from the catalog. This PR addresses this issue.

Deprecate two-parameter TRIM/LTRIM/RTRIM functions (+66, -6)

This PR shows a deprecation warning on usages of the two-parameter
TRIM/LTRIM/RTRIM functions, based on the community decision.

CreateArray/CreateMap's data type should not depend on SQLConf.get (+45,

This avoids issues when the configuration changes between different phases
of planning, which can silently break a query plan and lead to crashes or
data corruption.

add back the legacy date/timestamp format support in CSV/JSON parser (+222,

Before Spark 3.0, the JSON/CSV parser had a special behavior: when it failed
to parse a timestamp/date, it fell back to another way of parsing it, to
support some legacy formats. The fallback was removed by and

This PR adds back the legacy fallback. Since we switched the API used for
datetime operations, we can't behave exactly as before. Here we add back
support for the common legacy formats (examples of Spark

   1. the fields can have one or two digits

scala> sql("""select from_json('{"time":"1123-2-22 2:22:22"}', 'time
|jsontostructs({"time":"1123-2-22 2:22:22"})|
|[1123-02-22 02:22:22]                      |

   2. the separator between date and time can be "T" as well

scala> sql("""select from_json('{"time":"2000-12-12T12:12:12"}', 'time
|[2000-12-12 12:12:12]                        |

   3. the second fraction can be arbitrary length

scala> sql("""select
from_json('{"time":"1123-02-22T02:22:22.123456789123"}', 'time
|[1123-02-15 02:22:22.123]                                 |

   4. a date string can end with any characters after "T" or a space

scala> sql("""select from_json('{"time":"1123-02-22Tabc"}', 'time
|[1123-02-22]                            |

   5. remove "GMT" from the string before parsing

scala> sql("""select from_json('{"time":"GMT1123-2-22 2:22:22.123"}',
'time Timestamp')""").show(false)
|jsontostructs({"time":"GMT1123-2-22 2:22:22.123"})|
|[1123-02-22 02:22:22.123]                         |
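
A rough plain-Python approximation of the five fallback rules above follows. legacy_parse_timestamp and legacy_parse_date are hypothetical helpers. The fraction is cut to milliseconds to match the example output, and the Julian/Gregorian calendar handling of ancient dates is not modeled.

```python
import re
from datetime import date, datetime

_TIMESTAMP = re.compile(
    r"(\d{4})-(\d{1,2})-(\d{1,2})"      # rule 1: one- or two-digit fields
    r"[T ]"                             # rule 2: "T" or a space separator
    r"(\d{1,2}):(\d{1,2}):(\d{1,2})"
    r"(?:\.(\d+))?"                     # rule 3: fraction of any length
)
_DATE = re.compile(r"(\d{4})-(\d{1,2})-(\d{1,2})(?:[T ].*)?")  # rule 4


def legacy_parse_timestamp(text: str) -> datetime:
    m = _TIMESTAMP.fullmatch(text.replace("GMT", ""))  # rule 5: drop "GMT"
    if m is None:
        raise ValueError(f"cannot parse timestamp: {text!r}")
    year, month, day, hour, minute, second = (int(g) for g in m.groups()[:6])
    millis = int((m.group(7) or "")[:3].ljust(3, "0"))  # keep milliseconds only
    return datetime(year, month, day, hour, minute, second, millis * 1000)


def legacy_parse_date(text: str) -> date:
    m = _DATE.fullmatch(text.replace("GMT", ""))
    if m is None:
        raise ValueError(f"cannot parse date: {text!r}")
    return date(int(m.group(1)), int(m.group(2)), int(m.group(3)))


print(legacy_parse_timestamp("1123-2-22 2:22:22"))         # 1123-02-22 02:22:22
print(legacy_parse_timestamp("GMT1123-2-22 2:22:22.123"))  # 1123-02-22 02:22:22.123000
print(legacy_parse_date("1123-02-22Tabc"))                 # 1123-02-22
```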

Support time zone ids in casting strings to timestamps (+48, -32)

This PR changes DateTimeUtils.stringToTimestamp to support any valid time
zone id at the end of the input string. After the change, the function
accepts zone ids in the following formats:

   - no zone id. In that case, the function uses the local session time
   zone from the SQL config spark.sql.session.timeZone
   - -[h]h:[m]m
   - +[h]h:[m]m
   - Z
   - Short zone id, see
   - Zone ID starts with 'UTC+', 'UTC-', 'GMT+', 'GMT-', 'UT+' or 'UT-'.
   The ID is split in two, with a two or three letter prefix and a suffix
   starting with the sign. The suffix must be in the formats:
      - +|-h[h]
      - +|-hh[:]mm
      - +|-hh:mm:ss
      - +|-hhmmss
   - Region-based zone IDs in the form {area}/{city}, such as Europe/Paris
   or America/New_York. The default set of region ids is supplied by the
   IANA Time Zone Database (TZDB).
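
The following plain-Python sketch maps the offset-style formats in this list to UTC offsets. parse_zone_offset is a hypothetical helper that covers only Z, +/-[h]h:[m]m and the UTC/GMT/UT-prefixed forms. Short ids and region ids would go to a TZDB lookup, which is not modeled, and offset range checks are omitted.

```python
import re
from datetime import timedelta

# Suffix formats accepted after a UTC/GMT/UT prefix and its sign.
_SUFFIX_FORMATS = [
    re.compile(r"(?P<h>\d{1,2})"),                          # h[h]
    re.compile(r"(?P<h>\d{2}):?(?P<m>\d{2})"),              # hh[:]mm
    re.compile(r"(?P<h>\d{2}):(?P<m>\d{2}):(?P<s>\d{2})"),  # hh:mm:ss
    re.compile(r"(?P<h>\d{2})(?P<m>\d{2})(?P<s>\d{2})"),    # hhmmss
]
_BARE_OFFSET = re.compile(r"(?P<sign>[+-])(?P<h>\d{1,2}):(?P<m>\d{1,2})")
_PREFIX = re.compile(r"(UTC|GMT|UT)(?=[+-])")


def _signed(sign, hours, minutes=None, seconds=None):
    delta = timedelta(hours=int(hours), minutes=int(minutes or 0),
                      seconds=int(seconds or 0))
    return -delta if sign == "-" else delta


def parse_zone_offset(zone_id: str) -> timedelta:
    """Offset-style zone id -> offset from UTC."""
    if zone_id == "Z":
        return timedelta(0)
    m = _BARE_OFFSET.fullmatch(zone_id)
    if m:
        return _signed(m["sign"], m["h"], m["m"])
    prefix = _PREFIX.match(zone_id)
    if prefix:
        sign, suffix = zone_id[prefix.end()], zone_id[prefix.end() + 1:]
        for fmt in _SUFFIX_FORMATS:
            sm = fmt.fullmatch(suffix)
            if sm:
                parts = sm.groupdict()
                return _signed(sign, parts["h"], parts.get("m"), parts.get("s"))
    raise ValueError(f"not an offset-style zone id: {zone_id!r}")


print(parse_zone_offset("UTC+0530"))  # 5:30:00
print(parse_zone_offset("GMT-8"))     # -1 day, 16:00:00 (minus eight hours)
```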

Deprecate untyped scala UDF (+2, -0)

Uses the Scala @deprecated annotation to deprecate the untyped Scala UDF.

make it clear that people can deduplicate map keys (+112, -54)

Rename the conf spark.sql.legacy.allowDuplicatedMapKeys to
spark.sql.mapKeyDedupPolicy and make it public.
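
In Spark 3.0 the renamed conf accepts EXCEPTION (the default) or LAST_WIN. The plain-Python sketch below shows how the two policies treat duplicate keys when a map is built. build_map is a hypothetical helper, not a Spark API.

```python
def build_map(keys, values, policy="EXCEPTION"):
    """Build a map the way the two dedup policies would (a plain-Python model)."""
    if policy not in ("EXCEPTION", "LAST_WIN"):
        raise ValueError(f"unknown policy: {policy}")
    result = {}
    for key, value in zip(keys, values):
        if key in result and policy == "EXCEPTION":
            raise RuntimeError(f"Duplicate map key {key!r} was found")
        result[key] = value  # LAST_WIN: a later value overwrites an earlier one
    return result


print(build_map([1, 2, 1], ["a", "b", "c"], policy="LAST_WIN"))  # {1: 'c', 2: 'b'}
```

In a real PySpark session, the policy would be chosen with spark.conf.set("spark.sql.mapKeyDedupPolicy", "LAST_WIN").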

Allow specifying session catalog name spark_catalog in qualified column
names for v1 tables (+111, -59)

Currently, the user cannot specify the session catalog name (spark_catalog)
in qualified column names for v1 tables:

SELECT spark_catalog.default.t.i FROM spark_catalog.default.t

fails with: cannot resolve 'spark_catalog.default.t.i'.

This is inconsistent with v2 table behavior where catalog name can be used:


This PR fixes the inconsistency and allows the user to specify the session
catalog name in column names for v1 tables.

Use stringArgs in Expression.toString to respect hidden parameters (+3, -1)

This PR respects hidden parameters by using stringArgs in
Expression.toString. This way, strings are shown properly in some cases,
such as NonSQLExpression.

refine AQE config names (+121, -112)

This PR refines the AQE-related config names (starting with

   1. remove the "shuffle" prefix. AQE is all about shuffle and we don't
   need to add the "shuffle" prefix everywhere.
   2. targetPostShuffleInputSize is obscure, rename to
   3. reducePostShufflePartitions doesn't match the actual optimization,
   rename to coalesceShufflePartitions
   4. minNumPostShufflePartitions is obscure, rename it to minPartitionNum
   under the coalesceShufflePartitions namespace
   5. maxNumPostShufflePartitions is confusing with the word "max", rename
   it to initialPartitionNum
   6. skewedJoinOptimization is too verbose. Skew join is a well-known term
   in the database area, so we can just say skewJoin

Add checkValue for spark.sql.session.timeZone (+27, -0)

The spark.sql.session.timeZone config accepted any string value, including
invalid time zone ids, which then made other queries that rely on the time
zone fail. The value is now checked when it is set, failing fast if the
zone id is invalid.
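
The fail-fast idea can be sketched as a config registry that validates a value at the moment it is set. Everything below is hypothetical plain Python. ConfigEntry and SessionConf only mirror the general shape of a checkValue-style validator. The small set of known region ids stands in for a real time zone database lookup.

```python
import re
from typing import Callable, Dict, Iterable

# Hypothetical stand-in for a real TZDB lookup.
_KNOWN_REGION_IDS = {"UTC", "America/Los_Angeles", "Europe/Paris", "Asia/Shanghai"}
_FIXED_OFFSET = re.compile(r"[+-]\d{2}:\d{2}")


def is_valid_time_zone(value: str) -> bool:
    return value in _KNOWN_REGION_IDS or _FIXED_OFFSET.fullmatch(value) is not None


class ConfigEntry:
    def __init__(self, key: str, default: str,
                 check: Callable[[str], bool], error_msg: str):
        self.key, self.default = key, default
        self.check, self.error_msg = check, error_msg


class SessionConf:
    def __init__(self, entries: Iterable[ConfigEntry]):
        self._entries = {e.key: e for e in entries}
        self._values: Dict[str, str] = {}

    def set(self, key: str, value: str) -> None:
        entry = self._entries.get(key)
        if entry is not None and not entry.check(value):
            # Fail fast here instead of letting later queries fail.
            raise ValueError(f"{entry.error_msg}: {key}={value!r}")
        self._values[key] = value

    def get(self, key: str) -> str:
        return self._values.get(key, self._entries[key].default)


conf = SessionConf([ConfigEntry(
    "spark.sql.session.timeZone", "UTC", is_valid_time_zone,
    "invalid time zone id")])
conf.set("spark.sql.session.timeZone", "+08:00")
try:
    conf.set("spark.sql.session.timeZone", "Not/AZone")
except ValueError as err:
    print(err)
print(conf.get("spark.sql.session.timeZone"))  # +08:00
```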

mark connector APIs as Evolving (+48, -40)

This PR makes the annotations consistent by marking all Connector APIs as
Evolving.

Provide ability to alter the provider of a table (+49, -1)

This PR adds functionality to HiveExternalCatalog to be able to change the
provider of a table.

This is useful for catalogs in Spark 3.0 to be able to use alterTable to
change the provider of a table as part of an atomic REPLACE TABLE function.

Match schema_of_json to the schema inference of JSON data source (+54, -8)

This PR proposes two things:

   1. Convert null to string type during schema inference of schema_of_json,
   as the JSON datasource does. This is also a bug fix, because "null" is not
   a proper DDL-formatted type string and the SQL parser cannot recognize it
   as a type. Matching the JSON datasource and returning a string type makes
   schema_of_json return a proper DDL-formatted string.
   2. Let schema_of_json respect the dropFieldIfAllNull option during schema
   inference.
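
Below is a much-simplified plain-Python model of the two changes. Null values are reported as string rather than as an unparsable null type. With drop_field_if_all_null set, fields whose value is null are dropped. infer_type and schema_of_json_sketch are hypothetical names. Type widening across array elements and Spark's other inference options are not modeled, and fields are sorted by name for deterministic output.

```python
import json


def infer_type(value, drop_field_if_all_null=False) -> str:
    """Return a DDL-style type string for one parsed JSON value."""
    if value is None:
        return "string"  # null becomes string instead of an unparsable "null"
    if isinstance(value, bool):  # check bool before int: bool is an int subclass
        return "boolean"
    if isinstance(value, int):
        return "bigint"
    if isinstance(value, float):
        return "double"
    if isinstance(value, str):
        return "string"
    if isinstance(value, list):
        non_null = [v for v in value if v is not None]
        elem = infer_type(non_null[0], drop_field_if_all_null) if non_null else "string"
        return f"array<{elem}>"
    if isinstance(value, dict):
        fields = [
            f"{name}:{infer_type(v, drop_field_if_all_null)}"
            for name, v in sorted(value.items())
            if not (drop_field_if_all_null and v is None)
        ]
        return f"struct<{','.join(fields)}>"
    raise TypeError(f"unsupported JSON value: {value!r}")


def schema_of_json_sketch(text: str, drop_field_if_all_null=False) -> str:
    return infer_type(json.loads(text), drop_field_if_all_null)


print(schema_of_json_sketch('{"a": 1, "b": null}'))        # struct<a:bigint,b:string>
print(schema_of_json_sketch('{"a": 1, "b": null}', True))  # struct<a:bigint>
```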

Support 32 or more grouping attributes for GROUPING_ID (+117, -53)>

Since Spark 3.1, grouping_id() returns long values. In Spark version 3.0
and earlier, this function returns int values. To restore the behavior
before Spark 3.1, you can set spark.sql.legacy.integerGroupingId to true.

This PR supports 32 or more grouping attributes for GROUPING_ID. In the
current master, an integer overflow can occur when computing grouping IDs.

For example, the query below generates wrong grouping IDs in master:

scala> val numCols = 32 // or, 31
scala> val cols = (0 until numCols).map { i => s"c$i" }
scala> sql(s"create table test_$numCols (${cols.map(c => s"$c int").mkString(",")}, v int) using parquet")
scala> val insertVals = (0 until numCols).map { _ => 1 }.mkString(",")
scala> sql(s"insert into test_$numCols values ($insertVals,3)")
scala> sql(s"select grouping_id(), sum(v) from test_$numCols group by grouping sets ((${cols.mkString(",")}), (${cols.init.mkString(",")}))").show(10, false)
scala> sql(s"drop table test_$numCols")

// numCols = 32
|0            |3     |
|0            |3     | // Wrong Grouping ID

// numCols = 31
|0            |3     |
|1            |3     |

To fix this issue, this PR changes the code to use long values for
GROUPING_ID instead of int values.
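
The overflow can be reproduced in a plain-Python model that follows JVM Int semantics, where << uses only the low five bits of the shift distance. The model assumes the grouping ID is masked with an all-ones value (1 << n) - 1 computed as an Int. This is an illustrative assumption, not Spark's exact code. With n = 32 the mask collapses to 0, which matches the all-zero IDs shown above. grouping_id is a hypothetical helper. In it, each column contributes a 1 bit when it is not part of the grouping set, and the first column is the most significant bit.

```python
def to_int32(x: int) -> int:
    """Wrap a Python int to a signed 32-bit JVM Int."""
    x &= 0xFFFFFFFF
    return x - (1 << 32) if x >= (1 << 31) else x


def int_shl(value: int, distance: int) -> int:
    """JVM Int '<<': only the low five bits of the distance are used."""
    return to_int32(value << (distance & 31))


def grouping_id(all_cols, grouping_set, use_long=True):
    """Hypothetical grouping-ID computation for one grouping set."""
    n = len(all_cols)
    gid = 0
    for i, col in enumerate(all_cols):
        if col not in grouping_set:       # column is aggregated away
            gid |= 1 << (n - 1 - i)       # first column = most significant bit
    if use_long:
        return gid                        # Python ints cover 64-bit longs here
    mask = to_int32(int_shl(1, n) - 1)    # (1 << n) - 1 evaluated as an Int
    return to_int32(gid & mask)


cols = [f"c{i}" for i in range(32)]
print(grouping_id(cols, set(cols[:-1]), use_long=False))  # 0 (mask collapsed)
print(grouping_id(cols, set(cols[:-1])))                  # 1
```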

Respect aliases in output ordering (+87, -23)

Currently, in the following scenario, an unnecessary Sort node is
introduced:

  val df = (0 until 20).toDF("i").as("df")
  df.repartition(8, df("i")).write.format("parquet")
    .bucketBy(8, "i").sortBy("i").saveAsTable("t")
  val t1 = spark.table("t")
  val t2 = t1.selectExpr("i as ii")
  t1.join(t2, t1("i") === t2("ii")).explain

After the changes, the unnecessary sort won't show up in the explain result.

Add higher order functions API to PySpark (+480, -0)

Currently, higher-order functions are available only through SQL and the
Scala API, and can be used only with SQL expressions:

df.selectExpr("transform(values, x -> x + 1)")

This works reasonably well for simple functions, but can get really ugly
with complex expressions (complex functions, casts): the resulting objects
are somewhat verbose, and there is no IDE support. Additionally, the DSL
used, though very simple, is not documented.

With the changes proposed here, the above query could be rewritten as:

df.select(transform("values", lambda x: x + 1))

This PR adds a Python API for invoking the following higher-order functions:

   - transform
   - exists
   - forall
   - filter
   - aggregate
   - zip_with
   - transform_keys
   - transform_values
   - map_filter
   - map_zip_with
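
As a mental model, the functions listed above behave roughly like the following plain-Python helpers on ordinary lists and dicts. This is not PySpark code: the real functions take and return Column objects, as in the example above. SQL null semantics are ignored except for the null padding in zip_with and map_zip_with. The trailing underscore in filter_ only avoids shadowing Python's built-in filter.

```python
from functools import reduce
from itertools import zip_longest


def transform(xs, f):
    return [f(x) for x in xs]


def exists(xs, pred):
    return any(pred(x) for x in xs)


def forall(xs, pred):
    return all(pred(x) for x in xs)


def filter_(xs, pred):
    return [x for x in xs if pred(x)]


def aggregate(xs, zero, merge, finish=lambda acc: acc):
    return finish(reduce(merge, xs, zero))


def zip_with(xs, ys, f):
    # The shorter array is padded with None (SQL null) before f is applied.
    return [f(x, y) for x, y in zip_longest(xs, ys)]


def transform_keys(m, f):
    return {f(k, v): v for k, v in m.items()}


def transform_values(m, f):
    return {k: f(k, v) for k, v in m.items()}


def map_filter(m, pred):
    return {k: v for k, v in m.items() if pred(k, v)}


def map_zip_with(m1, m2, f):
    # Keys from both maps; a key missing on one side is paired with None.
    keys = list(m1) + [k for k in m2 if k not in m1]
    return {k: f(k, m1.get(k), m2.get(k)) for k in keys}


print(transform([1, 2, 3], lambda x: x + 1))  # [2, 3, 4]
print(aggregate([1, 2, 3], 0, lambda acc, x: acc + x, lambda acc: acc * 10))  # 60
```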

Add SparkR interface for higher order functions (+421, -0)

Currently, higher-order functions are available only through SQL and the
Scala API, and can be used only with SQL expressions:

select(df, expr("transform(xs, x -> x + 1)"))

This is error-prone and hard to get right when complex logic is used (when
/ otherwise, complex objects).

With this PR, the above function could simply be rewritten as:

select(df, array_transform("xs", function(x) x + 1))

This PR adds an R API for invoking the following higher-order functions:

   - transform -> array_transform (to avoid conflict with base::transform).
   - exists -> array_exists (to avoid conflict with base::exists).
   - forall -> array_forall (no conflicts, renamed for consistency)
   - filter -> array_filter (to avoid conflict with stats::filter).
   - aggregate -> array_aggregate (to avoid conflict with stats::aggregate).
   - zip_with -> arrays_zip_with (no conflicts, renamed for consistency)
   - transform_keys
   - transform_values
   - map_filter
   - map_zip_with

Support FValueSelector for continuous features and continuous labels (+758,

Add FValueRegressionSelector for continuous features and continuous labels.
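
For a continuous feature and a continuous label, the usual univariate F-statistic comes from the Pearson correlation r over n samples: F = r^2 / (1 - r^2) * (n - 2). A selector then keeps the features with the highest scores. The plain-Python sketch below assumes this standard formulation and ranks by F rather than by p-value. f_value and select_top_k are hypothetical names, not the Spark ML API. Constant columns and perfectly correlated features, which would divide by zero, are not handled.

```python
import math


def pearson_r(x, y):
    n = len(x)
    mean_x, mean_y = sum(x) / n, sum(y) / n
    sxy = sum((a - mean_x) * (b - mean_y) for a, b in zip(x, y))
    sxx = sum((a - mean_x) ** 2 for a in x)
    syy = sum((b - mean_y) ** 2 for b in y)
    return sxy / math.sqrt(sxx * syy)


def f_value(feature, label):
    """Univariate regression F-statistic with 1 and n - 2 degrees of freedom."""
    n = len(feature)
    r2 = pearson_r(feature, label) ** 2
    return r2 / (1.0 - r2) * (n - 2)


def select_top_k(features, label, k):
    """Keep the k feature names with the largest F-values."""
    scores = {name: f_value(col, label) for name, col in features.items()}
    return sorted(scores, key=scores.get, reverse=True)[:k]


label = [2, 4, 5, 4, 5]
features = {"good": [1, 2, 3, 4, 5], "noise": [5, 1, 4, 2, 3]}
print(f_value(features["good"], label))    # approximately 4.5
print(select_top_k(features, label, k=1))  # ['good']
```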

Accelerate InMemoryStore with a new index (+105, -25)

Spark uses the class InMemoryStore as the KV storage for the live UI and
the history server (by default, if no LevelDB file path is provided). In
InMemoryStore, all the task data of an application is stored in a hashmap
whose key is the task ID and whose value is the task data. This is fine
for getting or deleting with a given task ID. However, the Spark stage UI
always shows all the task data of one stage, and the current
implementation looks through all the values in the hashmap, so the time
complexity is O(numOfTasks). Also, when there are too many stages
(>spark.ui.retainedStages), Spark linearly looks through all the task data
to find the tasks of the stages being deleted.

This can be very bad for a large application with many stages and tasks.
We can improve it by allowing the natural key of an entity to have a real
parent index, so that on each lookup with a parent node provided, Spark can
look up all the natural keys (in our case, the task IDs) first, and then
find the data by those natural keys in the hashmap.
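
The idea can be sketched in plain Python as a primary hashmap plus a secondary index from a parent key to its children's natural keys. TaskStore and its methods are hypothetical names for illustration. The real change is in Spark's InMemoryStore, which this sketch does not reproduce.

```python
from collections import defaultdict


class TaskStore:
    """Toy KV store: task id is the natural key, stage id the parent key."""

    def __init__(self):
        self._data = {}                     # task_id -> (stage_id, task data)
        self._by_parent = defaultdict(set)  # stage_id -> {task_id, ...}

    def write(self, stage_id, task_id, data):
        old = self._data.get(task_id)
        if old is not None:                 # keep the index consistent on rewrite
            self._by_parent[old[0]].discard(task_id)
        self._data[task_id] = (stage_id, data)
        self._by_parent[stage_id].add(task_id)

    def tasks_of_stage(self, stage_id):
        # Touches only this stage's tasks instead of scanning every task.
        task_ids = sorted(self._by_parent.get(stage_id, ()))
        return [self._data[t][1] for t in task_ids]

    def delete_stage(self, stage_id):
        for task_id in self._by_parent.pop(stage_id, set()):
            del self._data[task_id]

    def __len__(self):
        return len(self._data)


store = TaskStore()
store.write(stage_id=1, task_id=10, data="task 10")
store.write(stage_id=1, task_id=11, data="task 11")
store.write(stage_id=2, task_id=20, data="task 20")
print(store.tasks_of_stage(1))  # ['task 10', 'task 11']
store.delete_stage(1)
print(len(store))               # 1
```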

Add "shuffle write time" to task metrics summary in StagePage (+29, -16)

   1. Added Shuffle Write Time to task metrics summary.
   2. Added checkbox for Shuffle Write Time as an additional metrics.
   3. Renamed the Write Time column in the task table to Shuffle Write Time
   and made it an additional column.

[image: additional-metrics-after]

Upgrade aws-java-sdk-sts to 1.11.655 (+1, -1)

Update xerces to 2.12.0 (+8, -2)
