spark-user mailing list archives

From Juho Autio <juho.au...@rovio.com>
Subject Re: [Spark SQL]: Slow insertInto overwrite if target table has many partitions
Date Thu, 25 Apr 2019 15:16:49 GMT
> Not sure if the dynamic overwrite logic is implemented in Spark or in Hive

AFAIK I'm using the Spark implementation(s). Does the thread dump that I
posted show that? I'd like to remain within the Spark impl.

What I'm trying to ask is: do you Spark developers see any way to
optimize this?

Other than that, I'm not sure what you mean by this:

> There is probably a limit on the number of elements you can pass in the
> list of partitions for the listPartitionsWithAuthInfo API call.

That request takes a "max" argument, which is just a limit. The type is
short, so the max size per response is 32767. Anyway, even with this single
request & response it already takes those 5 minutes.

On Thu, Apr 25, 2019 at 5:46 PM vincent gromakowski <
vincent.gromakowski@gmail.com> wrote:

> There is probably a limit on the number of elements you can pass in the
> list of partitions for the listPartitionsWithAuthInfo API call. Not sure
> if the dynamic overwrite logic is implemented in Spark or in Hive; if it's
> in Hive, using Hive 1.2.1 is probably the reason for the un-optimized
> logic, but it's also a huge constraint for solving this issue, as
> upgrading the Hive version is a real challenge.
>
> On Thu, Apr 25, 2019 at 3:10 PM, Juho Autio <juho.autio@rovio.com> wrote:
>
>> Ok, I've verified that hive> SHOW PARTITIONS uses get_partition_names,
>> which is always quite fast. Spark's insertInto uses
>> get_partitions_with_auth, which is much slower (it also fetches the
>> location etc. of each partition).
>>
>> I created a test in Java with a local metastore client to measure the
>> time.
>>
>> I used Short.MAX_VALUE (32767) as the max for both calls (so both
>> responses also contained 32767 partitions). I didn't fetch the next page
>> of results, but this already gives the idea:
>>
>> listPartitionNames completed in: 1540 ms ~= 1.5 seconds
>> listPartitionsWithAuthInfo completed in: 303400 ms ~= 5 minutes
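>>
>> The test is essentially the following. This is a rough Python sketch of
>> the Java version (it assumes the third-party hmsclient package, which
>> exposes the raw metastore Thrift methods, and a metastore Thrift service
>> on localhost:9083; the db/table names and user are placeholders):
>>
>> import time
>> from hmsclient import hmsclient  # pip install hmsclient
>>
>> MAX = 32767  # Short.MAX_VALUE; the Thrift max_parts argument is a short
>>
>> client = hmsclient.HMSClient(host='localhost', port=9083)
>> with client as c:
>>     t0 = time.time()
>>     names = c.get_partition_names('my_db', 'my_table', MAX)
>>     print('listPartitionNames: %d names in %.0f ms'
>>           % (len(names), (time.time() - t0) * 1000))
>>
>>     t0 = time.time()
>>     # returns full Partition objects (location, storage descriptor, auth)
>>     parts = c.get_partitions_with_auth('my_db', 'my_table', MAX,
>>                                        'hive', [])
>>     print('listPartitionsWithAuthInfo: %d partitions in %.0f ms'
>>           % (len(parts), (time.time() - t0) * 1000))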
>>
>> I wonder if this could be optimized on the metastore side, but at least
>> it doesn't seem to be CPU-bound on the RDS db (we're using a Hive
>> metastore backed by AWS RDS).
>>
>> So my original question remains: does Spark need to know about all
>> existing partitions for a dynamic overwrite? I don't see why it would.
>>
>> On Thu, Apr 25, 2019 at 10:12 AM vincent gromakowski <
>> vincent.gromakowski@gmail.com> wrote:
>>
>>> Which metastore are you using?
>>>
>>> On Thu, Apr 25, 2019 at 9:02 AM, Juho Autio <juho.autio@rovio.com>
>>> wrote:
>>>
>>>> Would anyone be able to answer this question about the non-optimal
>>>> implementation of insertInto?
>>>>
>>>> On Thu, Apr 18, 2019 at 4:45 PM Juho Autio <juho.autio@rovio.com>
>>>> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> My job writes ~10 partitions with insertInto. With the same input /
>>>>> output data, the total duration of the job is very different depending
>>>>> on how many partitions the target table has.
>>>>>
>>>>> Target table with 10 partitions:
>>>>> 1 min 30 s
>>>>>
>>>>> Target table with ~10000 partitions:
>>>>> 13 min 0 s
>>>>>
>>>>> It seems that Spark always fetches the full list of partitions in the
>>>>> target table. When this happens, the cluster is basically idle while
>>>>> the driver lists partitions.
>>>>>
>>>>> Here's a thread dump for executor driver from such idle time:
>>>>> https://gist.github.com/juhoautio/bdbc8eb339f163178905322fc393da20
>>>>>
>>>>> Is there currently any way to optimize this? Is this a known issue?
>>>>> Are there plans to improve it?
>>>>>
>>>>> My code is essentially:
>>>>>
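>>>>> # 'dynamic' partitionOverwriteMode (Spark >= 2.3) makes insertInto with
>>>>> # overwrite=True replace only the partitions present in the written data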
>>>>> spark = SparkSession.builder \
>>>>>     .config('spark.sql.hive.caseSensitiveInferenceMode', 'NEVER_INFER') \
>>>>>     .config("hive.exec.dynamic.partition", "true") \
>>>>>     .config('spark.sql.sources.partitionOverwriteMode', 'dynamic') \
>>>>>     .config("hive.exec.dynamic.partition.mode", "nonstrict") \
>>>>>     .enableHiveSupport() \
>>>>>     .getOrCreate()
>>>>>
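>>>>> # committer algorithm v2 moves task output to the final location at
>>>>> # task commit, avoiding slow sequential renames during job commit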
>>>>> out_df.write \
>>>>>     .option('mapreduce.fileoutputcommitter.algorithm.version', '2') \
>>>>>     .insertInto(target_table_name, overwrite=True)
>>>>>
>>>>> Table has been originally created from spark with saveAsTable.
>>>>>
>>>>> Does Spark need to know anything about the existing partitions,
>>>>> though? As a manual workaround I would write the files directly to the
>>>>> partition locations, deleting any existing files in a partition first,
>>>>> and then call the metastore to ALTER TABLE ... ADD IF NOT EXISTS
>>>>> PARTITION. This doesn't require prior knowledge of the existing
>>>>> partitions, e.g. as sketched below.
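>>>>>
>>>>> A rough PySpark sketch of that workaround (the table name, location,
>>>>> and partition column 'dt' are made-up placeholders):
>>>>>
>>>>> table = 'my_db.my_table'           # placeholder
>>>>> location = 's3://bucket/my_table'  # placeholder table root
>>>>> dt = '2019-04-25'                  # one output partition
>>>>>
>>>>> # 1) write files straight into the partition directory; overwrite mode
>>>>> #    first deletes whatever files are already there
>>>>> out_df.where(out_df.dt == dt).drop('dt').write \
>>>>>     .mode('overwrite') \
>>>>>     .parquet('%s/dt=%s' % (location, dt))
>>>>>
>>>>> # 2) register the partition; this is a no-op if it already exists, so
>>>>> #    no listing of existing partitions is needed
>>>>> spark.sql("ALTER TABLE %s ADD IF NOT EXISTS PARTITION (dt='%s')"
>>>>>           % (table, dt))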
>>>>>
>>>>> Thanks.
>>>>>
>>>>
>>
>> --
>> *Juho Autio*
>> Senior Data Engineer
>>
>> Data Engineering, Games
>> Rovio Entertainment Corporation
>> Mobile: + 358 (0)45 313 0122
>> juho.autio@rovio.com
>> www.rovio.com
>>
>
