Fwd: Some Questions & Doubts regarding Spark process

vinay Bajaj
I am attaching a screenshot of the Spark web UI for my process; please have a look at it.

1) For a single map operator, why does it show multiple completed stages with the same information?
2) As you can see, the number of completed workers is greater than the maximum number of workers (2931/2339). Can you please tell me why it is shown like that?
3) How is a stage constructed in Spark? As you can see in my code, after the first map with groupByKey and filter, I run one more map, then a filter, then a count. But Spark combined these three steps into one stage and named it count (you can see this in the attached screenshot). Can you please explain how it combines stages, and what the logic or idea behind this is?
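
To make question 3 concrete, here is a toy Java sketch of the behaviour I am seeing. It is not Spark code and does not use Spark's scheduler. It only assumes that a new stage starts at each shuffle operation (groupByKey here), that map and filter are pipelined into whichever stage they fall in, and that a stage is labelled by its last operation. The class name StageSketch, the SHUFFLE_OPS list, and both helper methods are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: illustrates stage cutting, it is not Spark's implementation.
public class StageSketch {

    // Assumption: the operations treated as shuffles in this toy model.
    static final List<String> SHUFFLE_OPS = Arrays.asList("groupByKey", "reduceByKey");

    // Splits a linear chain of operations into stages. A shuffle operation
    // closes the stage before it and becomes the first entry of the next one;
    // all other operations are appended to the current stage.
    static List<List<String>> splitIntoStages(List<String> ops) {
        List<List<String>> stages = new ArrayList<>();
        List<String> current = new ArrayList<>();
        for (String op : ops) {
            if (SHUFFLE_OPS.contains(op) && !current.isEmpty()) {
                stages.add(current);
                current = new ArrayList<>();
            }
            current.add(op);
        }
        if (!current.isEmpty()) {
            stages.add(current);
        }
        return stages;
    }

    // Assumption: a stage is labelled by the last operation it contains.
    static String label(List<String> stage) {
        return stage.get(stage.size() - 1);
    }

    public static void main(String[] args) {
        // The chain of operations from my job, in order.
        List<String> job = Arrays.asList(
                "textFile", "map", "groupByKey", "filter", "map", "filter", "count");
        for (List<String> stage : splitIntoStages(job)) {
            System.out.println(label(stage) + " <- " + stage);
        }
        // prints:
        // map <- [textFile, map]
        // count <- [groupByKey, filter, map, filter, count]
    }
}
```

Under these assumptions the second map, the filter, and the count all end up in one stage labelled count, which matches what the screenshot shows. Whether this is really the rule Spark follows is what I would like confirmed.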

JavaRDD<String> lines = sc.textFile(path);
        logger.info("Read file successfully");

        //System.out.println("count = " + lines.count());

        // Key every line by its domain name, group the lines per domain,
        // and keep only the groups whose domain name is valid.
        JavaPairRDD<String, List<String>> data = lines.map(new PairFunction<String,String,String>() {
            public Tuple2<String, String> call(String s) {
                //logger.info("Extracting Domain Name: " + s);
                return new Tuple2<String,String>(getDomainName(s),s);
            }
        }).groupByKey().filter(new Function<Tuple2<String, List<String>>, Boolean>() {
            public Boolean call(Tuple2<String, List<String>> stringListTuple2) throws Exception {
                String domainName = stringListTuple2._1;
                return DOMAIN_VALIDATOR.isValid(domainName);
            }
        });

        logger.info("Halfway processed!!");

        // Parse every log entry of a domain and flag the domain with "1" or "0".
        JavaRDD<Tuple2<String, String>> processed = data.map(new Function<Tuple2<String, List<String>>, Tuple2<String, String>>() {
            public Tuple2<String, String> call(Tuple2<String, List<String>> stringListTuple2) throws Exception {
                String domainName = stringListTuple2._1;
                List<String> values = stringListTuple2._2;

                //logger.log(INFO, "Parsing domain: " + domainName);
                DomainStats statistics = new DomainStats(domainName);

                Record record = new Record();

                for (String entry : values) {
                    try {
                        Parser.parseLogEntry(entry, record);
                    } catch (TaskException e) {
                        logger.warn("Null entries in parsing: " + entry, e);
                    } catch (ParseException e) {
                        logger.warn("Parse Exception in formatting date: " + entry, e);
                    } catch (ContryUnavailableException e) {
                        logger.error("Country unavailable of IP: " + record.getIp(), e);
                    } catch (Exception e) {
                        // The logger prints the stack trace from the exception argument.
                        logger.error("Unknown error in processing: " + entry, e);
                    }
                }

                if (statistics.getUSStatistics().getViews() + statistics.getNonUSStatistics().getViews() == 0) {
                    return new Tuple2<String, String>(domainName, "0");
                }

                return new Tuple2<String, String>(domainName, "1");
            }
        });

        // Keep only the domains flagged with "1".
        JavaRDD<Tuple2<String,String>> tt = processed.filter(new Function<Tuple2<String, String>, Boolean>() {
            public Boolean call(Tuple2<String, String> stringStringTuple2) throws Exception {
                return stringStringTuple2._2.equals("1");
            }
        });

        logger.info("All domains Processed!!!");

Vinay Bajaj

Attachment: spark.png (89K)