abnormal latency when running Spark Streaming

classic Classic list List threaded Threaded
2 messages Options
Reply | Threaded
Open this post in threaded view
|

abnormal latency when running Spark Streaming

Yingjun Wu
Dear all,

I have implemented a simple Spark streaming application to perform windowing wordcount job. However, it seems that the latency is extremely high, compared with running exactly the same job in Storm. The source code is attached as follows:

public final class MyKafkaWordcountMain {

        public static void main(String[] args) throws Exception {
                String zkClient = "computer:2181";
                String kafkaGroup = "group1";
                String kafkaTopic = "topic1";
                int numThreads = 1;

                int batchDuration = Integer.valueOf(args[0]);
                int windowDuration = Integer.valueOf(args[1]);
                int slideDuration = Integer.valueOf(args[2]);

                SparkConf sparkConf = new SparkConf().setAppName("MyKafkaWordCount");
                JavaStreamingContext jssc = new JavaStreamingContext(sparkConf,
                                new Duration(batchDuration));
                jssc.checkpoint("file///temp/");

                Map<String, Integer> topicMap = new HashMap<String, Integer>();
                topicMap.put(kafkaTopic, numThreads);
                JavaPairReceiverInputDStream<String, String> messages = KafkaUtils
                                .createStream(jssc, zkClient, kafkaGroup, topicMap);

                JavaDStream<String> lines = messages
                                .map(new Function<Tuple2<String, String>, String>() {
                                        @Override
                                        public String call(Tuple2<String, String> tuple2){
                                                return tuple2._2();
                                        }
                });

                //print the number of lines contained in each input RDD.
                lines.count().map(new Function<Long, Long>() {
                        private long totalCount = 0;
                        private long startTime = System.currentTimeMillis();

                        @Override
                        public Long call(Long in) throws Exception {
                                long endTime = System.currentTimeMillis();
                                String mystring = "=======input=======\nstartTime="
                                                + startTime
                                                + ", endTime="
                                                + endTime
                                                + ", elapsedTime="
                                                + (endTime-startTime)*1.0/1000
                                                + "sec, input count"
                                                + in
                                                + "\n======================\n";
                                System.out.println(mystring);
                                return in;
                        }
                }).print();

                JavaDStream<String> words = lines
                                .flatMap(new FlatMapFunction<String, String>() {

                                        @Override
                                        public Iterable<String> call(String x) throws Exception {
                                                // TODO Auto-generated method stub
                                                String[] words = x.split(" ");
// for (String word : words) {
// word = word.replaceAll("[^A-Za-z0-9]", "")
// .toLowerCase();
// }
                                                return Lists.newArrayList(words);
                                        }
                                });

                JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
                                new PairFunction<String, String, Integer>(){
                                        @Override
                                        public Tuple2<String, Integer> call(String s){
                                                return new Tuple2<String, Integer>(s, 1);
                                        }
                                }).reduceByKeyAndWindow(new Function2<Integer, Integer, Integer>(){
                                        @Override
                                        public Integer call(Integer i1, Integer i2){
                                                return i1+i2;
                                        }
                                }, new Function2<Integer, Integer, Integer>(){
                                        @Override
                                        public Integer call(Integer i1, Integer i2){
                                                return i1-i2;
                                        }
                                },
                                new Duration(windowDuration), new Duration(slideDuration));
                wordCounts.checkpoint(new Duration(100000));

                wordCounts.count().map(new Function<Long, Long>() {
                        private long startTime = System.currentTimeMillis();

                        @Override
                        public Long call(Long in) throws Exception {
                                long endTime = System.currentTimeMillis();
                                String mystring = "=====output=======\nstartTime="
                                                + startTime
                                                + ", endTime="
                                                + endTime
                                                + ", elapsedTime="
                                                + (endTime-startTime)*1.0/1000
                                                + "sec\n======================\n";
                                System.out.println(mystring);
                                return in;
                        }
                }).print();
                jssc.start();
                jssc.awaitTermination();
        }
}

I am wondering is there anything wrong with my source code? Or is my method to measure the latency problematic? Thanks.

Regards,
Yingjun
BD
Reply | Threaded
Open this post in threaded view
|

Re: abnormal latency when running Spark Streaming

BD
Hi Yingjun,

Do you see a stable latency or the latency keeps increasing? And could you provide some details about the input data rate/node, batch interval, windowDuration and slideDuration when you see the high latency?