Broadcast size increases with subsequent iterations

Kalin Stoyanov
Hi all,

I have an iterative algorithm in Spark where each iteration's output is the input to the next, and the size of the data does not change between iterations. I am using localCheckpoint to cut the data's lineage (and also to make it easier to reuse DataFrames across computations). However, it runs slower and slower over time, and when I looked at the logs it turned out that each job is broadcasting larger and larger amounts of data. I can't figure out why this is happening or how to stop it - with the actual data size staying constant, the only thing I can imagine growing is the lineage, but that should be cut by the checkpoint...

Here's an abridged version of the script:
#in class 1
while (self.t < self.ttarget):
    newSnapshot, timePassed = self.integrator.advance(self.cluster) #calls the below function
    self.cluster = newSnapshot
    self.t += timePassed

    if self.dt_out and self.next_out <= self.t:
        self.snapshot() # this saves the dataframe to disk - Job #3 - it does 1 broadcast
        self.next_out += self.dt_out

#in class 2 - integrator
def advance(self, df_clust):
    df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id") # Job #1 - does one broadcast
    df_F = self.calc_F(df_clust).localCheckpoint() # Job #2 - does two broadcasts
    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(
        df_clust, df_F)

    df_clust = df_r.join(df_v, "id")

    return (df_clust, self.dt)

When I checked the logs, they fall, as expected, into a repeating pattern of the three jobs above (I'm saving to disk on every iteration to keep things simple), and the jobs look identical from one iteration to the next. However, the size of ALL the broadcasts keeps increasing over time - for example:

[screenshots broadcast_1.png and broadcast_2.png: broadcast sizes from the Spark logs, growing from one iteration to the next]
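
For completeness, here is a stripped-down, self-contained sketch of the same pattern (the force calculation is replaced by a dummy column, and the names, sizes and iteration count are placeholders rather than the real script) that could be used to watch how much lineage each iteration's result carries:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

spark = SparkSession.builder.master("local[*]").getOrCreate()

# constant-size dummy data: an id plus two value columns standing in for position/velocity
df_clust = (spark.range(100000)
            .withColumn("r", F.rand())
            .withColumn("v", F.rand()))

nparts = 8
for i in range(10):
    # same shape as advance(): checkpoint chained with repartition,
    # a checkpointed "force" DataFrame, then a join of the two halves
    df_clust = df_clust.localCheckpoint().repartition(nparts, "id")
    df_F = df_clust.withColumn("F", F.col("r") * 0.5).localCheckpoint()  # stand-in for calc_F
    df_v = df_F.select("id", (F.col("v") + F.col("F")).alias("v"))
    df_r = df_F.select("id", (F.col("r") + F.col("v")).alias("r"))
    df_clust = df_r.join(df_v, "id")

    # rough proxy for how much plan/lineage the result is dragging along
    print(i, len(df_clust.rdd.toDebugString()))

If that number keeps growing from iteration to iteration even though the data itself doesn't, the lineage is not being cut where one would expect.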
I'd really appreciate any insight into what's causing this.

Regards,
Kalin

Re: Broadcast size increases with subsequent iterations

Kalin Stoyanov
Hi all,

OK, I figured it out. The script above does not checkpoint properly, and the DataFrame sizes keep increasing even though the DAGs look correct. I am not really sure why this happens, but doing the checkpoints on separate lines, without chaining other operations onto them, fixed it:

#in class 1
while (self.t < self.ttarget):
    newSnapshot, timePassed = self.integrator.advance(self.cluster) #calls the below function
    self.cluster = newSnapshot
    self.t += timePassed

    if self.dt_out and self.next_out <= self.t:
        self.snapshot() # this saves the dataframe to disk - Job #3 - it does 1 broadcast
        self.next_out += self.dt_out

#in class 2 - integrator
def advance(self, df_clust):
    df_clust = df_clust.repartition(self.nparts, "id")
    df_clust = df_clust.localCheckpoint()
    df_F = self.calc_F(df_clust)
    df_F = df_F.localCheckpoint()
    df_v, df_r = self.step_v(df_clust, df_F), self.step_r(
        df_clust, df_F)

    df_clust = df_r.join(df_v, "id")

    return (df_clust, self.dt)
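
For anyone who hits the same thing - the difference between the two versions comes down to where the checkpoint lands relative to the repartition (illustrative snippet only, not the full script):

# original version: localCheckpoint is chained, so it materializes df_clust
# *before* the repartition, and the repartition stays as a pending
# transformation on top of the checkpointed data
df_clust = df_clust.localCheckpoint().repartition(self.nparts, "id")

# fixed version: repartition first, then checkpoint on its own line, so the
# materialized result is already repartitioned and the plan handed to the
# next iteration starts from the checkpoint
df_clust = df_clust.repartition(self.nparts, "id")
df_clust = df_clust.localCheckpoint()

(localCheckpoint is eager by default, so it materializes whatever plan it has been called on at that point.)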

Regards,
Kalin
