SaveAsHadoopFiles not functioning in DStream example

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view
|

SaveAsHadoopFiles not functioning in DStream example

gogowater
This post has NOT been accepted by the mailing list yet.
Hi, I am poking around the streaming example to do my prototype.  I want to save all outputs from the streaming result into HDFS.  So, I insert the SaveAsHadoopFiles API into the example code, but no data is written into hdfs.  I verified the hdfs and the path is correct and hadoop is working correctly.

Perhaps it is my lack of understanding how dstream is working, this should be working.  If not, it should error out.  Please advise.

public class JavaNetworkWordCount {
  public static void main(String[] args) {
    if (args.length < 3) {
      System.err.println("Usage: NetworkWordCount <master> <hostname> <port>\n" +
          "In local mode, <master> should be 'local[n]' with n > 1");
      System.exit(1);
    }

    // Create the context with a 1 second batch size
    JavaStreamingContext ssc = new JavaStreamingContext(args[0], "NetworkWordCount",
            new Duration(1000), System.getenv("SPARK_HOME"), System.getenv("SPARK_EXAMPLES_JAR"));

    // Create a NetworkInputDStream on target ip:port and count the
    // words in input stream of \n delimited test (eg. generated by 'nc')
    JavaDStream<String> lines = ssc.socketTextStream(args[1], Integer.parseInt(args[2]));
    JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
      @Override
      public Iterable<String> call(String x) {
        return Lists.newArrayList(x.split(" "));
      }
    });
    JavaPairDStream<String, Integer> wordCounts = words.map(
      new PairFunction<String, String, Integer>() {
        @Override
        public Tuple2<String, Integer> call(String s) throws Exception {
          return new Tuple2<String, Integer>(s, 1);
        }
      }).reduceByKey(new Function2<Integer, Integer, Integer>() {
        @Override
        public Integer call(Integer i1, Integer i2) throws Exception {
          return i1 + i2;
        }
      });

    wordCounts.print();
   
    // ----------------------------------------------------------
    // NEW CODE: Save into hadoop
    wordCounts.saveAsHadoopFiles("hdfs://localhost:9000/data/streaming/wordcount/text-", "txt");        
    // ----------------------------------------------------------
   
    ssc.start();
  }
}