spark-user mailing list archives

From Mike Chan <mikecha...@gmail.com>
Subject Re: [spark sql performance] Only 1 executor to write output?
Date Mon, 01 Apr 2019 09:12:19 GMT
Hi Akhil,

We disabled broadcast join for a reason (we suspect something related to the
Azure Storage connection with HDInsight; not 100% sure for now, though).

As suggested in this group, I have tried repartitioning before joining and
made sure I have massive executor.memory (raised to 50g and confirmed on the
Application page), but no effect so far.

However, I did find something new: when the application is left with 1
executor still working and almost nothing loading, it is actually slowly
flushing to disk:

19/04/01 08:07:08 INFO InternalParquetRecordWriter: mem size 134395806 > 134217728: flushing 771932 records to disk.
19/04/01 08:07:08 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127269732
19/04/01 08:07:19 INFO InternalParquetRecordWriter: mem size 134219642 > 134217728: flushing 771932 records to disk.
19/04/01 08:07:19 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127214844
19/04/01 08:07:31 INFO InternalParquetRecordWriter: mem size 134375258 > 134217728: flushing 771072 records to disk.
19/04/01 08:07:31 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127038692
19/04/01 08:07:42 INFO InternalParquetRecordWriter: mem size 134249657 > 134217728: flushing 770779 records to disk.
19/04/01 08:07:42 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127034977
19/04/01 08:07:53 INFO InternalParquetRecordWriter: mem size 135002505 > 134217728: flushing 773529 records to disk.
19/04/01 08:07:53 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127475260
19/04/01 08:08:04 INFO InternalParquetRecordWriter: mem size 134268777 > 134217728: flushing 770779 records to disk.
19/04/01 08:08:04 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127108857
19/04/01 08:08:16 INFO InternalParquetRecordWriter: mem size 134990722 > 134217728: flushing 773529 records to disk.
19/04/01 08:08:16 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127385012
19/04/01 08:08:27 INFO InternalParquetRecordWriter: mem size 134366774 > 134217728: flushing 771072 records to disk.
19/04/01 08:08:27 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127018472
19/04/01 08:08:39 INFO InternalParquetRecordWriter: mem size 135244073 > 134217728: flushing 773034 records to disk.
19/04/01 08:08:39 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127472615
19/04/01 08:08:51 INFO InternalParquetRecordWriter: mem size 135040836 > 134217728: flushing 773529 records to disk.
19/04/01 08:08:51 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127405071
19/04/01 08:09:02 INFO InternalParquetRecordWriter: mem size 134508763 > 134217728: flushing 773034 records to disk.
19/04/01 08:09:03 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127342497
19/04/01 08:09:13 INFO InternalParquetRecordWriter: mem size 134241316 > 134217728: flushing 771932 records to disk.
19/04/01 08:09:13 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127193927
19/04/01 08:09:24 INFO InternalParquetRecordWriter: mem size 135406928 > 134217728: flushing 773034 records to disk.
19/04/01 08:09:24 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127516711
19/04/01 08:09:36 INFO InternalParquetRecordWriter: mem size 134665676 > 134217728: flushing 761350 records to disk.
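
(If I read these lines right, 134217728 bytes is just the default 128 MB
Parquet row-group size, so that one remaining task is steadily writing out row
groups rather than swapping at the OS level. For reference, a rough sketch of
the repartition-before-write variant from my earlier mail; only the target
table name is real, the rest are placeholders:)

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()

    # Placeholder for the joined DataFrame described earlier in this thread.
    result = spark.table("default.table_a_joined")

    # Spread the final write over 200 tasks instead of funnelling it
    # through a single writer.
    (result
        .repartition(200)
        .write.mode("overwrite")
        .saveAsTable("default.mikemiketable"))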





On Mon, Mar 25, 2019 at 8:22 PM Akhil Das <akhld@hacked.work> wrote:

> Have you tried a broadcast join? Since the other tables are relatively
> small in size, a broadcast join would help here:
> https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries
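>
> A minimal PySpark sketch of the hint (big_table / small_table / join_key are
> placeholders, not your real names):
>
>     from pyspark.sql import SparkSession
>     from pyspark.sql.functions import broadcast
>
>     spark = SparkSession.builder.getOrCreate()
>     big_df = spark.table("big_table")        # the large fact table
>     small_df = spark.table("small_table")    # one of the small dimension tables
>
>     # Explicitly mark the small side for broadcast in the join.
>     joined = big_df.join(broadcast(small_df), "join_key", "left_outer")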
>
> On Mon, 25 Mar 2019, 19:15 Mike Chan, <mikechancs@gmail.com> wrote:
>
>> Hi Gourav,
>>
>> Could data skew happen even when I have a small data size? Frankly speaking,
>> I checked and my joining tables are:
>>
>> 9 GB table A
>> left outer join 10MB table B
>> left outer join 5MB table C
>> left outer join 1MB table D
>> 1MB table E left outer join with table C
>>
>> And I have 20GB of spark.executor.memory as well, and that's it. Today I
>> re-examined everything and found that if I do
>>
>> 9 GB table A
>> left outer join 10MB table B
>> left outer join 5MB table C (pre-left outer join with table E)
>> left outer join 1MB table D
>>
>> and repartition(200) before writing the table out, then the query time drops
>> from more than 3 hours to 20 minutes (rough sketch after the numbers below),
>> although I have no idea why it produces statistics like:
>>
>>
>>    - Total Time Across All Tasks: 2.4 h
>>    - Locality Level Summary: Node local: 126; Process local: 35; Rack local: 39
>>    - Shuffle Read: 104.1 GB / 438828498
>>    - Shuffle Write: 167.8 GB / 438828185
>>    - Shuffle Spill (Memory): 534.0 GB
>>    - Shuffle Spill (Disk): 160.2 GB
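>>
>> (A rough sketch of the restructured query above; table and key names are
>> placeholders, `spark` is the existing session, and only the target table name
>> is real:)
>>
>>     # Pre-join the 1MB table E into the 5MB table C first.
>>     table_c = spark.table("table_c")
>>     table_e = spark.table("table_e")
>>     c_pre = table_c.join(table_e, "e_key", "left_outer")
>>
>>     result = (spark.table("table_a")                             # the 9 GB table
>>         .join(spark.table("table_b"), "b_key", "left_outer")     # 10 MB
>>         .join(c_pre, "c_key", "left_outer")                      # 5 MB, pre-joined with E
>>         .join(spark.table("table_d"), "d_key", "left_outer")     # 1 MB
>>         .repartition(200))                                       # before writing out
>>
>>     result.write.mode("overwrite").saveAsTable("default.mikemiketable")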
>>
>> [image: image.png]
>>
>> And below is the "SQL" tab (I'm not sure whether this is something like an
>> execution plan in an RDBMS):
>>
>> [image: image.png]
>> [image: image.png]
>> [image: image.png]
>> [image: image.png]
>>
>> Today, after gathering several suggestions, I tried the options below (rough
>> sketches right after this list):
>> .partitionBy("a date selected") in the write.table statement -> nothing
>> changed
>> .repartition(1000) in the write.table statement -> nothing changed
>> sqlContext.setConf("spark.sql.shuffle.partitions", "1000") -> spins up more
>> tasks but nothing changed in speed
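>>
>> (Roughly what those attempts look like in code; the date column is a
>> placeholder, `df` stands for the final joined DataFrame, and sqlContext is
>> the existing context:)
>>
>>     # Attempt 1: partition the output table by a date column (no change).
>>     df.write.mode("overwrite").partitionBy("a_date_col").saveAsTable("default.mikemiketable")
>>
>>     # Attempt 2: force 1000 partitions right before the write (no change).
>>     df.repartition(1000).write.mode("overwrite").saveAsTable("default.mikemiketable")
>>
>>     # Attempt 3: raise the shuffle partition count (more tasks, same speed).
>>     sqlContext.setConf("spark.sql.shuffle.partitions", "1000")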
>>
>> I did find out that in this new cluster:
>> spark.sql.join.preferSortMergeJoin => false
>> spark.sql.autoBroadcastJoinThreshold => -1
>>
>> Not sure whether they're related, though.
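>>
>> (How I'm checking them; a quick sketch assuming the existing SparkSession.
>> -1 disables automatic broadcast joins entirely; 10 MB is the stock default
>> for that setting:)
>>
>>     from pyspark.sql import SparkSession
>>
>>     spark = SparkSession.builder.getOrCreate()
>>     print(spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))  # "-1" on this cluster
>>     print(spark.conf.get("spark.sql.join.preferSortMergeJoin"))    # "false" on this cluster
>>
>>     # Re-enable automatic broadcast joins at the 10 MB default (we keep it
>>     # off for now because of the Azure Storage issue mentioned above):
>>     # spark.conf.set("spark.sql.autoBroadcastJoinThreshold", str(10 * 1024 * 1024))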
>>
>> Thank you guys for giving advice. As you can tell, I'm new to Spark.
>>
>> Best Regards,
>> Mike
>>
>>
>> On Mon, Mar 25, 2019 at 6:06 PM Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I think that articles and demonstrations from 2016 are really, really old
>>> and deprecated. What of course does not change is the fact that we all have
>>> to understand our data better.
>>>
>>> That said, try going through this. I am not quite sure whether this
>>> feature is available in Apache Spark or not, but it certainly gives some
>>> idea: https://docs.databricks.com/delta/join-performance/skew-join.html
>>>
>>>
>>>
>>> Regards,
>>> Gourav
>>>
>>> On Mon, Mar 25, 2019 at 4:30 AM Akhil Das <akhld@hacked.work> wrote:
>>>
>>>> Looks like data skew; find a better partitioning for the dataset. This
>>>> might help:
>>>>
>>>> https://databricks.com/session/handling-data-skew-adaptively-in-spark-using-dynamic-repartitioning
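>>>>
>>>> (A hedged sketch of how to spot such skew, assuming the joined DataFrame
>>>> is `df` and `join_key` is the join column; both names are placeholders:)
>>>>
>>>>     # Count rows per join key; a handful of very large keys means skew.
>>>>     df.groupBy("join_key").count().orderBy("count", ascending=False).show(20)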
>>>>
>>>> On Sun, 24 Mar 2019, 22:49 Mike Chan, <mikechancs@gmail.com> wrote:
>>>>
>>>>> Dear all,
>>>>>
>>>>> I have a Spark SQL job that used to execute in < 10 mins but is now
>>>>> running for 3 hours after a cluster migration, and I need to deep dive into
>>>>> what it's actually doing. I'm new to Spark, so please don't mind if I'm
>>>>> asking something unrelated.
>>>>>
>>>>> Env: Azure HDInsight, Spark 2.4 on Azure Storage
>>>>> SQL: Read and join some data and finally write the result to a Hive
>>>>> metastore
>>>>>
>>>>> Application Behavior:
>>>>> Within the first 15 mins, it loads and completes most tasks (199/200);
>>>>> after that only 1 executor process is left alive, continually doing shuffle
>>>>> read / write. Because only 1 executor is left, we need to wait 3 hours
>>>>> until this application finishes.
>>>>> [image: image.png]
>>>>>
>>>>> Only 1 executor left alive:
>>>>> [image: image.png]
>>>>>
>>>>> Not sure what the executor is doing:
>>>>> [image: image.png]
>>>>>
>>>>> From time to time, we can tell the shuffle read increased:
>>>>> [image: image.png]
>>>>>
>>>>> Therefore I increased the spark.executor.memory to 20g, but nothing
>>>>> changed. From Ambari and YARN I can tell the cluster has many resources
>>>>> left.
>>>>> [image: image.png]
>>>>>
>>>>> Release of almost all executors:
>>>>> [image: image.png]
>>>>>
>>>>> The Spark SQL job ends with the code below:
>>>>> .write.mode("overwrite").saveAsTable("default.mikemiketable")
>>>>>
>>>>> As you can tell, I'm new to Spark and not 100% sure what's going on
>>>>> here. The huge shuffle spill looks ugly, but it's probably not the reason
>>>>> for the slow execution; the real problem is that only 1 executor is doing
>>>>> the job. I'd greatly appreciate it if you can share how to troubleshoot /
>>>>> look further into it. Thank you very much.
>>>>>
>>>>> Best Regards,
>>>>> Mike
>>>>>
>>>>
