Hi all,
OK, I figured it out. The script below does not do the checkpoints properly,
and the broadcast sizes keep increasing even though the DAGs look correct. I
am not really sure why this happens, but doing the checkpoints as separate
statements, without chaining other operations onto them, fixed it:
# in class 1
while self.t < self.ttarget:
    # advance() (defined below) returns the new snapshot and the time step
    newSnapshot, timePassed = self.integrator.advance(self.cluster)
    self.cluster = newSnapshot
    self.t += timePassed

    if self.dt_out and self.next_out <= self.t:
        self.snapshot()  # saves the dataframe to disk - Job #3, does 1 broadcast
        self.next_out += self.dt_out
# in class 2 - integrator
def advance(self, df_clust):
    df_clust = df_clust.repartition(self.nparts, "id")
    df_clust = df_clust.localCheckpoint()

    df_F = self.calc_F(df_clust)
    df_F = df_F.localCheckpoint()

    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)
    df_clust = df_r.join(df_v, "id")

    return (df_clust, self.dt)
Regards,
Kalin
On Fri, Dec 4, 2020 at 1:59 PM Kalin Stoyanov <kgs.void@gmail.com> wrote:
> Hi all,
>
> I have an iterative algorithm in Spark that uses each iteration's output as
> the input for the next one, but the size of the data does not change. I am
> using localCheckpoint to cut the data's lineage (and also to facilitate some
> computations that reuse dfs). However, it runs slower and slower over time,
> and when I looked at the logs it turned out that each job broadcasts larger
> and larger amounts of data. I can't figure out why this happens or how to
> stop it: with the actual data size remaining constant, the only thing I can
> imagine growing is the lineage metadata, but that should be cut by the
> checkpoint...
>
> Here's an abridged version of the script:
> # in class 1
> while self.t < self.ttarget:
>     # advance() (defined below) returns the new snapshot and the time step
>     newSnapshot, timePassed = self.integrator.advance(self.cluster)
>     self.cluster = newSnapshot
>     self.t += timePassed
>
>     if self.dt_out and self.next_out <= self.t:
>         self.snapshot()  # saves the dataframe to disk - Job #3, does 1 broadcast
>         self.next_out += self.dt_out
>
> # in class 2 - integrator
> def advance(self, df_clust):
>     # Job #1 - does one broadcast
>     df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")
>     # Job #2 - does two broadcasts
>     df_F = self.calc_F(df_clust).localCheckpoint()
>     df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)
>
>     df_clust = df_r.join(df_v, "id")
>
>     return (df_clust, self.dt)
>
> When I checked the logs, they fall into the expected repeating pattern of
> the 3 jobs (I'm saving to disk on every iteration so it's simpler), and the
> jobs look identical for every iteration. However, the size of ALL broadcasts
> keeps increasing over time - for example:
>
> [image: broadcast_1.png]
> [image: broadcast_2.png]
> I'd really appreciate any insight into what's causing this..
>
> Regards,
> Kalin
>
>