Hi all,
I have an iterative algorithm in Spark that uses the output of each iteration
as the input for the following one, but the size of the data does not change.
I am using localCheckpoint to cut the data's lineage (and also to facilitate
some computations that reuse DataFrames). However, the job runs slower and
slower as time goes on, and when I looked at the logs it turned out each job
is broadcasting larger and larger amounts of data. I can't figure out why this
is happening or how to stop it. With the actual data size remaining constant,
the only thing I can imagine increasing is the lineage data, but that is
cut by the checkpoint...
Here's an abridged version of the script:
# in class 1
while (self.t < self.ttarget):
    # calls the function below
    newSnapshot, timePassed = self.integrator.advance(self.cluster)
    self.cluster = newSnapshot
    self.t += timePassed
    if self.dt_out and self.next_out <= self.t:
        # saves the dataframe to disk; Job #3, does 1 broadcast
        self.snapshot()
        self.next_out += self.dt_out
# in class 2 (integrator)
def advance(self, df_clust):
    # Job #1: does one broadcast
    df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")
    # Job #2: does two broadcasts
    df_F = self.calc_F(df_clust).localCheckpoint()
    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)
    df_clust = df_r.join(df_v, "id")
    return (df_clust, self.dt)
When I checked the logs, as expected they fall into a repeating pattern of
the 3 jobs (I'm saving to disk on every iteration so it's simpler), which
look identical for every iteration. However, the size of ALL broadcasts is
increasing over time, for example:
[image: broadcast_1.png]
[image: broadcast_2.png]
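A rough way to check whether the query plan itself is what grows (a sketch
under assumptions, not a confirmed diagnosis) would be to record the length of
the plan text Spark holds for the DataFrame on each iteration. The helper names
plan_length and is_growing are hypothetical, and _jdf is a private PySpark
attribute, so this relies on internals that may differ between versions.

```python
def plan_length(df):
    """Return the length of the plan text for a PySpark DataFrame.

    Assumption: uses the private `_jdf` handle to reach the JVM
    QueryExecution object; this is not a public API.
    """
    return len(df._jdf.queryExecution().toString())


def is_growing(sizes):
    """True if the recorded sizes never shrink and end larger than they began."""
    if len(sizes) < 2:
        return False
    never_shrinks = all(b >= a for a, b in zip(sizes, sizes[1:]))
    return never_shrinks and sizes[-1] > sizes[0]


# Intended use inside the main loop (names taken from the script above):
#     plan_sizes.append(plan_length(self.cluster))
# and, after a few iterations:
#     print(plan_sizes, is_growing(plan_sizes))
```

If the recorded lengths stay flat while the broadcasts keep growing, the plan
text is probably not the culprit and something else is accumulating.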
I'd really appreciate any insight into what's causing this.
Regards,
Kalin
