Where/How to call jssc.Stop() for Spark Streaming based on a message from Kafka

I have a Spark Streaming job that receives data from Kafka topics and processes it. I want to stop the stream when a certain message arrives on a different Kafka topic. To implement this, I am following the approach mentioned by @TD here

Read Messages from driver program:
    Map<String, String> kafkaParams = new HashMap<>();
    kafkaParams.put("metadata.broker.list", kafkaProperties.getProperty("bootstrap.servers"));
    Set<String> commandTopics = new HashSet<>();
    JavaPairInputDStream<String, String> incomingCommand = StreamUtil.readStreamFromKafka(jssc, kafkaParams,
Note: readStreamFromKafka is a wrapper around KafkaUtils.createDirectStream.

I have a transformation function that reads the messages and looks for a particular string in each one. When that message arrives, I want transformCommand to return true so that I can terminate the stream. Otherwise the variable ret stays false and the stream keeps running normally.

    // Transform method
    Boolean ret = transformCommand(incomingCommand);

    /* ISSUE: The line above is executed only once, when the stream starts, and not while messages are being received, so the value of ret never changes and the stream does not stop. */
    System.out.println("----> Ret from transformation " + ret);


Transformation of Events:
    public static boolean transformCommand(JavaPairInputDStream<String, String> incomingCommand) {
        StreamAction streamAction = new StreamAction();
        incomingCommand.foreachRDD(agg -> {
            agg.foreachPartition(aggActivity -> {
                aggActivity.forEachRemaining(activity -> {
                    try {
                        if (StringUtils.endsWith("ABC", StreamUtil.getValueForJsonNode("Action", activity._2))) {
                            // terminateCurrentSession is set to true here
                        }
                    } catch (IllegalAccessException | InvocationTargetException | NoSuchMethodException
                            | IOException e) {
                    }
                });
            });
        });
        // The value of terminateCurrentSession is false here, but I expect it to be true because I set it above.
        System.out.println("---> " + streamAction.getTerminateCurrentSession());
        return streamAction.getTerminateCurrentSession();
    }
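
The false value printed on the driver is consistent with how Spark ships closures: the lambda passed to foreachPartition, together with the objects it captures, is serialized and run against a deserialized copy, and the foreachRDD body only runs later, once batches start, after transformCommand has already returned. The plain-Java sketch below imitates just the copy step, with no Spark involved. The StreamAction class here is a hypothetical stand-in for the one in the post, and setTerminateCurrentSession and serializedCopy are illustrative names, not part of any Spark API.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class ClosureCopyDemo {
    // Hypothetical stand-in for the post's StreamAction class (an assumption).
    static class StreamAction implements Serializable {
        private boolean terminateCurrentSession = false;

        boolean getTerminateCurrentSession() { return terminateCurrentSession; }

        void setTerminateCurrentSession(boolean value) { terminateCurrentSession = value; }
    }

    // Round-trips an object through Java serialization, yielding an independent copy,
    // roughly what happens to objects captured by a closure sent to an executor.
    static <T extends Serializable> T serializedCopy(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            @SuppressWarnings("unchecked")
            T copy = (T) in.readObject();
            return copy;
        }
    }

    public static void main(String[] args) throws Exception {
        StreamAction driverSide = new StreamAction();
        // The "executor" works on its own deserialized copy.
        StreamAction executorSide = serializedCopy(driverSide);
        executorSide.setTerminateCurrentSession(true);

        System.out.println("executor copy: " + executorSide.getTerminateCurrentSession()); // true
        System.out.println("driver object: " + driverSide.getTerminateCurrentSession());   // false
    }
}
```

Setting the flag on the copy leaves the driver-side object untouched, which matches the behaviour described in the comment above.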

What would be the best way to handle this scenario in Spark, given that I cannot stop the stream from within the transformation function because it runs on the executors?

Any help or direction would be great, thanks in advance!
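
For reference, one commonly suggested direction is to keep the stop decision on the driver: set a driver-side flag from code that runs on the driver (for example inside foreachRDD after collecting the small command RDD), and have the main thread poll with awaitTerminationOrTimeout instead of blocking in awaitTermination. The sketch below shows only the polling loop, in plain Java so it runs without Spark. StreamingContextLike is a hypothetical interface that mirrors the awaitTerminationOrTimeout and stop(stopSparkContext, stopGracefully) methods of JavaStreamingContext; stopRequested and runUntilStopped are illustrative names, and the fake context in main only simulates a command arriving.

```java
import java.util.concurrent.atomic.AtomicBoolean;

public class DriverStopSketch {
    // Hypothetical stand-in for the two JavaStreamingContext methods the loop relies on.
    interface StreamingContextLike {
        boolean awaitTerminationOrTimeout(long timeoutMs) throws InterruptedException;
        void stop(boolean stopSparkContext, boolean stopGracefully);
    }

    // Driver-side flag. In a real job it would be set by driver code, not inside
    // foreachPartition, which runs on executors.
    static final AtomicBoolean stopRequested = new AtomicBoolean(false);

    // Polls until the context terminates on its own or a stop has been requested.
    // Returns the number of polls performed.
    static int runUntilStopped(StreamingContextLike ctx, long pollMs) throws InterruptedException {
        int polls = 0;
        boolean terminated = false;
        while (!terminated) {
            terminated = ctx.awaitTerminationOrTimeout(pollMs);
            polls++;
            if (!terminated && stopRequested.get()) {
                ctx.stop(true, true); // stop the SparkContext too, gracefully
                terminated = true;
            }
        }
        return polls;
    }

    public static void main(String[] args) throws Exception {
        AtomicBoolean stopped = new AtomicBoolean(false);
        StreamingContextLike fake = new StreamingContextLike() {
            int calls = 0;

            public boolean awaitTerminationOrTimeout(long timeoutMs) {
                if (++calls == 3) {
                    stopRequested.set(true); // simulate the stop command arriving
                }
                return false; // the stream never terminates by itself
            }

            public void stop(boolean stopSparkContext, boolean stopGracefully) {
                stopped.set(true);
            }
        };

        int polls = runUntilStopped(fake, 10);
        System.out.println("polls=" + polls + " stopped=" + stopped.get()); // polls=3 stopped=true
    }
}
```

With a real JavaStreamingContext, the same loop would follow jssc.start(), and the poll interval bounds how long the job keeps running after the command is seen.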