Hi Akhil,

We disabled broadcast joins for a reason (we suspect something related to the Azure Storage connection with HDInsight, though we're not 100% sure yet).

As suggested in this group, I have tried repartitioning before joining and made sure I have plenty of executor memory (raised spark.executor.memory to 50g and confirmed it on the application page), but no effect so far.
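
Concretely, what I tried looks roughly like the sketch below. The table names and "join_key" are placeholders (the real identifiers differ), and the memory was raised at submit time, e.g. spark-submit --conf spark.executor.memory=50g:

// Rough sketch of the repartition-before-join attempt (spark-shell / existing SparkSession assumed)
import org.apache.spark.sql.functions.col

val big    = spark.table("default.table_a").repartition(200, col("join_key"))
val small  = spark.table("default.table_b").repartition(200, col("join_key"))
val joined = big.join(small, Seq("join_key"), "left_outer")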

However, I did find something new: when the application is left with 1 executor still working and almost nothing appears to be loading, it is actually slowly flushing data to disk:

19/04/01 08:07:08 INFO InternalParquetRecordWriter: mem size 134395806 > 134217728: flushing 771932 records to disk.
19/04/01 08:07:08 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127269732
19/04/01 08:07:19 INFO InternalParquetRecordWriter: mem size 134219642 > 134217728: flushing 771932 records to disk.
19/04/01 08:07:19 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127214844
19/04/01 08:07:31 INFO InternalParquetRecordWriter: mem size 134375258 > 134217728: flushing 771072 records to disk.
19/04/01 08:07:31 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127038692
19/04/01 08:07:42 INFO InternalParquetRecordWriter: mem size 134249657 > 134217728: flushing 770779 records to disk.
19/04/01 08:07:42 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127034977
19/04/01 08:07:53 INFO InternalParquetRecordWriter: mem size 135002505 > 134217728: flushing 773529 records to disk.
19/04/01 08:07:53 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127475260
19/04/01 08:08:04 INFO InternalParquetRecordWriter: mem size 134268777 > 134217728: flushing 770779 records to disk.
19/04/01 08:08:04 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127108857
19/04/01 08:08:16 INFO InternalParquetRecordWriter: mem size 134990722 > 134217728: flushing 773529 records to disk.
19/04/01 08:08:16 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127385012
19/04/01 08:08:27 INFO InternalParquetRecordWriter: mem size 134366774 > 134217728: flushing 771072 records to disk.
19/04/01 08:08:27 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127018472
19/04/01 08:08:39 INFO InternalParquetRecordWriter: mem size 135244073 > 134217728: flushing 773034 records to disk.
19/04/01 08:08:39 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127472615
19/04/01 08:08:51 INFO InternalParquetRecordWriter: mem size 135040836 > 134217728: flushing 773529 records to disk.
19/04/01 08:08:51 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127405071
19/04/01 08:09:02 INFO InternalParquetRecordWriter: mem size 134508763 > 134217728: flushing 773034 records to disk.
19/04/01 08:09:03 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127342497
19/04/01 08:09:13 INFO InternalParquetRecordWriter: mem size 134241316 > 134217728: flushing 771932 records to disk.
19/04/01 08:09:13 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127193927
19/04/01 08:09:24 INFO InternalParquetRecordWriter: mem size 135406928 > 134217728: flushing 773034 records to disk.
19/04/01 08:09:24 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 127516711
19/04/01 08:09:36 INFO InternalParquetRecordWriter: mem size 134665676 > 134217728: flushing 761350 records to disk.
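
For what it's worth, the 134217728 threshold in those lines is 128 MB, the default Parquet row-group size (parquet.block.size), so this looks like the one remaining writer flushing row groups to the output file rather than OS-level swapping. If larger row groups were ever wanted, the threshold could be raised through the Hadoop configuration; this is only a sketch, not a fix for the single-executor problem:

// Raise the Parquet row-group size to 256 MB (default is 128 MB = 134217728 bytes)
spark.sparkContext.hadoopConfiguration.setInt("parquet.block.size", 256 * 1024 * 1024)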




On Mon, Mar 25, 2019 at 8:22 PM Akhil Das <akhld@hacked.work> wrote:
Have you tried a broadcast join? Since the other tables are relatively small, a broadcast join would help here: https://spark.apache.org/docs/latest/sql-performance-tuning.html#broadcast-hint-for-sql-queries
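
For example, something along these lines (the table and column names below are just placeholders):

import org.apache.spark.sql.functions.broadcast

val largeDF = spark.table("large_table")   // the 9 GB table
val smallDF = spark.table("small_table")   // one of the MB-sized tables

// DataFrame API: force the small side to be broadcast
val result = largeDF.join(broadcast(smallDF), Seq("key"), "left_outer")

// Equivalent SQL hint
spark.sql("SELECT /*+ BROADCAST(s) */ * FROM large_table l LEFT JOIN small_table s ON l.key = s.key")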

On Mon, 25 Mar 2019, 19:15 Mike Chan, <mikechancs@gmail.com> wrote:
Hi Gourav,

Could data skew happen even though my data size is small? Frankly speaking, I checked, and my joins look like this (a rough code sketch follows the list):

9 GB table A
left outer join 10MB table B
left outer join 5MB table C
left outer join 1MB table D
1MB table E left outer join with table C
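
In code, the query has roughly this shape (the join keys below are placeholders; the real ones differ):

// Original shape of the query, spark-shell assumed
val tableA = spark.table("default.table_a")   // ~9 GB
val tableB = spark.table("default.table_b")   // ~10 MB
val tableC = spark.table("default.table_c")   // ~5 MB
val tableD = spark.table("default.table_d")   // ~1 MB
val tableE = spark.table("default.table_e")   // ~1 MB

val main = tableA
  .join(tableB, Seq("b_key"), "left_outer")
  .join(tableC, Seq("c_key"), "left_outer")
  .join(tableD, Seq("d_key"), "left_outer")

val eWithC = tableE.join(tableC, Seq("c_key"), "left_outer")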

And I have 20GB of spark.executor.memory as well, and that's it. Today I re-examined everything and found that if I restructure it as:

9 GB table A
left outer join 10MB table B
left outer join 5MB table C (pre-left outer join with table E)
left outer join 1MB table D

and repartition(200) before writing the table out, the query drops from > 3 hours to 20 minutes (a rough code sketch of this version is below, after the metrics). I still have no idea why it produces statistics like these, though:

  • Total Time Across All Tasks: 2.4 h
  • Locality Level Summary: Node local: 126; Process local: 35; Rack local: 39
  • Shuffle Read: 104.1 GB / 438828498 records
  • Shuffle Write: 167.8 GB / 438828185 records
  • Shuffle Spill (Memory): 534.0 GB
  • Shuffle Spill (Disk): 160.2 GB
[screenshot attached]
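
The restructured version (again with placeholder names, reusing the tables from the earlier sketch) pre-joins C with E once and repartitions before the write:

// C pre-left-joined with E, then the main chain, then repartition(200) before writing
val cPreJoined = tableC.join(tableE, Seq("e_key"), "left_outer")

tableA
  .join(tableB, Seq("b_key"), "left_outer")
  .join(cPreJoined, Seq("c_key"), "left_outer")
  .join(tableD, Seq("d_key"), "left_outer")
  .repartition(200)
  .write.mode("overwrite").saveAsTable("default.mikemiketable")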

And below is the "SQL" tab (I'm not sure whether this is something like an execution plan in an RDBMS):

[screenshots of the SQL tab attached]

Today, after gathering several suggestions, I tried the options below (sketched in code after the list):
.partitionBy("a date selected") in write.table statement -> nothing changed
.repartition(1000) in write.table statement -> nothing changed
sqlContext.setConf("spark.sql.shuffle.partitions", "1000") -> spin more tasks but nothing changed in speed
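
In code, the three attempts above look roughly like this (df stands for the final joined DataFrame; the date column name is a placeholder):

// 1) Partition the output by a date column -> nothing changed
df.write.partitionBy("some_date_col").mode("overwrite").saveAsTable("default.mikemiketable")

// 2) Repartition to 1000 partitions before writing -> nothing changed
df.repartition(1000).write.mode("overwrite").saveAsTable("default.mikemiketable")

// 3) More shuffle partitions -> more tasks spun up, but no change in speed
sqlContext.setConf("spark.sql.shuffle.partitions", "1000")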

I did find out that in this new cluster:
spark.sql.join.preferSortMergeJoin => false
spark.sql.autoBroadcastJoinThreshold => -1

Not sure whether they're related, though; a quick way to check and restore them is sketched below.
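
For reference, -1 disables broadcast joins entirely. The values can be inspected and put back to the stock defaults at runtime like this (10485760 bytes = 10 MB is the default broadcast threshold):

// Inspect the current values
spark.conf.get("spark.sql.join.preferSortMergeJoin")
spark.conf.get("spark.sql.autoBroadcastJoinThreshold")

// Restore the usual defaults: prefer sort-merge joins, 10 MB broadcast threshold
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "10485760")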

Thank you guys for the advice. As you can tell, I'm new to Spark.

Best Regards,
Mike


On Mon, Mar 25, 2019 at 6:06 PM Gourav Sengupta <gourav.sengupta@gmail.com> wrote:
Hi,

I think that articles and demonstrations from 2016 are really, really old and deprecated. What of course does not change is the fact that we all have to understand our data better.

That said, try going through this. I am not quite sure whether this feature is available in Apache Spark or not, but it certainly gives some idea: https://docs.databricks.com/delta/join-performance/skew-join.html



Regards,
Gourav

On Mon, Mar 25, 2019 at 4:30 AM Akhil Das <akhld@hacked.work> wrote:
Looks like data skew; try to find a better partitioning for the dataset. This might help:
https://databricks.com/session/handling-data-skew-adaptively-in-spark-using-dynamic-repartitioning
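
One common manual pattern for a skewed join key, separate from the talk above, is to salt the key so the hot values spread across more partitions. A rough sketch, with placeholder table and column names:

import org.apache.spark.sql.functions._

val saltBuckets = 16
val largeDF = spark.table("large_table")   // skewed, large side
val smallDF = spark.table("small_table")   // small side

// Random salt on the large side
val saltedLarge = largeDF.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate the small side across every salt value so each salted key still matches
val saltedSmall = smallDF.withColumn("salt", explode(array((0 until saltBuckets).map(lit): _*)))

val joined = saltedLarge.join(saltedSmall, Seq("key", "salt"), "left_outer").drop("salt")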

On Sun, 24 Mar 2019, 22:49 Mike Chan, <mikechancs@gmail.com> wrote:
Dear all,

I have a Spark SQL job that used to execute in < 10 minutes but now runs for 3 hours after a cluster migration, and I need to deep dive into what it's actually doing. I'm new to Spark, so please don't mind if I ask something unrelated.

Env: Azure HDInsight Spark 2.4 on Azure Storage
SQL: read and join some data, then write the result out as a Hive metastore table

Application Behavior: 
Within the first 15 minutes, it loads and completes most tasks (199/200); after that only 1 executor process remains alive, continually doing shuffle reads and writes. Because only 1 executor is left, we have to wait 3 hours for this application to finish.
[screenshot attached]

Only 1 executor is left alive:
[screenshot attached]

Not sure what the executor is doing:
[screenshot attached]

From time to time, we can see the shuffle read increasing:
[screenshot attached]

Therefore I increased spark.executor.memory to 20g, but nothing changed. From Ambari and YARN I can tell the cluster has plenty of resources left.
[screenshot attached]

Almost all executors have been released:
[screenshot attached]

The Spark SQL job ends with the code below:
.write.mode("overwrite").saveAsTable("default.mikemiketable")

As you can tell, I'm new to Spark and don't fully understand what's going on here. The huge shuffle spill looks ugly, but it's probably not the reason for the slow execution; the real problem is that only 1 executor is doing the work. I'd greatly appreciate it if you could share how to troubleshoot this or look into it further. Thank you very much.

Best Regards,
Mike