spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Juho Autio <juho.au...@rovio.com>
Subject Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions
Date Thu, 25 Apr 2019 13:10:00 GMT
Ok, I've verified that hive> SHOW PARTITIONS is using get_partition_names,
which is always quite fast. Spark's insertInto uses
get_partitions_with_auth which
is much slower (it also gets location etc. of each partition).

I created a test in java that with a local metastore client to measure the
time:

I used the Short.MAX_VALUE (32767) as max for both (so also get 32767
partitions in both responses). I didn't get next page of results, but this
gives the idea already:

listPartitionNames completed in: 1540 ms ~= 1,5 seconds
listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes

I wonder if this can be optimized on metastore side, but at least it
doesn't seem to be CPU-bound on the RDS db (we're using Hive metastore,
backed by AWS RDS).

So my original question remains; does spark need to know about all existing
partitions for dynamic overwrite? I don't see why it would.

On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
vincent.gromakowski@gmail.com> wrote:

> Which metastore are you using?
>
> Le jeu. 25 avr. 2019 à 09:02, Juho Autio <juho.autio@rovio.com> a écrit :
>
>> Would anyone be able to answer this question about the non-optimal
>> implementation of insertInto?
>>
>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio <juho.autio@rovio.com> wrote:
>>
>>> Hi,
>>>
>>> My job is writing ~10 partitions with insertInto. With the same input /
>>> output data the total duration of the job is very different depending on
>>> how many partitions the target table has.
>>>
>>> Target table with 10 of partitions:
>>> 1 min 30 s
>>>
>>> Target table with ~10000 partitions:
>>> 13 min 0 s
>>>
>>> It seems that spark is always fetching the full list of partitions in
>>> target table. When this happens, the cluster is basically idling while
>>> driver is listing partitions.
>>>
>>> Here's a thread dump for executor driver from such idle time:
>>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>>
>>> Is there any way to optimize this currently? Is this a known issue? Any
>>> plans to improve?
>>>
>>> My code is essentially:
>>>
>>> spark = SparkSession.builder \
>>>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>>>     .config("hive.exec.dynamic.partition", "true") \
>>>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>>>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>>>     .enableHiveSupport() \
>>>     .getOrCreate()
>>>
>>> out_df.write \
>>>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>>>     .insertInto(target_table_name, overwrite=True)
>>>
>>> Table has been originally created from spark with saveAsTable.
>>>
>>> Does spark need to know anything about the existing partitions though?
>>> As a manual workaround I would write the files directly to the partition
>>> locations, delete existing files first if there's anything in that
>>> partition, and then call metastore to ALTER TABLE IF NOT EXISTS ADD
>>> PARTITION. This doesn't require previous knowledge on existing partitions.
>>>
>>> Thanks.
>>>
>>

-- 
*Juho Autio*
Senior Data Engineer

Data Engineering, Games
Rovio Entertainment Corporation
Mobile: + 358 (0)45 313 0122
juho.autio@rovio.com
www.rovio.com

*This message and its attachments may contain confidential information and
is intended solely for the attention and use of the named addressee(s). If
you are not the intended recipient and / or you have received this message
in error, please contact the sender immediately and delete all material you
have received in this message. You are hereby notified that any use of the
information, which you have received in error in whatsoever form, is
strictly prohibited. Thank you for your co-operation.*

Mime
View raw message