

Hi all,
I have an iterative algorithm in Spark that uses each iteration's output as the input for the following one, but the size of the data does not change. I am using localCheckpoint to cut the data's lineage (and also to speed up some computations that reuse dfs). However, the job runs slower and slower as time goes on, and when I looked at the logs it turned out each job is broadcasting larger and larger amounts of data. I can't figure out why this is happening or how to stop it: with the actual data size remaining constant, the only thing I can imagine growing is the lineage, but that should be cut by the checkpoint...
Here's an abridged version of the script:

    # in class 1
    while (self.t < self.ttarget):
        newSnapshot, timePassed = self.integrator.advance(self.cluster)  # calls the below function
        self.cluster = newSnapshot
        self.t += timePassed

        if self.dt_out and self.next_out <= self.t:
            self.snapshot()  # saves the dataframe to disk - Job #3 - does 1 broadcast
            self.next_out += self.dt_out

    # in class 2 - the integrator
    def advance(self, df_clust):
        df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")  # Job #1 - does one broadcast
        df_F = self.calc_F(df_clust).localCheckpoint()  # Job #2 - does two broadcasts
        df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)

        df_clust = df_r.join(df_v, "id")

        return (df_clust, self.dt)
When I checked the logs, the jobs fall, as expected, into a repeating pattern of the three jobs above (I'm saving to disk on every iteration, so the pattern is simpler), and they look identical for every iteration. However, the size of ALL the broadcasts is increasing over time.
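To make the suspicion concrete, here is a toy model of what I think is going on (plain Python, nothing below is Spark API; all names are made up): each transformation wraps the previous plan in a new node, and a working checkpoint should replace the whole chain with a single leaf. If the lineage were not actually being cut, the serialized plan would grow every iteration, which would match broadcasts growing while the data stays the same size:

```python
# Toy model of lineage growth (plain Python, no Spark involved):
# a "plan" is a chain of nodes; each transformation wraps the old
# plan in a new node, and a checkpoint replaces the whole chain
# with a single leaf - the way localCheckpoint is meant to.

class Plan:
    def __init__(self, op, parent=None):
        self.op = op
        self.parent = parent

    def depth(self):
        # number of nodes in the lineage chain
        return 1 + (self.parent.depth() if self.parent else 0)

    def transform(self, op):
        # analogue of a Spark transformation: builds on the old plan
        return Plan(op, parent=self)

    def checkpoint(self):
        # analogue of a working localCheckpoint: drops the lineage
        return Plan("checkpointed")


def iterate(plan, n, use_checkpoint):
    for _ in range(n):
        if use_checkpoint:
            plan = plan.checkpoint()
        plan = plan.transform("repartition").transform("join")
    return plan


print(iterate(Plan("source"), 10, use_checkpoint=False).depth())  # grows: 21
print(iterate(Plan("source"), 10, use_checkpoint=True).depth())   # constant: 3
```

If localCheckpoint were behaving as intended, the plan retained across iterations should look like the second case.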
I'd really appreciate any insight into what's causing this.
Regards, Kalin


Hi all,
OK, I figured it out. The script above does not actually checkpoint properly, and the df sizes increase even though the DAGs look correct. I am not really sure why this happens, but doing the checkpoints as separate statements, without chaining other operations onto them, fixed it:
    # in class 1
    while (self.t < self.ttarget):
        newSnapshot, timePassed = self.integrator.advance(self.cluster)  # calls the below function
        self.cluster = newSnapshot
        self.t += timePassed

        if self.dt_out and self.next_out <= self.t:
            self.snapshot()  # saves the dataframe to disk - Job #3 - does 1 broadcast
            self.next_out += self.dt_out

    # in class 2 - the integrator
    def advance(self, df_clust):
        df_clust = df_clust.repartition(self.nparts, "id")
        df_clust = df_clust.localCheckpoint()
        df_F = self.calc_F(df_clust)
        df_F = df_F.localCheckpoint()
        df_v, df_r = self.step_v(df_clust, df_F), self.step_r(df_clust, df_F)

        df_clust = df_r.join(df_v, "id")

        return (df_clust, self.dt)
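For completeness, the dt_out / next_out bookkeeping in the driver loop can be sketched standalone like this (plain Python with made-up numbers, no Spark; just the scheduling logic from class 1):

```python
# Standalone sketch of the driver loop's snapshot scheduling:
# advance in steps of dt and take a snapshot every dt_out units
# of simulated time, mirroring the while loop in class 1.

def run(ttarget, dt, dt_out):
    t = 0
    next_out = dt_out
    snapshots = []
    while t < ttarget:
        t += dt                    # integrator.advance() would go here
        if dt_out and next_out <= t:
            snapshots.append(t)    # self.snapshot() would go here
            next_out += dt_out
    return snapshots


print(run(ttarget=10, dt=1, dt_out=3))  # snapshots at t = 3, 6, 9
```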
Regards, Kalin

