spark-issues mailing list archives

From "sam (Jira)" <j...@apache.org>
Subject [jira] [Updated] (SPARK-30101) spark.sql.shuffle.partitions is not in Configuration docs, but a very critical parameter
Date Tue, 03 Dec 2019 08:55:00 GMT

     [ https://issues.apache.org/jira/browse/SPARK-30101?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

sam updated SPARK-30101:
------------------------
    Description: 
I'm creating a `SparkSession` like this:

```
import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder().appName("foo").master("local")
  .config("spark.default.parallelism", 2).getOrCreate()
```

When I run

```
import spark.implicits._ // needed for .toDS()

((1 to 10) ++ (1 to 10)).toDS().distinct().count()
```

I get 200 partitions:

```
19/12/02 10:29:34 INFO TaskSchedulerImpl: Adding task set 1.0 with 200 tasks
...
19/12/02 10:29:34 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 2) in 46 ms on localhost (executor driver) (1/200)
```

It is the `distinct` that is broken: `ds.rdd.getNumPartitions` gives `2`, while `ds.distinct().rdd.getNumPartitions` gives `200`. By contrast, `ds.rdd.groupBy(identity).map(_._2.head)` and `ds.rdd.distinct()` work correctly.
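A minimal sketch that reproduces the two counts side by side, assuming Spark 2.4 or later on the classpath. The object name `DistinctPartitions` is hypothetical. It turns off `spark.sql.adaptive.enabled` because adaptive query execution (on by default from Spark 3.2) may coalesce post-shuffle partitions and hide the 200.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper object for this illustration.
object DistinctPartitions {
  // Returns (partitions before distinct, partitions after distinct).
  def counts(spark: SparkSession): (Int, Int) = {
    import spark.implicits._
    val ds = ((1 to 10) ++ (1 to 10)).toDS()
    // Local data is split according to spark.default.parallelism.
    val before = ds.rdd.getNumPartitions
    // distinct() plans a shuffle sized by spark.sql.shuffle.partitions.
    val after = ds.distinct().rdd.getNumPartitions
    (before, after)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().appName("foo").master("local")
      .config("spark.default.parallelism", 2)
      // Keep adaptive execution off so it cannot coalesce the shuffle output.
      .config("spark.sql.adaptive.enabled", false)
      .getOrCreate()
    try {
      println(s"default parallelism: ${spark.sparkContext.defaultParallelism}")
      println(s"shuffle partitions:  ${spark.conf.get("spark.sql.shuffle.partitions")}")
      println(s"(before, after):     ${counts(spark)}")
    } finally spark.stop()
  }
}
```

Reading both settings next to the counts makes it visible that the two numbers come from two different configuration keys.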

Finally, I notice that the good old `RDD` interface has a `distinct` that accepts a `numPartitions` argument, while `Dataset` does not.
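Until `Dataset.distinct` takes a partition count, the same effect can be approximated in a few ways. A sketch, assuming Spark 2.4 or later; the object and method names are hypothetical. `coalesce` only merges the post-shuffle partitions into fewer output partitions (the exchange itself is still planned with `spark.sql.shuffle.partitions`), while changing the session setting affects the shuffle itself, so the sketch restores the previous value afterwards.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical helpers; only RDD.distinct(numPartitions) is a real per-call knob.
object DistinctWithPartitions {
  // Drop to the RDD API, whose distinct accepts numPartitions.
  def viaRdd(ds: Dataset[Int], numPartitions: Int): Int =
    ds.rdd.distinct(numPartitions).getNumPartitions

  // Stay in the Dataset API and merge the shuffle output afterwards.
  def viaCoalesce(ds: Dataset[Int], numPartitions: Int): Int =
    ds.distinct().coalesce(numPartitions).rdd.getNumPartitions

  // Change the session setting while the query is planned, then restore it.
  def viaSessionConf(spark: SparkSession, ds: Dataset[Int], numPartitions: Int): Int = {
    val key = "spark.sql.shuffle.partitions"
    val previous = spark.conf.get(key)
    spark.conf.set(key, numPartitions.toLong)
    try ds.distinct().rdd.getNumPartitions
    finally spark.conf.set(key, previous)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder().appName("foo").master("local")
      .config("spark.default.parallelism", 2)
      // Adaptive execution off, so partition counts stay deterministic.
      .config("spark.sql.adaptive.enabled", false)
      .getOrCreate()
    import spark.implicits._
    try {
      val ds = ((1 to 10) ++ (1 to 10)).toDS()
      println(s"RDD distinct:     ${viaRdd(ds, 2)}")
      println(s"Dataset coalesce: ${viaCoalesce(ds, 2)}")
      println(s"session conf:     ${viaSessionConf(spark, ds, 2)}")
    } finally spark.stop()
  }
}
```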

.......................

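According to the comments below, `Dataset.distinct` uses `spark.sql.shuffle.partitions` (default `200`) rather than `spark.default.parallelism`, and that parameter needs documenting on the Configuration page.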

> Default number of partitions in RDDs returned by transformations like join, reduceByKey, and parallelize when not set by user.

This description of `spark.default.parallelism` in https://spark.apache.org/docs/latest/configuration.html should instead say:

> Default number of partitions in RDDs (but not Datasets/DataFrames; see spark.sql.shuffle.partitions) returned by transformations like join, reduceByKey, and parallelize when not set by user.
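In practice this means a session that should use 2 partitions everywhere has to set both keys. A sketch based on the builder above, assuming Spark 2.4 or later; the object name is hypothetical, and the adaptive-execution setting is only there to keep the count deterministic.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical object showing both partition settings side by side.
object SessionWideShufflePartitions {
  def build(): SparkSession =
    SparkSession
      .builder().appName("foo").master("local")
      // Used by RDD operations such as reduceByKey and parallelize.
      .config("spark.default.parallelism", 2)
      // Used by Dataset/DataFrame shuffles such as distinct, groupBy and join.
      .config("spark.sql.shuffle.partitions", 2)
      // Off so adaptive execution does not change the partition count.
      .config("spark.sql.adaptive.enabled", false)
      .getOrCreate()

  def main(args: Array[String]): Unit = {
    val spark = build()
    import spark.implicits._
    try {
      val n = ((1 to 10) ++ (1 to 10)).toDS().distinct().rdd.getNumPartitions
      println(s"partitions after distinct: $n")
    } finally spark.stop()
  }
}
```

The same two keys can also be passed on the command line with `spark-submit --conf`.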




> spark.sql.shuffle.partitions is not in Configuration docs, but a very critical parameter
> ----------------------------------------------------------------------------------------
>
>                 Key: SPARK-30101
>                 URL: https://issues.apache.org/jira/browse/SPARK-30101
>             Project: Spark
>          Issue Type: Bug
>          Components: Spark Core
>    Affects Versions: 2.4.0, 2.4.4
>            Reporter: sam
>            Priority: Major
>



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org

