Fwd: Some Questions & Doubts regarding Spark process


Fwd: Some Questions & Doubts regarding Spark process

vinay Bajaj

I am attaching a screenshot of the Spark web UI for this job; please have a look.

1) Why does the UI show multiple completed stages, all with the same information, for a single map operator?
2) As you can see, the number of completed workers is greater than the maximum number of workers (2931/2339). Can you tell me why it shows that?
3) How is a stage formed in Spark? As you can see in my code, after the first map, groupByKey, and filter, I run another map, then a filter, then count. But Spark combined these three operations into a single stage and named it "Count" (you can see this in the attached screenshot). Can you explain how it combines operations into stages and what the logic or idea behind this is?

Code:
JavaRDD<String> lines = sc.textFile(path);
        logger.info("Read file successfully");

        //System.out.println("count = " + lines.count());

        JavaPairRDD<String, List<String>> data = lines.map(new PairFunction<String,String,String>() {
            @Override
            public Tuple2<String, String> call(String s) {
                //logger.info("Extracting Domain Name: " + s);
                return new Tuple2<String,String>(getDomainName(s),s);
            }
        }).groupByKey().filter(new Function<Tuple2<String, List<String>>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, List<String>> stringListTuple2) throws Exception {
                // Keep only groups whose key is a syntactically valid domain name.
                return DOMAIN_VALIDATOR.isValid(stringListTuple2._1);
            }
        });

        logger.info("Halfway processed!!");

        JavaRDD<Tuple2<String, String>> processed = data.map(new Function<Tuple2<String, List<String>>, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> call(Tuple2<String, List<String>> stringListTuple2) throws Exception {
                String domainName = stringListTuple2._1;
                List<String> values = stringListTuple2._2;

                //logger.log(INFO, "Parsing domain: " + domainName);
                DomainStats statistics = new DomainStats(domainName);

                Record record = new Record();

                for (String entry : values) {
                    try {
                        Parser.parseLogEntry(entry, record);
                        record.validate();
                        statistics.process(record);
                    } catch (TaskException e) {
                        logger.warn("Null entries in parsing: " + entry, e);
                    } catch (ParseException e) {
                        logger.warn("Parse Exception in formatting date: " + entry, e);
                    } catch (ContryUnavailableException e) {
                        logger.error("Country unavailable of IP: " + record.getIp(), e);
                    } catch (Exception e) {
                        // Pass the exception as the throwable argument so the full stack
                        // trace is logged; concatenating e.getStackTrace() would only
                        // print an array reference.
                        logger.error("Unknown error in processing: " + entry, e);
                    }
                }

                if (statistics.getUSStatistics().getViews() + statistics.getNonUSStatistics().getViews() == 0) {
                    return new Tuple2<String, String>(domainName, "0");
                }
                statistics.setRegionURLStats();

                return new Tuple2<String, String>(domainName, "1");
            }
        });

        JavaRDD<Tuple2<String, String>> tt = processed.filter(new Function<Tuple2<String, String>, Boolean>() {
            @Override
            public Boolean call(Tuple2<String, String> stringStringTuple2) throws Exception {
                // Keep only domains that were successfully processed (flagged "1").
                return stringStringTuple2._2.equals("1");
            }
        });

        System.out.println(tt.count());
        logger.info("All domains Processed!!!");



[Attachment: spark.png (89K)]

Re: Fwd: Some Questions & Doubts regarding Spark process

Guillaume Pitel
Hi

> I am attaching a screenshot of the Spark web UI for this job; please have a look.
>
> 1) Why does the UI show multiple completed stages, all with the same information, for a single map operator?

If you don't cache the result and it is needed several times in the computation, Spark recomputes the map each time, so it appears several times in the UI.
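For example, here is a minimal sketch of persisting an intermediate RDD so that later actions reuse it instead of recomputing the whole lineage (hypothetical input path, anonymous-class style matching the Java API used in the code above):

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;

public class CacheExample {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "CacheExample");
        JavaRDD<String> lines = sc.textFile("input.txt"); // hypothetical path

        JavaRDD<Integer> lengths = lines.map(new Function<String, Integer>() {
            @Override
            public Integer call(String s) {
                return s.length();
            }
        }).cache(); // keep the computed partitions in memory for reuse

        // Both actions reuse the cached partitions; without cache(),
        // each action would re-read the file and re-run the map.
        System.out.println("count = " + lengths.count());
        System.out.println("first = " + lengths.first());
        sc.stop();
    }
}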
> 2) As you can see, the number of completed workers is greater than the maximum number of workers (2931/2339). Can you tell me why it shows that?

Usually this happens when one of your executors dies (often from serious memory exhaustion, though many other causes are possible). The only advice I can give is to watch your logs for ERROR and Exception entries.

> 3) How is a stage formed in Spark? As you can see in my code, after the first map, groupByKey, and filter, I run another map, then a filter, then count. But Spark combined these three operations into a single stage and named it "Count" (you can see this in the attached screenshot). Can you explain how it combines operations into stages and what the logic or idea behind this is?

I'll let someone else give a detailed answer on that, but basically you can trust Spark to optimize this correctly.
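Roughly speaking, Spark cuts a job into stages at shuffle boundaries: transformations with narrow dependencies (map, filter) are pipelined per partition into one stage, and the web UI labels each stage after the operation that ends it, here the count action. A minimal sketch of where the boundary falls (hypothetical input path; this uses the same 0.9-era Java API as the code above, where map with a PairFunction yields a JavaPairRDD):

import java.util.List;

import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

public class StageBoundaries {
    public static void main(String[] args) {
        JavaSparkContext sc = new JavaSparkContext("local", "StageBoundaries");
        JavaRDD<String> words = sc.textFile("input.txt"); // hypothetical path

        // Stage 1: textFile -> map. Narrow dependency, no shuffle yet.
        JavaPairRDD<String, String> keyed = words.map(new PairFunction<String, String, String>() {
            @Override
            public Tuple2<String, String> call(String s) {
                String key = s.isEmpty() ? "" : s.substring(0, 1);
                return new Tuple2<String, String>(key, s);
            }
        });

        // groupByKey requires a shuffle, so Spark cuts the stage here.
        JavaPairRDD<String, List<String>> grouped = keyed.groupByKey();

        // Stage 2: map -> filter -> count all have narrow dependencies,
        // so they are pipelined into a single stage that the UI labels
        // after the final action, "count".
        long n = grouped.map(new Function<Tuple2<String, List<String>>, Integer>() {
            @Override
            public Integer call(Tuple2<String, List<String>> t) {
                return t._2.size();
            }
        }).filter(new Function<Integer, Boolean>() {
            @Override
            public Boolean call(Integer size) {
                return size > 1;
            }
        }).count();

        System.out.println(n);
        sc.stop();
    }
}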

Guillaume
--
eXenSa
Guillaume PITEL, Président
+33(0)6 25 48 86 80

eXenSa S.A.S.
41, rue Périer - 92120 Montrouge - FRANCE
Tel +33(0)1 84 16 36 77 / Fax +33(0)9 72 28 37 05

Re: Fwd: Some Questions & Doubts regarding Spark process

vinay Bajaj
Hi

Thanks for your reply.

It would be very helpful if you could elaborate on spark.locality.wait and the multiple locality levels (process-local, node-local, rack-local, and then any): what is the difference between process-local and node-local, and what is the best configuration I can achieve by tuning this wait?
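For reference, these are plain Spark configuration properties; a minimal sketch of setting them through SparkConf (the property names are standard Spark configuration, but the millisecond values here are illustrative only, not a tuning recommendation):

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalityConfig {
    public static void main(String[] args) {
        // How long the scheduler waits for a free slot at each locality
        // level before falling back to the next one
        // (process-local -> node-local -> rack-local -> any).
        SparkConf conf = new SparkConf()
                .setAppName("LocalityConfig")
                .setMaster("local")
                .set("spark.locality.wait", "3000")          // default for all levels, in ms
                .set("spark.locality.wait.process", "3000")  // data cached in the same executor JVM
                .set("spark.locality.wait.node", "3000")     // data on the same node, e.g. a local HDFS block
                .set("spark.locality.wait.rack", "3000");    // data on the same rack

        JavaSparkContext sc = new JavaSparkContext(conf);
        // ... run the job ...
        sc.stop();
    }
}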

Regards
Vinay Bajaj

