foreach not working


eric perler
Hello,

I am on my second day with Spark, and I'm having trouble getting the foreach function to work with the network word count example. I can see that the "flatMap" and "map" methods are being invoked, but I don't seem to be getting into the foreach method. I'm not sure if what I am doing even makes sense. Any help is appreciated. Thanks!

JavaDStream<String> lines = ssc.socketTextStream("localhost", 1234);

JavaDStream<String> words = lines
    .flatMap(new FlatMapFunction<String, String>() {
        @Override
        public Iterable<String> call(String x) {
            //-- this is being invoked
            return Lists.newArrayList(x.split(" "));
        }
    });

JavaPairDStream<String, Integer> wordCounts = words
    .map(new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
            //-- this is being invoked
            return new Tuple2<String, Integer>(s, 1);
        }
    });

wordCounts.foreach(collectTuplesFunc);

ssc.start();
ssc.awaitTermination();
}

Function collectTuplesFunc = new Function<JavaPairRDD<Tuple2<byte[], byte[]>, Void>, Void>() {

    @Override
    public Void call(JavaPairRDD<Tuple2<byte[], byte[]>, Void> arg0)
            throws Exception {
        //-- this is NOT being invoked
        return null;
    }
};
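
A side note on the callback above: collectTuplesFunc is declared with the raw type Function, and its type arguments (JavaPairRDD<Tuple2<byte[], byte[]>, Void>) do not match the JavaPairDStream<String, Integer> it is attached to. With a raw type the compiler only issues an unchecked warning. Below is a minimal plain-Java sketch of that effect. It is not Spark code: RawFunctionDemo, forEachBatch and the use of java.util.function.Function are hypothetical stand-ins chosen for illustration. It suggests that the mismatch alone would probably not stop the callback from being invoked, but could surface as a ClassCastException once an element is actually read.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class RawFunctionDemo {

    // Hypothetical stand-in for a per-batch output hook: hands the batch to the callback once.
    static void forEachBatch(List<String> batch, Function<List<String>, Void> callback) {
        callback.apply(batch);
    }

    // Runs a callback whose declared element type (byte[]) does not match the data (String)
    // and returns a short log of what happened.
    @SuppressWarnings({"rawtypes", "unchecked"})
    static String run() {
        final StringBuilder log = new StringBuilder();

        // Raw-typed variable, as in the post: the mismatch is hidden from the compiler.
        Function callback = new Function<List<byte[]>, Void>() {
            @Override
            public Void apply(List<byte[]> batch) {
                log.append("called;"); // erasure means the body still runs
                try {
                    byte[] first = batch.get(0); // compiler-inserted cast fails here
                    log.append("read ").append(first.length).append(" bytes;");
                } catch (ClassCastException e) {
                    log.append("ClassCastException;");
                }
                return null;
            }
        };

        // Compiles with only an unchecked warning because callback is a raw type.
        forEachBatch(Arrays.asList("hello", "world"), callback);
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints "called;ClassCastException;"
    }
}
```

If the same holds for the Spark function type, declaring the variable with its full generic type instead of raw Function would let the compiler report the mismatch directly.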

I am assuming that the foreach call is where I would write to an external system. Please correct me if this assumption is wrong.

Thanks again.