spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Tasks are skewed to one executor
Date Mon, 26 Apr 2021 20:44:35 GMT
Hi,

Just to clarify, are you also using Pandas? Pandas data frames are stored
in RAM and cannot take part in parallel processing like Spark data frames.
Is that the reason the application resulted in an OOM error?
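
To illustrate the difference, a minimal sketch (the session and column names
below are only illustrative, not taken from your job): a Pandas dataframe sits
entirely in one process's RAM, while converting it with spark.createDataFrame()
gives you a distributed dataframe the executors can work on in parallel.

import pandas as pd
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pandas_vs_spark").getOrCreate()

pdf = pd.DataFrame({"account": ["A", "B", "A"], "metric": [51, 42, 7]})  # driver RAM only
sdf = spark.createDataFrame(pdf)                # distributed across the executors
sdf.groupBy("account").sum("metric").show()     # runs in parallel on the cluster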

HTH



   view my Linkedin profile
<https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Mon, 26 Apr 2021 at 21:25, András Kolbert <kolbertandras@gmail.com>
wrote:

> hi,
>
> The total available memory for an executor is 4GB, so unfortunately I can't
> set it to 3000.
>
> The issue is actually when I apply the pandas grouped map again and again.
>
> It seems that the data is not cleaned up by these functions, as repeatedly
> calling the same pandas grouped map eventually results in executors dying.
>
> Is there a GC option/anything that I can tune to prevent this?
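>
> For reference, what I am doing is roughly the following (a simplified sketch
> with a made-up schema, not the real job):
>
> from pyspark.sql.functions import pandas_udf, PandasUDFType
>
> @pandas_udf("account string, metric double", PandasUDFType.GROUPED_MAP)
> def recompute(pdf):
>     pdf["metric"] = pdf["metric"].cumsum()   # placeholder per-group pandas logic
>     return pdf
>
> for _ in range(10):
>     # the same grouped map applied again and again
>     df = df.groupBy("account").apply(recompute)
>     df.persist()
>     df.count()   # materialise; the older cached copies never seem to be freed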
>
> Andras
>
>
>
> On Thu, 22 Apr 2021 at 15:55, Mich Talebzadeh <mich.talebzadeh@gmail.com>
> wrote:
>
>> how about
>>
>> --conf spark.yarn.executor.memoryOverhead=xyz
>> say
>> --conf spark.yarn.executor.memoryOverhead=3000
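>>
>> e.g. as part of your submit command (a sketch; the script name and other
>> flags are placeholders for your own):
>>
>> spark-submit \
>>   --master yarn \
>>   --deploy-mode cluster \
>>   --conf spark.yarn.executor.memoryOverhead=3000 \
>>   your_streaming_job.py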
>>
>>    view my Linkedin profile
>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>>
>> On Wed, 21 Apr 2021 at 22:57, András Kolbert <kolbertandras@gmail.com>
>> wrote:
>>
>>> Yes, you are right that it is able to handle the volume; however, without
>>> the spikes it'd be even more reliable.
>>>
>>> One thing that I have generally noticed: I can't manage to force my actual
>>> RDDs to be unpersisted after each of the streaming micro-batches. Maybe that
>>> also gradually fills up my executors and explains why they periodically fall
>>> over?
>>>
>>> [image: image.png]
>>>
>>> These RDDs are rarely cleaned up.
>>>
>>>
>>> [image: image.png]
>>>
>>>
>>>
>>> Context is created in the following way:
>>>
>>> from pyspark.streaming import StreamingContext
>>> from pyspark.streaming.kafka import KafkaUtils
>>>
>>> def create_context():
>>>     ssc = StreamingContext(sc, 60)   # 60-second batch interval
>>>     opts = {"metadata.broker.list": "X",
>>>             "auto.offset.reset": "largest",
>>>             "group.id": "X"}
>>>     tl = "Y"
>>>
>>>     kvs = KafkaUtils.createDirectStream(ssc, tl.split(","), opts)
>>>     kvs.checkpoint(360)   # checkpoint interval
>>>     lines = kvs.map(lambda row: row[1])
>>>     lines.foreachRDD(myfunction)
>>>     ssc.checkpoint(checkpoint_folder)
>>>     return ssc
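>>>
>>> and myfunction is roughly along these lines (a heavily simplified,
>>> hypothetical sketch; df_total, the schema and the HDFS path are placeholders,
>>> not the real code):
>>>
>>> def myfunction(rdd):
>>>     global df_total
>>>     if rdd.isEmpty():
>>>         return
>>>     batch_df = spark.read.json(rdd)
>>>     new_total = df_total.union(batch_df).groupBy("account", "product").sum("metric")
>>>     new_total.persist()
>>>     new_total.count()      # materialise before releasing the old copy
>>>     df_total.unpersist()   # this is what I try, but old RDDs still linger in storage
>>>     df_total = new_total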
>>>
>>>
>>>
>>>
>>>
>>> On Sun, 18 Apr 2021 at 11:30, Mich Talebzadeh <mich.talebzadeh@gmail.com>
>>> wrote:
>>>
>>>> As I can see from your streaming time, your processing is just over 1
>>>> minute for a batch interval of two minutes. So you appear to be able to
>>>> handle the volume of streaming?
>>>>
>>>> Next, can you also provide the DAG diagram corresponding to the SQL run,
>>>> plus details?
>>>>
>>>> What command are you using to submit this job and parameters?
>>>>
>>>> The problem is that parallel processing may not be useful for all
>>>> scenarios, but we can talk about that later.
>>>>
>>>> HTH,
>>>>
>>>> Mich
>>>>
>>>>
>>>>
>>>>    view my Linkedin profile
>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>
>>>>
>>>>
>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>>>> any loss, damage or destruction of data or any other property which may
>>>> arise from relying on this email's technical content is explicitly
>>>> disclaimed. The author will in no case be liable for any monetary damages
>>>> arising from such loss, damage or destruction.
>>>>
>>>>
>>>>
>>>>
>>>> On Sat, 17 Apr 2021 at 07:25, András Kolbert <kolbertandras@gmail.com>
>>>> wrote:
>>>>
>>>>> Unfortunately, the issue has not been resolved, even after I increased
>>>>> the number of partitions for the salting.
>>>>>
>>>>> This is the application after 3 days of run.
>>>>> [image: image.png]
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>> Loads of dead executors over time.
>>>>>
>>>>> [image: image.png]
>>>>>
>>>>>
>>>>>
>>>>> Where you see the spikes in processing, it is ALWAYS because executors
>>>>> periodically die with OOM issues.
>>>>>
>>>>> My assumption is that it is just too many broadcast hash joins, and that
>>>>> maybe fills up the memory over time? Is it possible to somehow clean up
>>>>> the memory before the executor dies (some GC tuning)?
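>>>>>
>>>>> (For example, is lowering the periodic context cleaner interval, or
>>>>> switching the executors to G1GC, the kind of tuning that would help? Just
>>>>> a guess on my side:
>>>>>
>>>>>   --conf spark.cleaner.periodicGC.interval=10min
>>>>>   --conf "spark.executor.extraJavaOptions=-XX:+UseG1GC"
>>>>>
>>>>> added to the submit command.)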
>>>>>
>>>>> Re Spark 3.0: at the moment it is not an option, as dependent systems
>>>>> still use Kafka 0.8 and Spark 3.0 is not compatible with that. That's why
>>>>> I use Spark 2.4.4.
>>>>>
>>>>> Thanks
>>>>> Andras
>>>>>
>>>>>
>>>>> On Mon, 12 Apr 2021 at 18:19, Gourav Sengupta <
>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>
>>>>>> Hi Andras,
>>>>>>
>>>>>> Any chance you might get to use SPARK 3?
>>>>>>
>>>>>> Regards,
>>>>>> Gourav Sengupta
>>>>>>
>>>>>> On Mon, Apr 12, 2021 at 3:37 PM András Kolbert <
>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>
>>>>>>> partition_id has 45 partitions.
>>>>>>>
>>>>>>>    1. After the aggregation is ready, the dataframe is persisted on
>>>>>>>    HDFS (45 separate parquet files, each around 18-19MB).
>>>>>>>    2. Then those partitions are read in and cached (df =
>>>>>>>    spark.read.load(hdfs_path)).
>>>>>>>    At this point, the Spark UI shows 23 partitions being cached, taking
>>>>>>>    up approx 140-280MB of on-heap memory per executor, leaving approx
>>>>>>>    1400-1700MB of on-heap memory free.
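>>>>>>>
>>>>>>> In code, those two steps look roughly like this (simplified; the exact
>>>>>>> write call may differ from the real job):
>>>>>>>
>>>>>>> agg_df.repartition(45, "partition_id") \
>>>>>>>       .write.mode("overwrite").parquet(hdfs_path)   # step 1: 45 parquet files
>>>>>>>
>>>>>>> df = spark.read.load(hdfs_path)   # step 2: read back ...
>>>>>>> df.cache()
>>>>>>> df.count()                        # ... and cache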
>>>>>>>
>>>>>>>
>>>>>>> On Mon, 12 Apr 2021 at 16:01, Mich Talebzadeh <
>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>
>>>>>>>> it should read
>>>>>>>>
>>>>>>>> How many partitions did you use in your groupBy (account, product,
>>>>>>>> *partition_id*)?
>>>>>>>>
>>>>>>>>
>>>>>>>> Try increasing it with some randomised, uniformly distributed integer
>>>>>>>> between 0 and *bins*, where...
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>    view my Linkedin profile
>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>> for any loss, damage or destruction of data or any other
property which may
>>>>>>>> arise from relying on this email's technical content is explicitly
>>>>>>>> disclaimed. The author will in no case be liable for any
monetary damages
>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Mon, 12 Apr 2021 at 14:57, Mich Talebzadeh <
>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>
>>>>>>>>> Sounds like there is mileage in salting. Other parameters may play
>>>>>>>>> a role as well.
>>>>>>>>>
>>>>>>>>> How many partitions did you use in your groupBy (account,product,
>>>>>>>>> salt)?
>>>>>>>>>
>>>>>>>>> It may not be large enough.
>>>>>>>>>
>>>>>>>>> Try increasing it with some randomised, uniformly distributed integer
>>>>>>>>> between 0 and n, where
>>>>>>>>>
>>>>>>>>> from pyspark.sql.functions import rand
>>>>>>>>>
>>>>>>>>> bins = 20   ## play around with this number
>>>>>>>>>
>>>>>>>>> df2 = df.withColumn("salt", (rand() * bins).cast("integer"))
>>>>>>>>>
>>>>>>>>> then
>>>>>>>>>
>>>>>>>>> groupBy(account,product,salt)
>>>>>>>>>
>>>>>>>>> Try this and change bins as needed
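>>>>>>>>>
>>>>>>>>> In full, the pattern is something like this (a sketch; the metric
>>>>>>>>> column and the aggregation are just examples). The second groupBy
>>>>>>>>> merges the salted partial results back to one row per key:
>>>>>>>>>
>>>>>>>>> from pyspark.sql.functions import rand, sum as _sum
>>>>>>>>>
>>>>>>>>> bins = 20
>>>>>>>>> df2 = df.withColumn("salt", (rand() * bins).cast("integer"))
>>>>>>>>>
>>>>>>>>> partial = df2.groupBy("account", "product", "salt") \
>>>>>>>>>              .agg(_sum("metric").alias("metric"))
>>>>>>>>> final = partial.groupBy("account", "product") \
>>>>>>>>>                .agg(_sum("metric").alias("metric"))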
>>>>>>>>>
>>>>>>>>> HTH
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>    view my Linkedin profile
>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>> for any loss, damage or destruction of data or any other
property which may
>>>>>>>>> arise from relying on this email's technical content
is explicitly
>>>>>>>>> disclaimed. The author will in no case be liable for
any monetary damages
>>>>>>>>> arising from such loss, damage or destruction.
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Mon, 12 Apr 2021 at 09:44, András Kolbert <
>>>>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> The cluster is on prem, not in the cloud.
>>>>>>>>>>
>>>>>>>>>> Adding the partition id to the group by helped slightly, but the
>>>>>>>>>> most benefit came when I took out the following property:
>>>>>>>>>>
>>>>>>>>>> spark.sql.autoBroadcastJoinThreshold = -1
>>>>>>>>>>
>>>>>>>>>> When I have this on, the process caches the big dataframe to a
>>>>>>>>>> couple of executors, and that makes them very vulnerable to OOM.
>>>>>>>>>>
>>>>>>>>>> When autoBroadcastJoin is on, the executors still periodically die.
>>>>>>>>>> My assumption is that having autoBroadcastJoin on means that during
>>>>>>>>>> processing Spark broadcasts the smaller dataframes each time to
>>>>>>>>>> executor memory, without deleting them from there, and gradually my
>>>>>>>>>> executors die with OOM.
>>>>>>>>>>
>>>>>>>>>> This is after 48 minutes of runtime: 13 dead executors.
>>>>>>>>>>
>>>>>>>>>> [image: image.png]
>>>>>>>>>>
>>>>>>>>>> [image: image.png]
>>>>>>>>>>
>>>>>>>>>> I tried to broadcast(dataframe) manually, by specifying broadcast
>>>>>>>>>> explicitly, and then unpersisting it when it changes. That does not
>>>>>>>>>> seem to work.
>>>>>>>>>>
>>>>>>>>>> Any idea what I should do?
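>>>>>>>>>>
>>>>>>>>>> What I tried is roughly this (simplified; small_df / big_df /
>>>>>>>>>> new_small_df are placeholder names):
>>>>>>>>>>
>>>>>>>>>> from pyspark.sql.functions import broadcast
>>>>>>>>>>
>>>>>>>>>> small_df = small_df.persist()
>>>>>>>>>> joined = big_df.join(broadcast(small_df), ["account", "product"], "left")
>>>>>>>>>>
>>>>>>>>>> # later, when the small dataframe is refreshed:
>>>>>>>>>> small_df.unpersist()   # the old broadcast copies still seem to stay on the executors
>>>>>>>>>> small_df = new_small_df.persist()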
>>>>>>>>>>
>>>>>>>>>> Other parameters that I specified:
>>>>>>>>>> [image: image.png]
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Mon, 12 Apr 2021 at 10:02, Gourav Sengupta <
>>>>>>>>>> gourav.sengupta@gmail.com> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi,
>>>>>>>>>>>
>>>>>>>>>>> looks like you have answered some questions which I generally ask.
>>>>>>>>>>> Another thing: can you please let me know the environment? Is it
>>>>>>>>>>> AWS, GCP, Azure, Databricks, HDP, etc.?
>>>>>>>>>>>
>>>>>>>>>>> Regards,
>>>>>>>>>>> Gourav
>>>>>>>>>>>
>>>>>>>>>>> On Sun, Apr 11, 2021 at 8:39 AM András Kolbert <
>>>>>>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Hi,
>>>>>>>>>>>>
>>>>>>>>>>>> Sure!
>>>>>>>>>>>>
>>>>>>>>>>>> Application:
>>>>>>>>>>>> - Spark version 2.4
>>>>>>>>>>>> - Kafka stream (DStream, from Kafka 0.8 brokers)
>>>>>>>>>>>> - 7 executors, 2 cores, 3700M memory
>>>>>>>>>>>>
>>>>>>>>>>>> Logic:
>>>>>>>>>>>> - The process initialises a dataframe that contains metrics per
>>>>>>>>>>>> account/product pair (e.g. {"account": A, "product": X123,
>>>>>>>>>>>> "metric": 51})
>>>>>>>>>>>> - After initialisation, the dataframe is persisted on HDFS (the
>>>>>>>>>>>> dataframe is around 1GB total size in memory)
>>>>>>>>>>>> - Streaming: each batch processes the incoming data, unions the
>>>>>>>>>>>> main dataframe with the new account/product/metric interaction
>>>>>>>>>>>> dataframe, aggregates the total, and then persists the total
>>>>>>>>>>>> dataframe on HDFS again (a rough sketch of this per-batch loop
>>>>>>>>>>>> follows below)
>>>>>>>>>>>> - The screenshot I sent earlier was taken after this aggregation,
>>>>>>>>>>>> and shows how all the data seems to end up on the same executor.
>>>>>>>>>>>> That could explain why the executor periodically dies with OOM.
>>>>>>>>>>>>
>>>>>>>>>>>> Mich, I hope this provides extra information :)
>>>>>>>>>>>>
>>>>>>>>>>>> Thanks
>>>>>>>>>>>> Andras
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>> On Sat, 10 Apr 2021 at 16:42, Mich Talebzadeh <
>>>>>>>>>>>> mich.talebzadeh@gmail.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Can you provide a bit more info please?
>>>>>>>>>>>>>
>>>>>>>>>>>>> How are you running this job, and what is the streaming framework
>>>>>>>>>>>>> (Kafka, files, etc.)?
>>>>>>>>>>>>>
>>>>>>>>>>>>> HTH
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> Mich
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>    view my Linkedin profile
>>>>>>>>>>>>> <https://www.linkedin.com/in/mich-talebzadeh-ph-d-5205b2/>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> *Disclaimer:* Use it at your own risk. Any and all responsibility
>>>>>>>>>>>>> for any loss, damage or destruction of data or any other property
>>>>>>>>>>>>> which may arise from relying on this email's technical content is
>>>>>>>>>>>>> explicitly disclaimed. The author will in no case be liable for
>>>>>>>>>>>>> any monetary damages arising from such loss, damage or
>>>>>>>>>>>>> destruction.
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Sat, 10 Apr 2021 at 14:28, András Kolbert <
>>>>>>>>>>>>> kolbertandras@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>>> hi,
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I have a streaming job, and quite often executors die during
>>>>>>>>>>>>>> processing (due to memory errors / "unable to find location for
>>>>>>>>>>>>>> shuffle" etc.). I started digging and found that some of the
>>>>>>>>>>>>>> tasks are concentrated on one executor, just as below:
>>>>>>>>>>>>>> [image: image.png]
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Can this be the reason?
>>>>>>>>>>>>>> Should I repartition the underlying data before I execute a
>>>>>>>>>>>>>> groupBy on top of it?
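>>>>>>>>>>>>>>
>>>>>>>>>>>>>> I mean something like this (200 is an arbitrary number, and the
>>>>>>>>>>>>>> column names are just an example):
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> df = df.repartition(200, "account", "product")
>>>>>>>>>>>>>> agg = df.groupBy("account", "product").sum("metric")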
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Any advice is welcome
>>>>>>>>>>>>>>
>>>>>>>>>>>>>> Thanks
>>>>>>>>>>>>>> Andras
>>>>>>>>>>>>>>
>>>>>>>>>>>>>
