spark-dev mailing list archives

From Allison Wang
Subject [OSS DIGEST] The major changes of Apache Spark from July 29 to August 25
Date Fri, 23 Oct 2020 23:40:37 GMT
Hi all,

This is the bi-weekly Apache Spark digest from the Databricks OSS team
(this time we combined 4 weeks).
For each API/configuration/behavior change, an *[API]* tag is added in the
title. All the content has been appended to this doc.

Kryo should support multiple user registrators (+13, -12)

This PR fixes a regression in spark.kryo.registrator in 3.0. Spark 2.x
supported multiple user registrators by reading the configuration as:
private val userRegistrators = conf.get("spark.kryo.registrator", "")

This no longer worked in 3.0. The fix adds toSequence in Kryo.scala, so the
configuration spark.kryo.registrator is again a sequence of strings and
supports multiple user registrators. The default value is Nil.
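
As a rough illustration of what a sequence-valued setting means here, the
following plain-Python sketch splits a comma-separated value into class
names. It is not Spark's code: the helper name parse_registrators and the
class names are hypothetical, and comma separation is an assumption.

```python
def parse_registrators(raw: str) -> list:
    """Split a comma-separated config value into trimmed, non-empty class names."""
    return [name.strip() for name in raw.split(",") if name.strip()]


# Hypothetical registrator class names, used only for illustration.
print(parse_registrators("com.example.RegA, com.example.RegB"))
print(parse_registrators(""))  # an unset value yields an empty list
```

An empty list plays the role of the Nil default mentioned above.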

Fix the order between initialization for ExecutorPlugin and starting
heartbeat thread (+89, -10)

This PR changes the order between initialization for ExecutorPlugin and
starting the heartbeat thread in Executor. Before this change, the heartbeat
thread in an executor started after plugin initialization, so if the
initialization took a long time, no heartbeat was sent to the driver and
the executor was removed from the cluster.

Add a config to switch allow/disallow to create SparkContext in executors
(+72, -9)

This PR is a follow-up of [SPARK-32160] Disallow to create SparkContext in
executors. It adds a config that allows or disallows creating a
SparkContext in executors, since some users or libraries do create a
SparkContext in executors.

spark.driver.allowSparkContextInExecutors (Default: false)

   - If set to true, SparkContext can be created in executors.

Reduce job failures during decommissioning (+539, -7)

This PR reduces the chance of job failure during decommissioning. It fixes
two holes in the current decommissioning framework:

   - Loss of decommissioned executors is not treated as a job failure: We
   know that the decommissioned executor would be dying soon, so its death is
   clearly not caused by the application.
   - Shuffle files on the decommissioned host are cleared when the first
   fetch failure is detected from a decommissioned host: The tricky part is
   when to clear the shuffle state. Ideally you want to clear it the
   millisecond before the shuffle service on the node dies (or the executor
   dies when there is no external shuffle service). Clearing too soon wastes
   work, and clearing too late leads to fetch failures.

The approach here is to do this clearing when the very first fetch failure
is observed on the decommissioned block manager, without waiting for other
blocks to also signal a failure.

Shutdown executor once we are done decommissioning (+310, -44)

This PR shuts down the executor immediately when decommissioning is
finished (i.e., all tasks are done, and the configured migration of all RDD
blocks and shuffle data is completed). It helps the cluster manager to
release the unneeded resources as soon as possible.

CoarseGrainedSchedulerBackend.maxNumConcurrentTasks should consider all
kinds of resources (+133, -59)

Previously, CoarseGrainedSchedulerBackend.maxNumConcurrentTasks() considered
only CPU resources when calculating the max number of concurrent tasks. This
could cause the application to hang when a barrier stage required other
custom resources (e.g., GPU) but the cluster didn't have enough of them.
This PR fixes the hang by taking all kinds of resources into account in
CoarseGrainedSchedulerBackend.maxNumConcurrentTasks().
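
The idea can be sketched in plain Python as taking the minimum over all
requested resources. This is only an illustration under simplifying
assumptions (one flat resource pool, a hypothetical helper name
max_concurrent_tasks), not the scheduler's actual per-executor calculation.

```python
def max_concurrent_tasks(available: dict, per_task: dict) -> int:
    """Return how many tasks fit when every requested resource is considered."""
    slots = []
    for resource, amount in per_task.items():
        if amount <= 0:
            continue  # a resource that is not requested does not limit anything
        slots.append(available.get(resource, 0) // amount)
    return min(slots) if slots else 0


cluster = {"cpu": 16, "gpu": 2}
task = {"cpu": 1, "gpu": 1}
print(max_concurrent_tasks({"cpu": 16}, {"cpu": 1}))  # CPU-only view
print(max_concurrent_tasks(cluster, task))            # limited by the scarcer GPU
```

A barrier stage that needs more simultaneous tasks than the second number
can never be scheduled, which is the situation the CPU-only view missed.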

Add StorageLevel.DISK_ONLY_3 (+17, -4)

This PR added StorageLevel.DISK_ONLY_3 as a built-in StorageLevel for
better user experience.

Use graceful decommissioning as part of dynamic scaling (+380, -134)

This PR makes Spark dynamic scaling use graceful decommissioning when it is
enabled instead of killing executors directly, in order to avoid triggering
task recomputation as much as possible.

ExecutorPlugin doesn't work with Standalone Cluster and Kubernetes with
--jars (+122, -9)

This PR makes Executor load jars and files added by --jars and --files on
Executor initialization, to avoid downloading those jars/files twice. This
PR also fixes the issue of ExecutorPlugin not working with Standalone
Cluster and Kubernetes when a plugin is added by --jars and --files option
with spark-submit.

Fix regressions in DecommissionWorkerSuite (+153, -37)

The PR fixes the flakiness of DecommissionWorkerSuite caused by a
regression introduced in SPARK-31197 and adds the following config:

spark.executor.decommission.removed.infoCacheTTL (Default: 5m)

Duration for which a decommissioned executor's information will be kept
after its removal. Keeping the decommissioned info after removal helps
pinpoint fetch failures to decommissioning even after the mapper executor
has been decommissioned. This allows eager recovery from fetch failures
caused by decommissioning, increasing job robustness.

Decommission switch configuration should have the highest hierarchy (+30,

API change: configuration option renaming: Rename
spark.worker.decommission.enabled to spark.decommission.enabled and move it
from org.apache.spark.internal.config.Worker to
org.apache.spark.internal.config.package. (Note: this is a new
configuration that is not in any released versions yet)

Decommissioning is already supported in Standalone and Kubernetes and may be
supported in YARN in the future. Therefore, the switch configuration should
sit at the top of the hierarchy rather than belong to Standalone's Worker.
In other words, it should be independent of the cluster managers.

Unify task name in some logs between driver and executor (+44, -42)

Usability improvement: This PR replaces some arbitrary task names in logs
with the widely used task name (e.g. "task 0.0 in stage 1.0 (TID 1)") on
both the driver and executors. This changes the task name in TaskDescription
by appending the TID.

Users will see more consistent task names in the logs, in the form
"task 0.0 in stage 1.0 (TID 1)", where TID is the task ID.
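
For readers who parse logs, the unified name can be reproduced with a small
formatter. This is a sketch only: the parameter names (task index, attempt
number, stage ID, stage attempt) are an assumed reading of the example
string, not something this digest spells out.

```python
def task_name(index: int, attempt: int, stage_id: int, stage_attempt: int, tid: int) -> str:
    """Build a task name in the unified log format, with the TID appended."""
    return f"task {index}.{attempt} in stage {stage_id}.{stage_attempt} (TID {tid})"


print(task_name(index=0, attempt=0, stage_id=1, stage_attempt=0, tid=1))
```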

Fix PartitionWriterStream partition length overflow (+20, -2)

Regression bug fix: The count in PartitionWriterStream should be a long
value, instead of int. The issue is introduced by
When the overflow happens, the shuffle index file records a wrong index
for a reduceId, which leads to a FetchFailedException: Stream is corrupted
error.

Also added some debug logs for easier debugging of similar issues in the
future.
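
The effect of counting bytes in a 32-bit int can be reproduced in plain
Python by emulating two's-complement wraparound. The helper to_int32 and
the 1 GiB write size are illustrative assumptions, not taken from the PR.

```python
def to_int32(value: int) -> int:
    """Wrap an integer the way a signed 32-bit counter would."""
    value &= 0xFFFFFFFF
    return value - (1 << 32) if value >= (1 << 31) else value


GIB = 1 << 30
int_count = 0   # emulates a 32-bit counter
long_count = 0  # Python ints do not overflow, like a 64-bit long here

for _ in range(3):  # three 1 GiB writes into the same partition
    int_count = to_int32(int_count + GIB)
    long_count += GIB

print(int_count)   # negative: the 32-bit counter has wrapped around
print(long_count)  # the true partition length in bytes
```

A wrapped length like this is what ends up as a wrong offset in the index
file.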

Decommissioned host/executor should be considered as inactive in
TaskSchedulerImpl (+102, -10)

Improvement: When checking whether a host or executor is active, also check
its decommissioning status, so that a decommissioned host or executor is
considered inactive.

This PR is not a correctness bug fix, but it does bring an improvement.

TaskSetManager.computeValidLocalityLevels depends on
TaskSchedulerImpl.isExecutorAlive/hasExecutorsAliveOnHost to calculate the
locality levels. Therefore, the TaskSetManager could also get corresponding
locality levels of those decommissioned hosts or executors if they're not
considered as inactive. However, on the other side,
CoarseGrainedSchedulerBackend won't construct the WorkerOffer for those
decommissioned executors. That also means TaskSetManager might never have a
chance to launch tasks at certain locality levels but only suffers the
unnecessary delay because of delay scheduling. So, this PR helps to reduce
this kind of unnecessary delay by making decommissioned host/executor
inactive in TaskSchedulerImpl.

Support filters pushdown in Avro datasource (+430, -199)

This PR adds support for filters pushdown in Avro datasource V1 and V2.

spark.sql.avro.filterPushdown.enabled (Default: true)

   - When true, enable filter pushdown to Avro datasource.

The changes improve performance on synthetic benchmarks up to 2 times on
JDK 11.

Add columnar exchanges (+260, -70)

This PR adds abstract classes for ShuffleExchange and BroadcastExchange so
that users can provide their columnar implementations. It also updates
AdaptiveSparkPlanExec so that the columnar rules can see exchange nodes.
Check duplicate nested columns in read from built-in datasources (+152, -12)

When spark.sql.caseSensitive is false (the default), this PR checks that
there are no duplicated column names on the same level (top-level or nested
levels) when reading from the built-in datasources Parquet, ORC, Avro and
JSON. If such duplicated columns exist, it throws the exception:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in
the data schema:

This change makes the handling of duplicated nested columns similar to the
handling of duplicated top-level columns, i.e., it outputs the same error
when spark.sql.caseSensitive is false:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in
the data schema: `camelcase`
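
A per-level duplicate check of this kind can be sketched in plain Python.
The schema is modelled as nested dicts and the helper name find_duplicates
is hypothetical. It illustrates the rule, not Spark's implementation.

```python
def find_duplicates(schema: dict, case_sensitive: bool = False, path: str = "") -> list:
    """Return dotted paths of column names that repeat on the same nesting level."""
    seen = set()
    duplicates = []
    for name, dtype in schema.items():
        key = name if case_sensitive else name.lower()
        full = f"{path}.{key}" if path else key
        if key in seen:
            duplicates.append(full)
        seen.add(key)
        if isinstance(dtype, dict):  # a nested struct: check its own level too
            duplicates.extend(find_duplicates(dtype, case_sensitive, full))
    return duplicates


schema = {"id": "int", "StructColumn": {"camelCase": "int", "camelcase": "int"}}
print(find_duplicates(schema))                       # case-insensitive check
print(find_duplicates(schema, case_sensitive=True))  # names differ by case only
```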

Check duplicate nested columns in read from JDBC datasource (+68, -10)

This PR checks that there are no duplicated column names on the same level
(top-level or nested levels) in reading from the JDBC datasource. If such
duplicated columns exist, throw the exception:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in
the customSchema option value:

This check takes into account the SQL config spark.sql.caseSensitive (false by
default).

This change makes the handling of duplicated nested columns similar to the
handling of duplicated top-level columns, i.e., it outputs the same error:

org.apache.spark.sql.AnalysisException: Found duplicate column(s) in
the customSchema option value: `camelcase`

Add code-gen for shuffled hash join (+499, -420)

This PR adds codegen for shuffled hash join. Shuffled hash join codegen is
very similar to broadcast hash join codegen, so most of the code change is
to refactor existing codegen in BroadcastHashJoinExec to HashJoin. Codegen
for shuffled hash join can help save CPU cost. We see a 30% wall clock time
improvement compared to the existing non-codegen code path in the example
test query in JoinBenchmark.

Make SQL cache serialization pluggable (+813, -320)

This PR adds a config to let users change how SQL/Dataframe data is
compressed when cached.

spark.sql.cache.serializer (Default:

The name of a class that implements
org.apache.spark.sql.columnar.CachedBatchSerializer. It will be used to
translate SQL data into a format that can more efficiently be cached. The
underlying API is subject to change so use with caution. Multiple classes
cannot be specified. The class must have a no-arg constructor.

Support regexp function regexp_extract_all (+440, -27)

This PR adds support for the regexp function regexp_extract_all. It is a
very useful function that expands the capabilities of regexp_extract. For
example:

SELECT regexp_extract('1a 2b 14m', '\d+', 0); -- 1
SELECT regexp_extract_all('1a 2b 14m', '\d+', 0); -- [1, 2, 14]
SELECT regexp_extract('1a 2b 14m', '(\d+)([a-z]+)', 2); -- 'a'
SELECT regexp_extract_all('1a 2b 14m', '(\d+)([a-z]+)', 2); -- ['a', 'b',

Some mainstream databases that support the syntax:

   - Presto
   - Pig

regexp_extract_all(str, regexp[, idx])

Extracts all strings in str that match the regexp expression and
correspond to the regex group index.
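
The difference between the two functions can be mimicked with Python's re
module. This is only an emulation under the assumption that the patterns
behave the same in Python and Java regex, which holds for the simple
patterns used here; the default idx of 1 is also an assumption.

```python
import re


def regexp_extract(text: str, pattern: str, idx: int = 1) -> str:
    """Return the requested group of the first match, or '' if nothing matches."""
    match = re.search(pattern, text)
    return match.group(idx) if match else ""


def regexp_extract_all(text: str, pattern: str, idx: int = 1) -> list:
    """Return the requested group of every match, in order."""
    return [match.group(idx) for match in re.finditer(pattern, text)]


print(regexp_extract("1a 2b 14m", r"\d+", 0))
print(regexp_extract_all("1a 2b 14m", r"\d+", 0))
print(regexp_extract_all("1a 2b 14m", r"(\d+)([a-z]+)", 2))
```

Note that the results here are Python strings, so the numbers come back as
'1', '2' and '14'.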

Reports explicit errors for invalid usage of SET/RESET command (+129, -16)

This PR modifies the parser code to handle invalid usages of a SET/RESET
command.

For example:

SET spark.sql.ansi.enabled true

The above SQL command does not change the configuration value and it just
tries to display the value of the configuration 'spark.sql.ansi.enabled
true'. This PR disallows using special characters including spaces in the
configuration name and reports a user-friendly error instead. In the error
message, it tells users a workaround to use quotes or a string literal if
they still need to specify a configuration with them.

Before this PR:

scala> sql("SET spark.sql.ansi.enabled true").show(1, -1)
|key                        |value      |
|spark.sql.ansi.enabled true|<undefined>|

After this PR:

scala> sql("SET spark.sql.ansi.enabled true")
Expected format is 'SET', 'SET key', or 'SET key=value'. If you want to
include special characters in key, please use quotes, e.g., SET
`key`=value.(line 1, pos 0)

== SQL ==
SET spark.sql.ansi.enabled true

Support Filter expression allows simultaneous use of DISTINCT (+554, -72)

This PR adds support for using FILTER clauses on one or more DISTINCT
aggregate expressions. For example:

select sum(distinct id) filter (where sex = 'man') from student;

This PR is related to #26656, which only supports the FILTER clause on
aggregate expressions without DISTINCT.
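
The semantics of the example query can be spelled out in plain Python:
filter first, then deduplicate, then aggregate. The sample rows and the
helper name sum_distinct_filtered are made up for illustration.

```python
students = [
    {"id": 1, "sex": "man"},
    {"id": 1, "sex": "man"},    # duplicate id, counted once
    {"id": 2, "sex": "woman"},  # removed by the FILTER clause
    {"id": 3, "sex": "man"},
]


def sum_distinct_filtered(rows, column, predicate):
    """Sum the distinct values of `column` over the rows that pass `predicate`."""
    # Unlike SQL, which returns NULL for an empty input, this returns 0.
    return sum({row[column] for row in rows if predicate(row)})


total = sum_distinct_filtered(students, "id", lambda row: row["sex"] == "man")
print(total)
```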

Use {} in conversions maps and structs to strings (+105, -78)

This PR changes the casting of the map and struct values to strings by
using the {} brackets instead of []. The behavior is controlled by the SQL
config spark.sql.legacy.castComplexTypesToString.enabled. When it is true,
CAST wraps maps and structs by [] in casting to strings. Otherwise, if this
is false, which is the default, maps and structs are wrapped by {}.

For example:

   - Before this change: struct<s:int,[dotNET, 2012]:bigint,[Java,
   - After this change: struct<s:int,{dotNET, 2012}:bigint,{Java,

This change is needed

   - To distinguish structs/maps from arrays.
   - To make show's output consistent with Hive and conversions to Hive
   - To display dataframe content in the same form by spark-sql and show
   - To be consistent with the *.sql tests

Implement ALTER TABLE in JDBC Table Catalog (+164, -25)

This PR implemented the following ALTER TABLE syntaxes in JDBC Table
Catalog:

ALTER TABLE table_name ADD COLUMNS ( column_name datatype [ , ... ] );
ALTER TABLE table_name RENAME COLUMN old_column_name TO new_column_name;
ALTER TABLE table_name DROP COLUMN column_name;
ALTER TABLE table_name ALTER COLUMN column_name TYPE new_type;
ALTER TABLE table_name ALTER COLUMN column_name SET NOT NULL;

Convert null to "null" in structs, maps and arrays while casting to strings
(+60, -26)

This PR proposed to convert the null elements of map/struct/array to the
"null" string rather than delete them while converting map/struct/array
values to strings. The old behavior can be restored by setting
spark.sql.legacy.omitNestedNullInCast.enabled to true. This helps to
distinguish the empty string element and the null element. For example,

Before this PR:

scala> Seq(Seq(""), Seq(null)).toDF().show
+-----+
|value|
+-----+
|   []|
|   []|
+-----+

After this PR:

scala> Seq(Seq(""), Seq(null)).toDF().show
+------+
| value|
+------+
|

Get table names directly from Hive tables (+5, -3)

This PR proposed to get table names directly from a sequence of Hive tables
in HiveClientImpl.listTablesByType(). This not only avoids the unnecessary
conversion from the Hive table to the Catalog table but also fixes the Hive
SerDe loading issue, which occurs when multiple clients share the same Hive
metastore but only some of them have a given SerDe.

Fix the trim logic in UTF8String.toInt/toLong to handle non-ASCII
characters correctly (+172, -12)

This PR fixed the trim logic in UTF8String.toInt/toLong to not treat
non-ASCII characters as whitespaces, e.g.

Before this PR:

scala> sql("SELECT cast('1中文' AS bigint)").show
+---------------------+
|CAST(1中文 AS

After this PR:

scala> sql("SELECT cast('1中文' AS bigint)").show
+---------------------+
|CAST(1中文 AS
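
One way such a bug can arise is a signed-byte comparison: every byte of a
multi-byte UTF-8 character is 0x80 or above, which reads as negative in a
signed byte and so passes a "less than or equal to space" test. The sketch
below assumes that this was the mechanism; the digest does not state it,
and all helper names are hypothetical.

```python
def as_signed_byte(b: int) -> int:
    """Interpret an unsigned byte (0..255) the way a signed 8-bit byte reads it."""
    return b - 256 if b >= 128 else b


def trim_signed(data: bytes) -> bytes:
    """Assumed old behaviour: the signed comparison treats bytes >= 0x80 as blank."""
    start, end = 0, len(data)
    while start < end and as_signed_byte(data[start]) <= 0x20:
        start += 1
    while end > start and as_signed_byte(data[end - 1]) <= 0x20:
        end -= 1
    return data[start:end]


def trim_ascii(data: bytes) -> bytes:
    """Only ASCII whitespace and control bytes (<= 0x20) are trimmed."""
    start, end = 0, len(data)
    while start < end and data[start] <= 0x20:
        start += 1
    while end > start and data[end - 1] <= 0x20:
        end -= 1
    return data[start:end]


raw = "1中文".encode("utf-8")
print(trim_signed(raw))  # the non-ASCII suffix is silently dropped
print(trim_ascii(raw))   # the suffix is kept, so a numeric cast can reject it
```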

Nested column predicate pushdown for ORC (+460, -310)

This PR adds nested column predicate pushdown for ORC (Parquet was already
supported). This is configurable via the conf

Add unique ID on query execution (+7, -0)

This PR added a unique ID on QueryExecution. Listeners can leverage the ID
to deduplicate redundant calls.

Support PostgreSQL bpchar type and array of char type (+3, -1)

This PR supported PostgreSQL bpchar in order to get rid of Unsupported type
ARRAY error when users try to use the char array data type under Postgre

Show initial plan in AQE plan tree string (+210, -43)

This PR added the initial plan in AdaptiveSparkPlanExec and generates tree
string for both the current/final plan and the initial plan. The difference
between Current Plan and Final Plan here is that current plan indicates an
intermediate state. The plan is subject to further transformations, while
the final plan represents an end state, which means the plan will no longer
be changed. For example,

Before this PR,


AdaptiveSparkPlan isFinalPlan=true
+- *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2

== Physical Plan ==
AdaptiveSparkPlan (9)
+- BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)

After this PR,


AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(3) BroadcastHashJoin
   :- BroadcastQueryStage 2
   :  +- BroadcastExchange
           ...
+- == Initial Plan ==
   :- Sort
   :  +- Exchange

== Physical Plan ==
AdaptiveSparkPlan (9)
+- == Current Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)
+- == Initial Plan ==
   BroadcastHashJoin Inner BuildRight (8)
   :- Project (3)
   :  +- Filter (2)

Anti Join Improvement with EmptyHashedRelation and
EmptyHashedRelationWithAllNullKeys (+124, -20)

This PR improved Anti Join by:

   - Using EmptyHashedRelation to perform a fast stop for the common Anti
   Join as well
   - Eliminating BroadcastHashJoin(NAAJ) if buildSide is an
   EmptyHashedRelationWithAllNullKeys in AQE

Eliminate the filter clause in aggregate (+133, -14)

This PR added an optimizer rule EliminateAggregateFilter to eliminate the
trivial filter clause in aggregate, e.g.,


can be optimized to:


Fix serialization of dates inserted to Hive tables (+11, -1)

This PR fixes a bug that causes the SQL INSERT command to put incorrect
Date values into Hive tables. It fixes the erroneous implementation of
DaysWritable, which did not respect date rebases and used an uninitialized
daysSinceEpoch (1970-01-01).

Parquet RLE float/double are read incorrectly on big endian platforms
(+166, -16)

This PR fixes an issue introduced by SPARK-26985, in which RLE float/double
data in Parquet files was read incorrectly on big endian platforms. The
implementation in SPARK-26985 read the RLE entries from Parquet files as
BIG_ENDIAN on big endian platforms, even though Parquet data is always in
little endian format.
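
Why the byte order must follow the file format rather than the platform
can be seen with Python's struct module. This is a generic illustration of
endianness, not the Parquet reader code.

```python
import struct

value = 1.5
little_endian_bytes = struct.pack("<f", value)  # how the value is stored on disk

read_as_little = struct.unpack("<f", little_endian_bytes)[0]
read_as_big = struct.unpack(">f", little_endian_bytes)[0]

print(read_as_little)        # round-trips to the original value
print(read_as_big == value)  # the big endian interpretation does not
```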

Partially pushdown supports data filter if it mixed in partition filters
(+38, -3)

This PR extracts data-column filters mixed together with partition-column
filters in conjunctive conditions and pushes the extracted data-column
filters down into the file scan. For example, in the filter (partCol = '1'
AND dataCol = 1) OR (partCol = '2' AND dataCol = 2), we can push down the
data-column filter dataCol = 1 OR dataCol = 2.
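
The extraction can be sketched on a tiny expression tree. The tuple
encoding ("and" / "or" / "eq") and the helper name extract_data_filter are
assumptions made for this illustration and are not Spark's optimizer code.

```python
def extract_data_filter(expr, data_columns):
    """Return a filter over data columns implied by expr, or None if none exists."""
    op = expr[0]
    if op == "eq":
        _, column, _value = expr
        return expr if column in data_columns else None
    left = extract_data_filter(expr[1], data_columns)
    right = extract_data_filter(expr[2], data_columns)
    if op == "and":
        # Dropping one side of an AND only weakens the filter, which is safe.
        if left and right:
            return ("and", left, right)
        return left or right
    if op == "or":
        # Both branches must contribute, otherwise rows could be wrongly dropped.
        if left and right:
            return ("or", left, right)
        return None
    raise ValueError(f"unsupported operator: {op}")


condition = (
    "or",
    ("and", ("eq", "partCol", "1"), ("eq", "dataCol", 1)),
    ("and", ("eq", "partCol", "2"), ("eq", "dataCol", 2)),
)
print(extract_data_filter(condition, data_columns={"dataCol"}))
```

The extracted filter is weaker than the original condition, so the full
condition still has to be evaluated after the scan.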

Add SupportsPartitions APIs on DataSourceV2 (+688, -15)

This PR adds partition management API support as part of Table API in
DataSource V2. It includes the API to get the partition schema of table; to
create/drop a partition; to check the existence of a partition; to retrieve
the partition metadata; to list partition identifiers; to replace the
partition metadata.

Full outer shuffled hash join (+693, -48)

This PR implements full outer join in shuffled hash join without codegen

Add PlanStabilitySuite to detect SparkPlan regression (+130898, -65)

This PR adds a test suite that checks query plans against golden files for
TPC-DS queries, in an effort to detect important changes in the Spark
optimizer and planner as well as potential bugs introduced by such changes.

ObjectSerializerPruning fails for RowEncoder (+52, -4)

Bug fix: Fix type mismatch in ObjectSerializerPruning.alignNullTypeInIf, to
consider the isNull check generated in RowEncoder, which is Invoke(inputObject,
"isNullAt", BooleanType, Literal(index) :: Nil).

only support '\n' (+51, -6)

New behavior: tighten the line terminator check in Script Transform
no-serde (ROW FORMAT DELIMITED) mode to only accept \n, to ensure the
accuracy of data.

In Script Transform no-serde (ROW FORMAT DELIMITED) mode, LINES TERMINATED
BY supports only \n.

Tested in Hive 1.1 and Hive 2.3.7.

Script Transform ROW FORMAT DELIMIT value should format value (+116, -16)

Bug fix: For SQL

  USING 'cat' AS (a, b, c)
FROM testData

The correct values are:

TOK_TABLEROWFORMATFIELD should be , but is actually ','

TOK_TABLEROWFORMATLINES should be \n but is actually '\n'

Use getCanonicalName to fix byte[] compile issue (+8, -1)

(Superseded by

Bug fix: Fix a bug in CodegenContext.addReferenceObj() which generates
wrong code ([B) for the byte[] Java type.

Unfortunately this PR introduces a new bug for nested Scala types, and was
fixed later by

'path' option can cause issues while inferring schema in CSV/JSON
datasources (+76, -21)

Bug fix: When CSV/JSON data sources infer schema (e.g., def
inferSchema(files: Seq[FileStatus])), they use the files along with the
original options. The files in inferSchema could have been deduced from the
"path" option if the option was present, so this can cause issues (e.g.,
reading more data, listing the path again) since the "path" option is added to
the files.

The existing behavior can cause the following issue:

class TestFileFilter extends PathFilter {
  override def accept(path: Path): Boolean = path.getParent.getName != "p=2"
}

val path = "/tmp"
val df = spark.range(2)
df.write.json(path + "/p=1")
df.write.json(path + "/p=2")
val extraOptions = Map(
  "mapred.input.pathFilter.class" -> classOf[TestFileFilter].getName,
  "mapreduce.input.pathFilter.class" -> classOf[TestFileFilter].getName
)
// This works fine.
assert( == 2)
// The following with "path" option fails with the following:
// assertion failed: Conflicting directory structures detected. Suspicious paths
//	file:/tmp
//	file:/tmp/p=1
path).load.count() === 2)

Introduce AlreadyPlanned to prevent reanalysis of V1FallbackWriters (+196,

Bug fix: Fix a bug to avoid having a physical plan that is disconnected
from the physical plan that is being executed in V1WriteFallback execution.

This PR introduces a new LogicalNode type AlreadyPlanned, and related
physical plan and preparation rule.

With the DataSourceV2 write operations, we have a way to fallback to the V1
writer APIs using InsertableRelation. The gross part is that we're in
physical land, but the InsertableRelation takes a logical plan, so we have
to pass the logical plans to these physical nodes, and then potentially go
through re-planning. This re-planning can cause issues for an already
optimized plan.

A useful primitive could be specifying that a plan is ready for execution
through a logical node AlreadyPlanned. This would wrap a physical plan, and
then we can go straight to execution.

Downgrade Janino to fix a correctness bug (+17, -14)

Bug fix: fix a code generation issue in conditional expressions in a newer
Janino version by downgrading to a known good version.

The symptom concerns NaN comparison. For the code below:

if (double_value <= 0.0) {
} else {

If double_value is NaN, NaN <= 0.0 is false and we should go to the else
branch. However, current Spark goes to the if branch and causes correctness
issues like SPARK-32640.
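
The expected IEEE 754 behaviour is easy to check in Python, where any
ordered comparison with NaN is false. This only demonstrates the semantics
that the generated code is supposed to follow; the function name branch is
made up for the example.

```python
import math


def branch(double_value: float) -> str:
    """Report which branch a `<= 0.0` test selects for the given value."""
    if double_value <= 0.0:
        return "if"
    else:
        return "else"


print(branch(-1.0))      # a negative value takes the if branch
print(branch(math.nan))  # NaN <= 0.0 is false, so the else branch is taken
```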
ORC predicate pushdown should work with case-insensitive analysis (+243,

Bug fix: Fix ORC predicate pushdown under case-insensitive analysis cases.
The field names in pushed down predicates don't need to match in exact
letter case with physical field names in ORC files, if we enable
case-insensitive analysis.

Currently, ORC predicate pushdown doesn't work with case-insensitive
analysis. A predicate "a < 0" cannot be pushed down to an ORC file with
field name "A" under case-insensitive analysis.

Parquet predicate pushdown, however, works in this case. We should make ORC
predicate pushdown work with case-insensitive analysis too.
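
A minimal sketch of the name resolution involved, in plain Python. The
helper resolve_field is hypothetical, and the choice to give up on
ambiguous matches is an assumption of this sketch, not something the
digest states.

```python
def resolve_field(name: str, physical_fields: list, case_sensitive: bool = False):
    """Map a predicate's column name to the physical field name, or None."""
    if case_sensitive:
        return name if name in physical_fields else None
    matches = [field for field in physical_fields if field.lower() == name.lower()]
    # With zero or several candidates there is no single field to push down to.
    return matches[0] if len(matches) == 1 else None


print(resolve_field("a", ["A", "b"]))                       # resolves to the field "A"
print(resolve_field("a", ["A", "b"], case_sensitive=True))  # no exact match
```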

Fix data corruption in boolean bit set compression (+29, -3)

Bug fix: Fix a data corruption issue. Essentially the BooleanBitSet
CompressionScheme would miss nulls at the end of a CompressedBatch. The
values would then default to false.

Optimize BHJ/SHJ inner/semi join with empty hashed relation (+157, -53)

Optimization: A very minor optimization for rare use cases, but in case the
build side turns out to be empty, we can leverage it to short-cut stream
side to save CPU and IO.

Example broadcast hash join query similar to JoinBenchmark with empty
hashed relation:

  def broadcastHashJoinLongKey(): Unit = {
    val N = 20 << 20
    val M = 1 << 16

    val dim = broadcast(
      spark.range(0).selectExpr("id as k", "cast(id as string) as v"))
    codegenBenchmark("Join w long", N) {
      val df = spark.range(N).join(dim, (col("id") % M) === col("k"))

Comparing wall clock time with this PR enabled and disabled (for the
non-codegen code path) shows roughly an 8x improvement.

Java HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
Join w long:       Best Time(ms)   Avg Time(ms)   Stdev(ms)   Rate(M/s)   Per Row(ns)   Relative
Join PR disabled             637            646          12        32.9          30.4       1.0X
Join PR enabled               77             78           2       271.8           3.7       8.3X

For broadcast hash join and shuffled hash join, whenever the build-side
hashed relation turns out to be empty, we don't need to execute the
stream-side plan at all and can return an empty iterator (for inner join
and left semi join), because none of the stream-side rows can be output
when there is no match.
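
The short-cut can be sketched with a toy inner hash join in plain Python.
The stream side is passed as a factory so that the sketch can show it is
never scanned when the build side is empty. All names are hypothetical and
this is not Spark's join operator.

```python
def hash_join(build_rows, stream_factory, key):
    """Inner hash join that skips the stream side when the build side is empty."""
    table = {}
    for row in build_rows:
        table.setdefault(key(row), []).append(row)
    if not table:
        return iter(())  # nothing can match, so never touch the stream side

    def matches():
        for stream_row in stream_factory():
            for build_row in table.get(key(stream_row), ()):
                yield (stream_row, build_row)

    return matches()


scans = []


def scan_stream_side():
    scans.append("scanned")  # records that the expensive scan actually ran
    return iter([{"k": 1}, {"k": 2}])


result = list(hash_join([], scan_stream_side, key=lambda row: row["k"]))
print(result, scans)  # both stay empty: no output rows and no scan
```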

'path' option cannot coexist with load()'s path parameters (+93, -28)

New behavior: Make the behavior consistent for the path option when loading
dataframes with a single path (e.g., option("path",
path).format("parquet").load(path) vs. option("path", path).parquet(path))
by disallowing the path option to coexist with load's path parameters.

The existing behavior is inconsistent:

scala> Seq(1).toDF.write.mode("overwrite").parquet("/tmp/test")


"/tmp/test").parquet("/tmp/test").show
+-----+
|value|
+-----+
|    1|
|

Fix doc error and add migration guide for datetime pattern F (+3, -1)

Doc/Migration guide: Fixes a doc error and adds a migration guide for the
datetime pattern F.

There is a doc bug that we inherited from the JDK.

The SimpleDateFormat (F: Day of week in month) we used in 2.x and the
DateTimeFormatter (F: week-of-month) we use now both behave the opposite of
what their Java docs declare. Unfortunately, this also leads to a silent
data change in Spark.

The week-of-month is actually the pattern W in DateTimeFormatter, which is
banned in Spark 3.x.

If we want to keep the pattern F, we need to accept the behavior change,
provide a proper migration guide, and fix the doc in Spark.

Don't apply comment processing if 'comment' unset for CSV (+37, -14)

Bug fix: Fix a bug that drops rows that start with a null char when this is
not requested or intended.

Spark's CSV source can optionally ignore lines starting with a comment
char. Some code paths check to see if it's set before applying comment
logic (i.e. not set to default of \0), but many do not, including the one
that passes the option to Univocity. This means that rows beginning with a
null char were being treated as comments even when 'disabled'.
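
The difference between the two kinds of code path can be sketched in plain
Python, using \0 as the "unset" sentinel described above. The function
names are hypothetical and the real parsing is done by Univocity, so this
only illustrates the missing check.

```python
UNSET = "\0"  # default value meaning "no comment character configured"


def keep_line_without_check(line: str, comment: str = UNSET) -> bool:
    """Applies comment logic unconditionally, so the sentinel acts as a comment char."""
    return not line.startswith(comment)


def keep_line_with_check(line: str, comment: str = UNSET) -> bool:
    """Skips comment processing entirely when no comment char is set."""
    if comment == UNSET:
        return True
    return not line.startswith(comment)


row = "\0abc,1"
print(keep_line_without_check(row))  # the row is dropped as if it were a comment
print(keep_line_with_check(row))     # the row is kept
```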

Add summary to MultilayerPerceptronClassificationModel (+222, -14)

This PR adds training summary to MultilayerPerceptronClassificationModel so
that users can get the training process status, such as the loss value of
each iteration and total iteration number. It adds two user-facing methods:

   - MultilayerPerceptronClassificationModel.summary: gets summary of model
   on training set
   - MultilayerPerceptronClassificationModel.evaluate(dataset): evaluates
   the model on a test dataset

ML params default value parity in feature and tuning (+274, -135)

This PR sets params default values in trait Params for feature and tuning
in both Scala and Python. It is to make ML have the same default param
values between estimator and its corresponding transformer, and also
between Scala and Python.

Fix parameters not being copied in CrossValidatorModel.copy(), read() and
write() (+172, -26)

Bug fix: Changed the definitions of
CrossValidatorModel.copy()/_to_java()/_from_java() so that exposed
parameters (i.e. parameters with get() methods) are copied in these methods.

Parameters are copied in the respective Scala interface for
CrossValidatorModel.copy(). It fits the semantics to persist parameters
when calling and CrossValidatorModel.load() so
that the user gets the same model by saving and loading it after. Not
copying across numFolds also causes bugs like Array index out of bound and
losing sub-models because these parameters will always default to 3 (as
described in the JIRA ticket).

Fix double caching in KMeans/BiKMeans (+59, -83)

Bug fix: Fix double caching in KMeans/BiKMeans:

   1. let the callers of runWithWeight to pass whether handlePersistence is
   2. persist and unpersist inside of runWithWeight;
   3. persist the norms if needed according to the comments;

Upgrade Kafka to 2.6.0 (+3, -3)

This PR upgraded Kafka client library from 2.5.0 to 2.6.0 for Apache Spark
3.1.0. This upgrade includes the client-side bug fixes like KAFKA-10134 and

FileStreamSource: Cache fetched list of files beyond maxFilesPerTrigger as
unread files (+191, -16)

This PR cached the fetched list of files in FileStreamSource to avoid
re-fetching whenever possible since fetching is quite expensive. This
improvement only takes effect when maxFilesPerTrigger is set and
latestFirst=false(default). Note that the driver process would require more
memory with this change, though it doesn't hurt much as the peak memory is
still similar.

Manual testing under the following test environment:

   - input files
      - 171,839 files distributed evenly into 24 directories
      - each file contains 200 lines
   - query: read from the "file stream source" and repartition to 50, and
   write to the "file stream sink"
      - maxFilesPerTrigger = 100

shows the performance improvement:

Before this PR:


After this PR:


The area of brown color represents "latestOffset" where listing operation
is performed for FileStreamSource. After this PR, the cost for listing is
paid "only once", whereas before this PR it was for "every batch".

Check the Distinct by assuming it as Aggregate for Structured Streaming
(+57, -2)

This PR proposed to treat the Distinct node as Aggregate in
UnsupportedOperationChecker for streaming. This change not only gives a
better error message for Distinct-related operations in Append mode that
don't have a watermark but also makes Distinct in Complete mode runnable.
Streamline the logic on file stream source and sink metadata log to avoid
memory issue (+129, -87)>

Improvement: In many operations on CompactibleFileStreamLog reads a
metadata log file and materializes all entries into memory. As the nature
of the compact operation, CompactibleFileStreamLog may have a huge compact
log file with bunch of entries included, and for now they're just
monotonically increasing, which means the amount of memory to materialize
also grows incrementally. This leads pressure on GC.

This patch proposes to streamline the logic on file stream source and sink
whenever possible to avoid memory issues. To make this possible we have to
break the existing behavior of excluding entries: currently the compactLogs
method is called with all entries, which forces us to materialize all
entries into memory. This should have no effect on end users, because only
the file stream sink has a condition to exclude entries, and that condition
has never been true (DELETE_ACTION has never been set).
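
The difference between materializing a log and streaming over it can be
sketched in plain Python. This is an illustration only, not
CompactibleFileStreamLog itself; the helper names (read_all, iter_entries,
compact) and the one-JSON-object-per-line layout are assumptions made for the
example.

```python
import io
import json


def read_all(log):
    """Materialize every entry in memory (memory grows with the log size)."""
    return [json.loads(line) for line in log]


def iter_entries(log):
    """Yield entries one at a time, so only one entry is held at once."""
    for line in log:
        yield json.loads(line)


def compact(entries, out):
    """Write all entries to a new compact log; return how many were written."""
    count = 0
    for entry in entries:
        out.write(json.dumps(entry) + "\n")
        count += 1
    return count


# A small in-memory stand-in for a metadata log file.
log_text = "".join(
    json.dumps({"path": f"part-{i}", "action": "add"}) + "\n" for i in range(5)
)

materialized = read_all(io.StringIO(log_text))  # a list holding all 5 entries
compacted = io.StringIO()
# The generator is consumed entry by entry; no intermediate list is built.
written = compact(iter_entries(io.StringIO(log_text)), compacted)
```

The streaming variant never holds more than one parsed entry at a time, which
is the property the change above is after.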

Remove unused DELETE_ACTION in FileStreamSinkLog (+0, -23)

Refactoring: Removes the unused DELETE_ACTION in FileStreamSinkLog.

Describe JSON option allowNonNumericNumbers and support it by PySpark
json() (+53,

This PR:

   - Adds the read-only JSON option allowNonNumericNumbers in the
   DataFrameReader’s json() API
   - Adds new test cases for allowed JSON field values: NaN, +INF,
   +Infinity, Infinity, -INF and -Infinity

Add InheritableThread for local properties and fixing a thread leak issue
in pinned thread mode (+110, -15)

This PR introduces the InheritableThread class, which works identically to
threading.Thread except that:

   - It can inherit the inheritable attributes of a JVM thread, such as
   InheritableThreadLocal, when the pinned thread mode is enabled; see also
   #24898.
   - It automatically finishes the corresponding JVM thread when the
   InheritableThread instance is garbage collected on the Python side,
   which prevents resource leaks.

In addition, this PR deprecates collectWithJobGroup, since it was a
temporary workaround for this leak issue, which is now fixed.
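
The idea of a thread that inherits its parent's local properties can be
illustrated in pure Python with the standard contextvars module. This is only
an analogy for the behavior described above, not PySpark's InheritableThread
and not JVM code; the class name ContextInheritingThread and the job_group
variable are hypothetical.

```python
import contextvars
import threading

# Hypothetical "local property", standing in for something like a job group.
job_group = contextvars.ContextVar("job_group", default=None)


class ContextInheritingThread(threading.Thread):
    """A Thread that runs its target in a copy of the creating thread's context."""

    def __init__(self, target, args=()):
        # Snapshot the parent's context when the thread object is constructed.
        parent_context = contextvars.copy_context()
        # Context.run(callable, *args) executes the callable inside that snapshot.
        super().__init__(target=parent_context.run, args=(target, *args))


seen = []
job_group.set("nightly-etl")

worker = ContextInheritingThread(target=lambda: seen.append(job_group.get()))
worker.start()
worker.join()
print(seen)  # ['nightly-etl']
```

The child thread sees the value that was set in the parent before the thread
was created, which mirrors what an inheritable thread-local provides.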

Add main page for PySpark documentation (+36, -0)

This PR proposes to write the main page of the PySpark documentation to
improve usability and readability.

Port migration guide for PySpark docs (+184, -62)

This PR ports the old PySpark migration guide to the new PySpark docs.

Un-deprecate inferring DataFrame schema from list of dict (+4, -11)

New API (restoring deprecated API): Un-deprecates Spark's ability to infer
a DataFrame schema from a list of dictionaries. The ability is Pythonic and
matches functionality offered by pandas.

This change clarifies to users that this behavior is supported and is not
going away in the near future.

Add ability to set table description via Catalog.createTable() (+149, -6)

New API: This PR enhances Catalog.createTable() to allow users to set the
table's description. This corresponds to the following SQL syntax:

COMMENT 'this is a fancy table';

This brings the Scala/Python catalog APIs a bit closer to what's already
possible via SQL.

Avoid encoding URL twice on https redirect (+44, -6)

This PR fixes a UI issue when HTTPS is enabled. When HTTPS is enabled for
the Spark UI, an HTTP request is redirected to an encoded HTTPS URL.

When we create the redirect URL, we call getRequestURI and getQueryString.
Both methods may return an encoded string. However, we pass them directly
to the following URI constructor:

URI(String scheme, String authority, String path, String query, String fragment)

As this URI constructor assumes that both the path and query parameters are
decoded strings, it encodes them again. This makes the redirect URL encoded
twice.
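
The double-encoding effect is easy to reproduce outside of Spark. The sketch
below uses Python's urllib.parse rather than the Java URI class, so it is an
analogy and not the code touched by the PR; the sample path is made up, and
decoding before re-encoding is just one way to avoid the problem, not
necessarily the one the PR uses.

```python
from urllib.parse import quote, unquote

original_path = "/jobs/my job"        # hypothetical path containing a space

encoded_once = quote(original_path)   # what an already-encoded request URI looks like
encoded_twice = quote(encoded_once)   # encoding it again also escapes the '%'

# Decoding first means the path is encoded exactly once in the redirect.
redirect_path = quote(unquote(encoded_once))

print(encoded_once)    # /jobs/my%20job
print(encoded_twice)   # /jobs/my%2520job
print(redirect_path)   # /jobs/my%20job
```

A server that decodes "/jobs/my%2520job" once ends up with the literal text
"/jobs/my%20job" instead of "/jobs/my job", which is why a doubly encoded
redirect points at the wrong resource.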

Expose stage level peak executor metrics via REST API (+1558, -53)

This PR proposes to expose the peak executor metrics at the stage level via
the REST APIs:

   - /applications/<application_id>/stages/: peak values of executor
   metrics for each stage
   - /applications/<application_id>/stages/<stage_id>/<stage_attempt_id>:
   peak values of executor metrics for each executor for the stage,
   followed by peak values of executor metrics for the stage

Exposing the stage level peak executor metrics can help users better
understand their application's resource utilization.
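
As a rough sketch, the two endpoints can be assembled as follows. The helper
name stage_metrics_urls, the UI address, and the application id are
hypothetical, and the example assumes the REST API is served under /api/v1 as
described in the Spark monitoring documentation. fetch_json is only defined
here, never called, so the block runs without a live Spark UI.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen


def stage_metrics_urls(base_url, app_id, stage_id, stage_attempt_id):
    """Build the two stage endpoints listed above under the REST API root."""
    app = f"{base_url.rstrip('/')}/api/v1/applications/{quote(app_id, safe='')}"
    return {
        "all_stages": f"{app}/stages/",
        "stage_attempt": f"{app}/stages/{stage_id}/{stage_attempt_id}",
    }


def fetch_json(url):
    """Fetch and decode a JSON response (needs a reachable Spark UI)."""
    with urlopen(url) as response:
        return json.load(response)


# Hypothetical UI address and application id.
urls = stage_metrics_urls("http://localhost:4040", "app-20200825000000-0000", 3, 0)
print(urls["all_stages"])
print(urls["stage_attempt"])
```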

Upgrade netty-all to 4.1.51.Final (+4, -4)

This PR brings the bug fixes from the latest netty version from

Remove arrow::as_tibble usage in SparkR (+2, -18)

This PR removes arrow::as_tibble usage in SparkR, since SparkR has raised
its minimum Arrow R version to 1.0.0 (SPARK-32452), and Arrow R 0.14
dropped as_tibble.

Remove the words "experimental" in the k8s document (+0, -4)

This PR removes the word "experimental" from the k8s document in the
primary branch in order to prepare a GA announcement for the k8s scheduler
in the next feature release (v3.1.0).

Publish failed and succeeded test reports in GitHub Actions (+52, -11)

This PR proposes to report the failed and succeeded tests in GitHub Actions
in order to improve the development velocity by leveraging

Report SparkR test results with JUnit reporter (+8, -5)

This PR proposes to generate a JUnit XML test report in SparkR tests that
can be leveraged in both Jenkins and GitHub Actions.

Upload unit-tests.log as an artifact (+6, -0)

Infra improvement: This PR proposes to upload target/unit-tests.log as an
artifact so that it can be downloaded here: [image: Screen Shot
2020-08-18 at 2 23 18 PM]

Jenkins already has this feature, and it is best to offer the same
developer functionality in GitHub Actions.

Support appId/execId placeholder in K8s SPARK_EXECUTOR_DIRS (+16, -0)

Kubernetes improvement: This PR aims to support replacements of

Use workflow_dispatch to enable manual test triggers (+16, -1)

Infra improvement: Reduce the pressure of GitHub Actions on the Spark

Add a workflow_dispatch entry in the GitHub Actions script
(build_and_test.yml). This update enables developers to run the Spark
tests for a specific branch on their own local repository, which may help
them check whether all the tests pass before opening a new PR.

[image: Screen Shot 2020-08-21 at 16 28 41]
