spark-user mailing list archives

From András Kolbert <kolbertand...@gmail.com>
Subject Re: Tasks are skewed to one executor
Date Mon, 26 Apr 2021 21:06:19 GMT
Not a pandas DF in the driver, but a distributed grouped map using the executors:
https://spark.apache.org/docs/2.4.4/sql-pyspark-pandas-with-arrow.html#grouped-map
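
For reference, a minimal sketch of the grouped-map pattern that page describes,
applied to the account/product/metric dataframe discussed later in the thread
(the column names come from the thread; the per-account rescaling logic is only
an illustration, not the actual job):

from pyspark.sql.functions import pandas_udf, PandasUDFType

# Output schema of the grouped map; here it matches the input.
schema = "account string, product string, metric double"

@pandas_udf(schema, PandasUDFType.GROUPED_MAP)
def rescale_metrics(pdf):
    # pdf is a pandas DataFrame holding one account's rows, processed on an executor
    pdf["metric"] = pdf["metric"] / pdf["metric"].sum()
    return pdf

result = df.groupby("account").apply(rescale_metrics)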


On Mon, 26 Apr 2021 at 22:44, Mich Talebzadeh <mich.talebzadeh@gmail.com>
wrote:

> Hi,
>
> Just to clarify, are you also using Pandas? Pandas data frames are stored
> in RAM and cannot take part in parallel processing the way Spark data frames
> can. Is that the reason the application resulted in an OOM error?
>
> HTH
>
>
> On Mon, 26 Apr 2021 at 21:25, András Kolbert <kolbertandras@gmail.com>
> wrote:
>
>> hi,
>>
>> The total available memory for an executor is 4GB, so unfortunately I
>> can't set it to 3000.
>>
>> The issue actually arises when I apply the pandas grouped map again and
>> again.
>>
>> It seems that with these functions the data is not cleaned up, as
>> repeatedly calling the same pandas grouped map eventually results in
>> executors dying.
>>
>> Is there a GC option/anything that I can tune to prevent this?
>>
>> Andras
>>
>>
>>
>> On Thu, 22 Apr 2021 at 15:55, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>> wrote:
>>
>>> how about
>>>
>>> --conf spark.yarn.executor.memoryOverhead=xyz
>>> say
>>> --conf spark.yarn.executor.memoryOverhead=3000
>>>
>>>
>>> On Wed, 21 Apr 2021 at 22:57, András Kolbert <kolbertandras@gmail.com>
>>> wrote:
>>>
>>>> Yes, you are right that it is able to handle the volume; however, without
>>>> the spikes it would be even more reliable.
>>>>
>>>> One thing that I generally noticed: I can't manage to force my actual
>>>> RDDs to be unpersisted after each of the streaming micro batches. Maybe
>>>> that also gradually fills up my executors and explains why they
>>>> periodically fall over?
>>>>
>>>> [image: image.png]
>>>>
>>>> These RDDs are rarely cleaned up.
>>>>
>>>>
>>>> [image: image.png]
>>>>
>>>>
>>>>
>>>> Context is created in the following way:
>>>>
>>>> def create_context():
>>>>     ssc = StreamingContext(sc, 60)
>>>>     opts = {"metadata.broker.list": "X",
>>>>             "auto.offset.reset": "largest",
>>>>             "group.id": "X"}
>>>>     tl = "Y"
>>>>
>>>>     kvs = KafkaUtils.createDirectStream(ssc, tl.split(","), opts)
>>>>     kvs.checkpoint(360)  # checkpoint interval
>>>>     lines = kvs.map(lambda row: row[1])
>>>>     lines.foreachRDD(myfunction)
>>>>     ssc.checkpoint(checkpoint_folder)
>>>>     return ssc
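>>>>
>>>> A minimal sketch of explicitly releasing the previous batch's cached
>>>> dataframe inside the foreachRDD handler (df_new, update_totals and
>>>> prev_totals are hypothetical names, and the Kafka messages are assumed to
>>>> be JSON strings; spark.streaming.unpersist only covers the RDDs generated
>>>> by the DStream itself, not anything cached inside the handler):
>>>>
>>>> prev_totals = None
>>>>
>>>> def myfunction(rdd):
>>>>     global prev_totals
>>>>     if rdd.isEmpty():
>>>>         return
>>>>     df_new = spark.read.json(rdd)        # parse the incoming batch
>>>>     df_totals = update_totals(df_new)    # hypothetical union/aggregate step
>>>>     df_totals.cache()
>>>>     df_totals.write.mode("overwrite").parquet(hdfs_path)
>>>>     if prev_totals is not None:
>>>>         prev_totals.unpersist()          # release the previous batch's cache
>>>>     prev_totals = df_totals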
>>>>
>>>>
>>>>
>>>>
>>>>
>>>> On Sun, 18 Apr 2021 at 11:30, Mich Talebzadeh <
>>>> mich.talebzadeh@gmail.com> wrote:
>>>>
>>>>> As I can see from your streaming time, your processing is just over 1
>>>>> minute for a batch interval of two minutes, so you appear to be able to
>>>>> handle the volume of streaming.
>>>>>
>>>>> Next, can you also provide the DAG diagram corresponding to the SQL run,
>>>>> plus details?
>>>>>
>>>>> What command and parameters are you using to submit this job?
>>>>>
>>>>> The problem is that parallel processing may not be useful for all
>>>>> scenarios, but we can talk about that later.
>>>>>
>>>>> HTH,
>>>>>
>>>>> Mich
>>>>>
>>>>>
>>>>> On Sat, 17 Apr 2021 at 07:25, András Kolbert <kolbertandras@gmail.com>
>>>>> wrote:
>>>>>
>>>>>> Unfortunately the issue has not been resolved, even when I increased
>>>>>> the number of partitions for the salting.
>>>>>>
>>>>>> This is the application after 3 days of running.
>>>>>> [image: image.png]
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>> Loads of dead executors over time.
>>>>>>
>>>>>> [image: image.png]
>>>>>>
>>>>>>
>>>>>>
>>>>>> Where you see the spikes in processing, it is ALWAYS because executors
>>>>>> periodically die with OOM issues.
>>>>>>
>>>>>> My assumption is that it is just too many broadcast hash joins and that
>>>>>> maybe fills up the memory over time? Is it possible to somehow clean
>>>>>> the memory before the executor dies (some GC tuning)?
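>>>>>>
>>>>>> One knob that is sometimes tried for this kind of slow build-up is
>>>>>> making the driver's ContextCleaner run its periodic GC more often, so
>>>>>> stale broadcast and shuffle blocks are released sooner. A sketch only;
>>>>>> the values are illustrative and these have to be set before the
>>>>>> SparkContext is created:
>>>>>>
>>>>>> from pyspark import SparkConf, SparkContext
>>>>>>
>>>>>> conf = (SparkConf()
>>>>>>         .set("spark.cleaner.periodicGC.interval", "10min")    # default 30min
>>>>>>         .set("spark.executor.extraJavaOptions", "-XX:+UseG1GC"))
>>>>>> sc = SparkContext(conf=conf)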
>>>>>>
>>>>>> Re Spark 3.0: at the moment it is not an option, as dependency systems
>>>>>> still use Kafka 0.8 and Spark 3.0 is not compatible with that. That's
>>>>>> why I use Spark 2.4.4.
>>>>>>
>>>>>> Thanks
>>>>>> Andras
>>>>>>
>>>>>>
>>>>>> On Mon, 12 Apr 2021 at 18:19, Gourav Sengupta <
>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>
>>>>>>> Hi Andras,
>>>>>>>
>>>>>>> Any chance you might get to use SPARK 3?
>>>>>>>
>>>>>>> Regards,
>>>>>>> Gourav Sengupta
>>>>>>>
>>>>>>> On Mon, Apr 12, 2021 at 3:37 PM András Kolbert <
>>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>>
>>>>>>>> partition_id has 45 partitions.
>>>>>>>>
>>>>>>>>    1. After aggregation is ready, the dataframe is persisted on HDFS
>>>>>>>>    (45 separate parquet files, each around 18-19 MB).
>>>>>>>>    2. Then those partitions are read in and cached (df =
>>>>>>>>    spark.read.load(hdfs_path)). At this point, the Spark UI shows 23
>>>>>>>>    partitions being cached, taking up approx 140-280 MB of on-heap
>>>>>>>>    memory per executor, leaving 1400-1700 MB of on-heap memory free
>>>>>>>>    (see the sketch below).
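>>>>>>>>
>>>>>>>> A minimal sketch of spreading that cached data more evenly when it is
>>>>>>>> read back; the partition count 28 is only illustrative (2 x the 14
>>>>>>>> cores across the 7 executors), not something tuned for this job:
>>>>>>>>
>>>>>>>> df = (spark.read.load(hdfs_path)
>>>>>>>>           .repartition(28)   # shuffle into evenly sized partitions
>>>>>>>>           .cache())
>>>>>>>> df.count()                   # materialise the cache across executors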
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 12 Apr 2021 at 16:01, Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> it should read
>>>>>>>>>
>>>>>>>>> How many partitions did you use in your groupBy (account, product,
>>>>>>>>> *partition_id*)?
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> Try increasing it with some randomised
>>>>>>>>> uniformly distributed integer between  0-*bins* where...
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 12 Apr 2021 at 14:57, Mich Talebzadeh <
>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Sounds like there is mileage in salting. Other parameters may play
>>>>>>>>>> a role as well.
>>>>>>>>>>
>>>>>>>>>> How many partitions did you use in your groupBy
>>>>>>>>>> (account,product, salt)?
>>>>>>>>>>
>>>>>>>>>> It may not be large enough.
>>>>>>>>>>
>>>>>>>>>> Try increasing it with some randomised
>>>>>>>>>> uniformly distributed integer between  0-n where
>>>>>>>>>>
>>>>>>>>>> bins = 20   ## play around with this number
>>>>>>>>>>
>>>>>>>>>> from pyspark.sql.functions import rand
>>>>>>>>>> df2 = df.withColumn("salt", (rand() * bins).cast("integer"))
>>>>>>>>>>
>>>>>>>>>> then
>>>>>>>>>>
>>>>>>>>>> df2.groupBy("account", "product", "salt")
>>>>>>>>>>
>>>>>>>>>> Try this and change bins as needed.
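>>>>>>>>>>
>>>>>>>>>> A fuller sketch of the two-stage (salted) aggregation, assuming the
>>>>>>>>>> metric is simply summed; the column names follow the
>>>>>>>>>> account/product/metric example from earlier in the thread:
>>>>>>>>>>
>>>>>>>>>> from pyspark.sql import functions as F
>>>>>>>>>>
>>>>>>>>>> bins = 20
>>>>>>>>>> salted = df.withColumn("salt", (F.rand() * bins).cast("integer"))
>>>>>>>>>>
>>>>>>>>>> # stage 1: partial aggregate per salt bucket, spreading the hot keys
>>>>>>>>>> partial = (salted.groupBy("account", "product", "salt")
>>>>>>>>>>                  .agg(F.sum("metric").alias("metric")))
>>>>>>>>>>
>>>>>>>>>> # stage 2: final aggregate over the much smaller partial result
>>>>>>>>>> totals = (partial.groupBy("account", "product")
>>>>>>>>>>                  .agg(F.sum("metric").alias("metric")))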
>>>>>>>>>>
>>>>>>>>>> HTH
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 12 Apr 2021 at 09:44, András Kolbert <
>>>>>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> The cluster is on prem, not in the cloud.
>>>>>>>>>>>
>>>>>>>>>>> Adding the partition id to the group by helped slightly, but the
>>>>>>>>>>> biggest benefit came when I took out the following property:
>>>>>>>>>>>
>>>>>>>>>>> spark.sql.autoBroadcastJoinThreshold = -1
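>>>>>>>>>>>
>>>>>>>>>>> For reference, a hedged sketch of the two common ways of expressing
>>>>>>>>>>> this in PySpark: disabling automatic broadcast joins for the
>>>>>>>>>>> session, versus leaving them on and hinting only the small lookup
>>>>>>>>>>> side (big_df and df_small are hypothetical names):
>>>>>>>>>>>
>>>>>>>>>>> from pyspark.sql.functions import broadcast
>>>>>>>>>>>
>>>>>>>>>>> # disable automatic broadcast joins for the session
>>>>>>>>>>> spark.conf.set("spark.sql.autoBroadcastJoinThreshold", -1)
>>>>>>>>>>>
>>>>>>>>>>> # or keep them on and broadcast only the small side explicitly
>>>>>>>>>>> joined = big_df.join(broadcast(df_small), on="account")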
>>>>>>>>>>>
>>>>>>>>>>> When I have this on, the process caches the big dataframe on a
>>>>>>>>>>> couple of executors, and that makes them very vulnerable to OOM.
>>>>>>>>>>>
>>>>>>>>>>> When the autoBroadcastJoin is on, the executors still periodically
>>>>>>>>>>> die. My assumption is that with autoBroadcastJoin on, Spark
>>>>>>>>>>> broadcasts the smaller dataframes to executor memory each time
>>>>>>>>>>> during processing, without deleting them from there, and gradually
>>>>>>>>>>> my executors die with OOM.
>>>>>>>>>>>
>>>>>>>>>>> This is after 48 minutes of runtime, with 13 dead executors.
>>>>>>>>>>>
>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>
>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>
>>>>>>>>>>> I tried broadcast(dataframe) manually, by specifying the broadcast
>>>>>>>>>>> explicitly and then unpersisting it when it changes. That does not
>>>>>>>>>>> seem to work.
>>>>>>>>>>>
>>>>>>>>>>> Any idea what I should do?
>>>>>>>>>>>
>>>>>>>>>>> Other parameters that I specified:
>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, 12 Apr 2021 at 10:02, Gourav Sengupta <gourav.sengupta@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> looks like you have answered some questions which I generally
>>>>>>>>>>>> ask. Another thing, can you please let me know the environment?
>>>>>>>>>>>> Is it AWS, GCP, Azure, Databricks, HDP, etc?
>>>>>>>>>>>>
>>>>>>>>>>>> Regards,
>>>>>>>>>>>> Gourav
>>>>>>>>>>>>
>>>>>>>>>>>> On Sun, Apr 11, 2021 at 8:39 AM András Kolbert <kolbertandras@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Sure!
>>>>>>>>>>>>>
>>>>>>>>>>>>> Application:
>>>>>>>>>>>>> - Spark version 2.4
>>>>>>>>>>>>> - Kafka stream (DStream, from Kafka 0.8 brokers)
>>>>>>>>>>>>> - 7 executors, 2 cores, 3700M memory each
>>>>>>>>>>>>>
>>>>>>>>>>>>> Logic:
>>>>>>>>>>>>> - The process initialises a dataframe that contains metrics for
>>>>>>>>>>>>> account/product pairs (e.g. {"account": A, "product": X123,
>>>>>>>>>>>>> "metric": 51})
>>>>>>>>>>>>> - After initialisation, the dataframe is persisted on HDFS
>>>>>>>>>>>>> (the dataframe is around 1GB total size in memory)
>>>>>>>>>>>>> - Streaming:
>>>>>>>>>>>>> - each batch processes incoming data, unions the main dataframe
>>>>>>>>>>>>> with the new account/product/metric interaction dataframe,
>>>>>>>>>>>>> aggregates the total, and then persists it on HDFS again (each
>>>>>>>>>>>>> batch we save the total dataframe again; see the sketch below)
>>>>>>>>>>>>> - The screenshot I sent earlier was taken after this aggregation,
>>>>>>>>>>>>> and shows how all the data seems to end up on the same executor.
>>>>>>>>>>>>> That could explain why the executor periodically dies with OOM.
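>>>>>>>>>>>>>
>>>>>>>>>>>>> A rough sketch of that per-batch update, assuming the totals
>>>>>>>>>>>>> dataframe has the columns account, product and metric from the
>>>>>>>>>>>>> example above; df_batch is a hypothetical name for the parsed
>>>>>>>>>>>>> incoming data, and the result is written to a new path because a
>>>>>>>>>>>>> path cannot safely be overwritten while it is still being read:
>>>>>>>>>>>>>
>>>>>>>>>>>>> from pyspark.sql import functions as F
>>>>>>>>>>>>>
>>>>>>>>>>>>> totals = spark.read.load(hdfs_path)      # previous totals
>>>>>>>>>>>>> updated = (totals.union(df_batch)        # add the new interactions
>>>>>>>>>>>>>                  .groupBy("account", "product")
>>>>>>>>>>>>>                  .agg(F.sum("metric").alias("metric")))
>>>>>>>>>>>>> updated.write.mode("overwrite").parquet(hdfs_path + "_new")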
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mich, I hope this provides extra information
:)
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>> Andras
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, 10 Apr 2021 at 16:42, Mich Talebzadeh <mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can you provide a bit more info please?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> How are you running this job and what is the streaming
>>>>>>>>>>>>>> framework (Kafka, files, etc)?
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> HTH
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Mich
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> On Sat, 10 Apr 2021 at 14:28, András Kolbert <kolbertandras@gmail.com> wrote:
>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> hi,
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> I have a streaming job and quite often executors die (due to
>>>>>>>>>>>>>>> memory errors / "unable to find location for shuffle", etc.)
>>>>>>>>>>>>>>> during processing. I started digging and found that some of the
>>>>>>>>>>>>>>> tasks are concentrated on one executor, just as below:
>>>>>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Can this be the reason?
>>>>>>>>>>>>>>> Should I repartition the underlying data before I execute a
>>>>>>>>>>>>>>> groupBy on top of it?
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Any advice is welcome
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>>> Andras
>>>>>>>>>>>>>>>
>>>>>>>>>>>>>>
