spark-user mailing list archives

From "Mendelson, Assaf" <Assaf.Mendel...@rsa.com>
Subject RE: Re:RE: how to merge dataframe write output files
Date Thu, 10 Nov 2016 08:49:07 GMT
As others have said, when you coalesce to 1 partition you lose all parallelism. You can,
however, coalesce to a different value.
If, for example, you coalesce to 20, then up to 20 tasks can run in parallel.
You have a total of 4 executors with 2 cores each, which gives you a core parallelism of 8.
In general it is best to have 2-3 times that many tasks for better distribution, so ~20
tasks would be a good idea. Looking at your output I see part 00176, which I guess means
you have on the order of 200 tasks (200 is the default number of partitions after a
shuffle, spark.sql.shuffle.partitions).
Coalescing to 20 would still give you enough parallelism to use your cluster and would give
you fewer, bigger files.
Assaf.
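
The arithmetic above can be sketched in a few lines of Scala. This is only an illustration of the rule of thumb, not a Spark API: `suggestedPartitions` and `tasksPerCore` are hypothetical names, and the 2.5 multiplier is simply the midpoint of the 2-3x range mentioned above.

```scala
// Rule of thumb: aim for roughly 2-3 tasks per available core.
// `suggestedPartitions` and `tasksPerCore` are hypothetical, for illustration only.
def suggestedPartitions(numExecutors: Int,
                        coresPerExecutor: Int,
                        tasksPerCore: Double = 2.5): Int = {
  val coreParallelism = numExecutors * coresPerExecutor
  math.ceil(coreParallelism * tasksPerCore).toInt
}

// 4 executors x 2 cores = 8 cores; 8 * 2.5 = 20 partitions.
val target = suggestedPartitions(numExecutors = 4, coresPerExecutor = 2)

// In spark-shell, with `dft` being the joined DataFrame from the original question,
// the write would then look like:
// dft.coalesce(target).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
```

With the cluster described in this thread the function returns 20, so each write would produce at most 20 files instead of roughly 200.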


From: Shreya Agarwal [mailto:shreyagr@microsoft.com]
Sent: Thursday, November 10, 2016 10:28 AM
To: lk_spark
Cc: user.spark
Subject: RE: Re:RE: how to merge dataframe write output files

Your coalesce should technically work. One thing to check is the overhead memory; you should
configure it as 10% of executor memory. You might also need to increase maxResultSize. The
data size looks fine for the cluster unless your join yields more than 6G of data. A few
things to try:

  1.  Can you cache both DFs and try again?
  2.  Can you cache both, then use the Scala join API on the DFs instead of registering them for SQL?
  3.  Can you try a Dataset instead of a DataFrame? (assuming you are on Spark 2.0)
  4.  If you still see errors, can you check the YARN logs, or post them here?

I am sorry I don't know the answer to this,  but pretty sure there should be a way to work
with fragmented files too.
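
Suggestions 1 and 2 above might look roughly like the following. This is a sketch under stated assumptions: the two small in-memory DataFrames are hypothetical stand-ins for the parquet inputs in the thread (only the column names `bid`, `biz_id` and `tag_id` come from the original query, `name` is made up), and the session is created in local mode so the snippet can run on its own.

```scala
import org.apache.spark.sql.SparkSession

// Inside spark-shell getOrCreate() returns the already running session;
// master("local[2]") only matters when this runs standalone.
val session = SparkSession.builder()
  .appName("biztag-join-sketch")
  .master("local[2]")
  .getOrCreate()

// Hypothetical toy rows standing in for the two parquet inputs; both are cached.
val biz1608 = session.createDataFrame(Seq((1L, "a"), (2L, "b"), (3L, "c")))
  .toDF("bid", "name")
  .cache()
val biztag = session.createDataFrame(Seq((1L, 7), (2L, 7), (2L, 9)))
  .toDF("biz_id", "tag_id")
  .cache()

// The same left join as the SQL in the thread, written with the DataFrame API.
val joined = biz1608
  .join(biztag, biz1608("bid") === biztag("biz_id"), "left")
  .select(biz1608("*"), biztag("tag_id"))

// Equivalent of "where biztag.tag_id = 7" for a single tag.
val tag7 = joined.where(joined("tag_id") === 7)
```

Because both inputs are cached, filtering `joined` for each tag in turn reuses the in-memory data rather than rereading the parquet files on every iteration.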

From: lk_spark [mailto:lk_spark@163.com]
Sent: Thursday, November 10, 2016 12:20 AM
To: Shreya Agarwal <shreyagr@microsoft.com<mailto:shreyagr@microsoft.com>>
Cc: user.spark <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: Re:RE: how to merge dataframe write output files

Thank you for the reply, Shreya.
It's because the files are too small, and HDFS doesn't like small files.
As for your question: yes, I want to create an ExternalTable on the parquet file folder. How
do I use fragmented files, as you mentioned?

The test case is as below:
bin/spark-shell --master yarn --deploy-mode client --driver-memory 6g --executor-memory 8g \
  --executor-cores 2 --num-executors 4
val df = spark.read.parquet("/parquetdata/weixin/biz-tag-relation/")
df.createOrReplaceTempView("biztag")          // almost 70M, 3673411 rows
val df2 = spark.read.parquet("/parquetdata/weixin/biz/month=201608")
df2.createOrReplaceTempView("biz1608")        // almost 90M, 381700 rows
// Write one output directory per tag_id, each coalesced to a single file
for (i <- 1 to 61) {
  val dft = spark.sql(s"select biz1608.*, biztag.tag_id from biz1608 left join biztag on biz1608.bid = biztag.biz_id where biztag.tag_id = ${i}")
  dft.coalesce(1).write.parquet(s"/parquetdata/weixin/biztags/biztag${i}")
}



At 2016-11-10 15:47:02, "Shreya Agarwal" <shreyagr@microsoft.com<mailto:shreyagr@microsoft.com>>
wrote:
Is there a reason you want to merge the files? The reason you are getting errors (afaik) is
because when you try to coalesce and then write, you are forcing all the content to reside
on one executor, and the size of data is exceeding the memory you have for storage in your
executor, hence causing the container to be killed. We can confirm this if you provide the
specs of your cluster. The whole purpose of multiple files is so that each executor can write
its partition out in parallel, without having to collect the data in one place.

Not to mention that it'll make your write incredibly slow and also it'll take away all the
speed of reading in the data from a parquet as there won't be any parallelism at the time
of input (if you try to input this parquet).

Again, the important question is - Why do you need it to be one file? Are you planning to
use it externally? If yes, can you not use fragmented files there? If the data is too big
for the Spark executor, it'll most certainly be too much for JRE or any other runtime  to
load in memory on a single box.

From: lk_spark [mailto:lk_spark@163.com<mailto:lk_spark@163.com>]
Sent: Wednesday, November 9, 2016 11:29 PM
To: user.spark <user@spark.apache.org<mailto:user@spark.apache.org>>
Subject: how to merge dataframe write output files

Hi all,
    When I call the API df.write.parquet, I get a lot of small files. How can I merge them
into one file? I tried df.coalesce(1).write.parquet, but it sometimes fails with this error:

Container exited with a non-zero exit code 143
and more...
-rw-r--r--   2 hadoop supergroup     14.5 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00165-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     16.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00166-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00167-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     14.2 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00168-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00169-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     14.4 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00170-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00171-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00172-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     16.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00173-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     17.1 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00174-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     14.0 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00175-0f61afe4-23e8-40bb-b30b-09652ca677bc.snappy.parquet
-rw-r--r--   2 hadoop supergroup     15.7 K 2016-11-10 15:11 /parquetdata/weixin/biztags/biztag2/part-r-00176-0f61afe4-23e8-40bb-b30b-09652ca677bc
and more...
2016-11-10
________________________________
lk_spark



