spark-user mailing list archives

From Kalin Stoyanov <kgs.v...@gmail.com>
Subject Re: Broadcast size increases with subsequent iterations
Date Tue, 08 Dec 2020 11:23:49 GMT
Hi all,

OK, I figured it out. The script above does not checkpoint properly, and the df sizes keep increasing even though the DAGs look correct. I am not really sure why this happens, but doing the checkpoints as separate statements, without chaining any other operations, fixed it:

#in class 1
while (self.t < self.ttarget):
    newSnapshot, timePassed = self.integrator.advance(self.cluster)  # calls the function below
    self.cluster = newSnapshot
    self.t += timePassed

    if self.dt_out and self.next_out <= self.t:
        self.snapshot()  # saves the dataframe to disk - Job #3 - does 1 broadcast
        self.next_out += self.dt_out

#in class 2 - integrator
def advance(self, df_clust):
    df_clust = df_clust.repartition(self.nparts, "id")
    df_clust = df_clust.localCheckpoint()

    df_F = self.calc_F(df_clust)
    df_F = df_F.localCheckpoint()

    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)

    df_clust = df_r.join(df_v, "id")

    return (df_clust, self.dt)
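
For anyone hitting the same thing, here is a minimal sketch of the difference (the SparkSession, the toy dataframe and the partition count below are illustrative placeholders, not the actual pipeline):

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
df = spark.range(1000)  # stand-in for the cluster dataframe; has an "id" column

# what I had before - one chained statement; note the checkpoint runs
# before the repartition, so the repartitioned dataframe itself is never
# the thing being checkpointed:
# df = df.localCheckpoint().repartition(8, "id")

# what fixed it - repartition first, then checkpoint as its own statement:
df = df.repartition(8, "id")
df = df.localCheckpoint()

The only change is splitting the chained call so that localCheckpoint is taken on the final dataframe of the step; after that the broadcast sizes in the logs stopped growing.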

Regards,
Kalin

On Fri, Dec 4, 2020 at 1:59 PM Kalin Stoyanov <kgs.void@gmail.com> wrote:

> Hi all,
>
> I have an iterative algorithm in Spark where the output of each iteration
> is the input to the next, and the size of the data does not change. I am
> using localCheckpoint to cut the data's lineage (and also to help some
> computations that reuse df-s). However, it runs slower and slower over
> time, and when I looked at the logs it turned out that each job is
> broadcasting larger and larger amounts of data. I can't figure out why this
> is happening or how to stop it - with the actual data size remaining
> constant, the only thing I can imagine growing is the lineage, but that is
> cut by the checkpoint...
>
> Here's an abridged version of the script:
> #in class 1
> while (self.t < self.ttarget):
>     newSnapshot, timePassed = self.integrator.advance(self.cluster)  # calls the function below
>     self.cluster = newSnapshot
>     self.t += timePassed
>
>     if self.dt_out and self.next_out <= self.t:
>         self.snapshot()  # saves the dataframe to disk - Job #3 - does 1 broadcast
>         self.next_out += self.dt_out
>
> #in class 2 - integrator
> def advance(self, df_clust):
>     df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")  # Job #1 - does one broadcast
>     df_F = self.calc_F(df_clust).localCheckpoint()  # Job #2 - does two broadcasts
>     df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)
>
>     df_clust = df_r.join(df_v, "id")
>
>     return (df_clust, self.dt)
>
> When I checked the logs, they fall, as expected, into a repeating pattern of
> the 3 jobs (I'm saving to disk on every iteration, so it's simpler), which
> look identical for every iteration. However, the size of ALL the broadcasts
> keeps increasing over time - for example:
>
> [image: broadcast_1.png]
> [image: broadcast_2.png]
> I'd really appreciate any insight into what's causing this.
>
> Regards,
> Kalin
>
>
