FlatMapGroupsWithStateFunction is called thrice - Production use case.

Robin Kuttaiah
Hello,

I have a use case where I need to read uncorrelated events from a source Kafka topic, correlate them, and push the result to another target topic.

I use Spark Structured Streaming with FlatMapGroupsWithStateFunction and GroupStateTimeout.ProcessingTimeTimeout(). After each timeout, I apply some correlation logic to the grouped events and push the correlated events to another topic (via ForEachBatch). Uncorrelated events are kept in the state until they are correlated with a future set of events.

In this scenario, when I push a single event to the source topic, I see it arrive thrice in FlatMapGroupsWithStateFunction (at separate timestamps) but only once in the ForEachBatch processor (which is good).

The same event arriving thrice in FlatMapGroupsWithStateFunction is a problem because it breaks my correlation logic.

Can someone help me understand why it is seen thrice in FlatMapGroupsWithStateFunction?

Code snippets are shown below. Please let me know what is missing and how I can solve this.

thanks,
Robin Kuttaiah

StreamQuery

    Dataset<MilestoneEvent> sessionUpdates = null;

    FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> idstateUpdateFunction =
        new FlatMapIdFedGroupFunction(m_InsightEvent, m_InsightDeployment);

    try {

      sessionUpdates = idFedKafkaEvents
          .groupByKey(
              new MapFunction<Row, String>() {
                private static final long serialVersionUID = -797571731893988577L;

                @Override public String call(Row event) {
                  return event.getAs("EVENT_MODEL_ID_COL");
                }
              }, Encoders.STRING())
              .flatMapGroupsWithState(idstateUpdateFunction, OutputMode.Append(),
                  Encoders.bean(IdentifierConnector.class), Encoders.bean(MilestoneEvent.class),
                  GroupStateTimeout.ProcessingTimeTimeout());

    } catch (Exception oException) {
      //log and throw back exception
    }

    ForeachBatchProcessor oForeachBatch = new ForeachBatchProcessor(m_InsightDeployment, m_InsightEvent, m_strQueryName);

    DataStreamWriter<MilestoneEvent> events = sessionUpdates
        .writeStream()
        .queryName(queryName)
        .outputMode("append")
        .trigger(Trigger.ProcessingTime("5 seconds"))
        .option("checkpointLocation", checkpointLocation)
        .foreachBatch(oForeachBatch);


FlatMapGroupsWithStateFunction:

public class FlatMapIdFedGroupFunction implements FlatMapGroupsWithStateFunction<String, Row, IdentifierConnector, MilestoneEvent> {

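  // Kept as a field because call() below reads it when converting rows.
  private final InsightEvent insightEvent;

  public FlatMapIdFedGroupFunction(InsightEvent iEvent, InsightDeployment iDeployment) {
    this.insightEvent = iEvent;
  }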

  @Override
  public Iterator<MilestoneEvent> call(String key, Iterator<Row> events, GroupState<IdentifierConnector> state)
      throws Exception {

    List<MilestoneEvent> outputEvents = new ArrayList<MilestoneEvent>();
    IdentifierConnector session = null;
    IdFederationUtil.write("FlatMapIdFedGroupFunction invoked for " + key+"  "+System.currentTimeMillis()); //Called thrice

    if (!state.exists() ) {
      session = new IdentifierConnector();
    }  else {
      session = state.get();
    }

    while (events.hasNext()) {
      Row event = events.next();
      MilestoneEvent mEventCurr = IdFederationUtil.getMilestoneEvent(event, insightEvent);
      outputEvents.add(mEventCurr);
      IdFederationUtil.write(".........."+mEventCurr.getMilestoneId()); //Called thrice
      break;
    }
    return outputEvents.iterator();
  }
}


ForEachBatchFunction:

public class ForeachBatchProcessor implements VoidFunction2<Dataset<MilestoneEvent>, Long>, Serializable {

  private static final long serialVersionUID = 1L;

  public ForeachBatchProcessor(InsightDeployment in_oInsightDeployment,
      InsightEvent in_oInsightEvent, String in_strQueryName) {
  }

  public void call(Dataset<MilestoneEvent> in_Rows, Long in_lBatchID)
      throws Exception {
    if (in_Rows.count() == 0L) {
      return;
    }
    IdFederationUtil.write("Processing batch " + in_lBatchID + "  "+ in_Rows.count());
    List<MilestoneEvent> events = in_Rows.collectAsList();
    for(MilestoneEvent m: events) {
      IdFederationUtil.write("......BATCH "+m.getMilestoneId());
    }
  }

}


Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.

Jungtaek Lim-2
Hi,

Could you please provide the Spark version? 

It would also be very helpful if you could provide a simple reproducer, for example a project in your GitHub repository that can be built easily (mvn, gradle, or sbt), plus the set of input data needed to see the behavior. It is worth knowing that others aren't interested in your own code even if they are interested in the problematic behavior itself. It'd be nice if you could minimize the hurdle to debugging.

Thanks,
Jungtaek Lim (HeartSaVioR)

On Thu, Mar 11, 2021 at 4:54 PM Kuttaiah Robin <[hidden email]> wrote:
> [quoted text identical to the first message above]




Re: FlatMapGroupsWithStateFunction is called thrice - Production use case.

Robin Kuttaiah
Hi Jungtaek,

Thanks for looking into it. We use spark-2.4.3.
I removed most of our code and pasted only enough here to show the flow.
Sorry for the delay. I will try to provide a simple reproducer when I find time, but this is really hurting us.

Another observation: FlatMapIdFedGroupFunction is called multiple times only if I add some value to the outputEvents list; otherwise it is called only once.

Could you suggest how to debug this, and share any guesses about what could be wrong, so that I can focus my debugging on the right path?

thanks
Robin Kuttaiah

On Fri, Mar 12, 2021 at 8:43 AM Jungtaek Lim <[hidden email]> wrote:
> [quoted text identical to the two messages above]
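
A possible explanation for the observations in this thread, offered as an assumption rather than a confirmed diagnosis: the Dataset handed to foreachBatch is evaluated lazily, so each action on it can re-run the micro-batch plan, including the stateful function. The posted ForeachBatchProcessor performs up to three actions (count(), count() again, collectAsList()) and returns after the first count() when the batch is empty. That would match both the three invocations and the single invocation seen when outputEvents is empty. The Structured Streaming guide notes that output can be recomputed on each write inside foreachBatch and suggests caching the batch Dataset first. The toy model below uses no Spark classes; LazyBatch, persist() and invocations() are hypothetical stand-ins that only imitate this behavior.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Toy model only (not Spark API): shows how repeated actions re-run a lazy upstream. */
class LazyBatchDemo {

    /** Hypothetical stand-in for the per-batch Dataset passed to foreachBatch. */
    static final class LazyBatch<T> {
        private final Supplier<List<T>> upstream; // stands in for the micro-batch plan
        private boolean cacheEnabled;             // set by persist()
        private List<T> cached;                   // filled by the first action after persist()

        LazyBatch(Supplier<List<T>> upstream) {
            this.upstream = upstream;
        }

        /** Marks the batch for caching; the result is stored on the next action. */
        LazyBatch<T> persist() {
            cacheEnabled = true;
            return this;
        }

        /** Every action goes through here; without caching the upstream runs again. */
        private List<T> compute() {
            if (cached != null) {
                return cached;
            }
            List<T> result = upstream.get();
            if (cacheEnabled) {
                cached = result;
            }
            return result;
        }

        long count() {
            return compute().size();
        }

        List<T> collectAsList() {
            return new ArrayList<>(compute());
        }
    }

    /**
     * Repeats the sequence of actions from the posted ForeachBatchProcessor.call
     * and returns how many times the upstream (the stateful function) ran.
     */
    static int invocations(List<String> output, boolean persist) {
        AtomicInteger calls = new AtomicInteger();
        LazyBatch<String> batch = new LazyBatch<>(() -> {
            calls.incrementAndGet(); // one "FlatMapIdFedGroupFunction invoked" log line
            return output;
        });
        if (persist) {
            batch.persist();
        }
        if (batch.count() == 0L) {                   // action 1
            return calls.get();
        }
        long size = batch.count();                   // action 2
        List<String> events = batch.collectAsList(); // action 3
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(invocations(Arrays.asList("m1"), false));     // prints 3
        System.out.println(invocations(Collections.emptyList(), false)); // prints 1
        System.out.println(invocations(Arrays.asList("m1"), true));      // prints 1
    }
}
```

If this assumption holds for the real job, the corresponding change would be to call in_Rows.persist() at the start of call(...) and in_Rows.unpersist() at the end, or to call collectAsList() once and derive the count from the resulting list. Whether that resolves the behavior on spark-2.4.3 would still need to be verified against a reproducer.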