[Solved] Streaming. Cannot get socketTextStream to receive anything.

kytay
Hi

I am learning Spark Streaming and am trying out the JavaNetworkWordCount example.


#1 - This is the code I wrote
JavaStreamingContext sctx = new JavaStreamingContext("local", appName, new Duration(5000));
JavaReceiverInputDStream<String> lines = sctx.socketTextStream("127.0.0.1", 9999);

JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text:" + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

#2 - This is the socket server code I am using
import java.io.BufferedReader;
import java.io.DataOutputStream;
import java.io.InputStreamReader;
import java.net.ServerSocket;
import java.net.Socket;

public class TestTcpServer {

        public static void main(String argv[]) throws Exception      
        {        
                String clientSentence;          
                String capitalizedSentence;          
                ServerSocket welcomeSocket = new ServerSocket(9999);  
               
                int i = 0;
               
                while(true)          
                {            
                        Socket connectionSocket = welcomeSocket.accept();            
                        BufferedReader inFromClient = new BufferedReader(
                                        new InputStreamReader(connectionSocket.getInputStream())
                                        );            
                        DataOutputStream outToClient = new DataOutputStream(connectionSocket.getOutputStream());
                       
                        while(true)
                        {
                                String sendingStr = "Sending... data... " + i;
                                outToClient.writeBytes(sendingStr);
                                System.out.println(sendingStr);
                                i++;
                                Thread.sleep(3000);
                        }        
                }      
        }
}

What I am trying to do is get the code in #1 to print all the text it receives, but so far I have not managed to.

I have been using Hercules Setup (http://www.hw-group.com/products/hercules/details_en.html) to simulate a TCP server, as well as the simple ServerSocket code in #2, but I do not see any text printed on the console.

Is the call(String arg0) method of the FlatMapFunction being invoked every 5 seconds?

The console log is at http://pastebin.com/THzdzGhg

Re: Streaming. Cannot get socketTextStream to receive anything.

Akhil
You can simply use the nc command for this, like:

nc -p 12345

which will open port 12345; from the terminal you can then type whatever input your streaming code needs.

Thanks
Best Regards


In reply to a post by kytay, Fri, Jul 11, 2014 at 2:41 AM

--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/Streaming-Cannot-get-socketTextStream-to-receive-anything-tp9382.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

Re: Streaming. Cannot get socketTextStream to receive anything.

Akhil
Sorry, the command is

nc -lk 12345

Thanks
Best Regards


In reply to a post by Akhil Das, Fri, Jul 11, 2014 at 6:46 AM

Re: Streaming. Cannot get socketTextStream to receive anything.

kytay
Hi Akhil Das

I have tried the
nc -lk 9999
command too.

I was expecting "System.out.println("Print text:" + arg0);" to print each line when lines.flatMap(...) processes the stream.

But in my tests with "nc -lk 9999", nothing is printed on the console at all.

==

To check that the nc tool itself works, I also tested it with the Hercules TCP client test tool, and it works fine.

So the question is still why

JavaDStream<String> words = lines.flatMap(
        new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String arg0) throws Exception {
                System.out.println("Print text:" + arg0);
                return Arrays.asList(arg0.split(" "));
            }
        });

is not printing the text I send through "nc -lk 9999".

===

Is there any other way to test if socketTextStream(...) is working?

Regards.

Re: Streaming. Cannot get socketTextStream to receive anything.

kytay
I think I should be seeing every line of text that I type into nc.

Re: Streaming. Cannot get socketTextStream to receive anything.

Akhil
Can you try this piece of code?

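    import java.util.regex.Pattern;

    import com.google.common.collect.Lists;
    import scala.Tuple2;

    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.StorageLevels;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    import org.apache.spark.streaming.Duration;
    import org.apache.spark.streaming.api.java.JavaDStream;
    import org.apache.spark.streaming.api.java.JavaPairDStream;
    import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
    import org.apache.spark.streaming.api.java.JavaStreamingContext;

    public final class JavaNetworkWordCount {
      private static final Pattern SPACE = Pattern.compile(" ");

      // Usage: JavaNetworkWordCount <hostname> <port>; the master (e.g. local[2]) comes from spark-submit.
      public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("JavaNetworkWordCount");
        JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, new Duration(1000));

        // Each "\n"-terminated line received on the socket becomes one record.
        JavaReceiverInputDStream<String> lines = ssc.socketTextStream(
                args[0], Integer.parseInt(args[1]), StorageLevels.MEMORY_AND_DISK_SER);
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
          @Override
          public Iterable<String> call(String x) {
            return Lists.newArrayList(SPACE.split(x));
          }
        });
        JavaPairDStream<String, Integer> wordCounts = words.mapToPair(
          new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) {
              return new Tuple2<String, Integer>(s, 1);
            }
          }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer i1, Integer i2) {
              return i1 + i2;
            }
          });

        // print() is an output operation; transformations like flatMap only run when one needs them.
        wordCounts.print();
        ssc.start();
        ssc.awaitTermination();
      }
    }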
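For readers following the pipeline, the plain-Java sketch below (no Spark involved) mimics what it computes for the lines of a single batch: flatMap splits each line on spaces, mapToPair emits (word, 1), and reduceByKey adds up the 1s per word. The names BatchWordCount and countWords are hypothetical, and the TreeMap is only there to make the output predictable; Spark does not sort the per-batch counts.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Pattern;

// Plain-JDK sketch (not Spark) of what the word-count pipeline computes for ONE batch.
class BatchWordCount {
    private static final Pattern SPACE = Pattern.compile(" ");

    static Map<String, Integer> countWords(List<String> batchLines) {
        Map<String, Integer> counts = new TreeMap<>(); // sorted only for predictable output
        for (String line : batchLines) {
            for (String word : SPACE.split(line)) {    // the flatMap step
                counts.merge(word, 1, Integer::sum);   // (word, 1) pairs summed, like reduceByKey
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        // prints {hello=2, spark=1, world=1}
        System.out.println(countWords(Arrays.asList("hello world", "hello spark")));
    }
}
```

In the streaming job, wordCounts.print() shows the first ten such (word, count) pairs for every 1-second batch.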


Thanks
Best Regards


In reply to a post by kytay, Fri, Jul 11, 2014 at 9:58 AM

Re: Streaming. Cannot get socketTextStream to receive anything.

kytay
Hi Akhil Das

Thanks.

I tried the code, and it works.

There is a problem with my socket code: it does not flush the content out. With the test tool, Hercules, I have to close the socket connection to "flush" the content out.

I am going to troubleshoot why nc works while my code and the test tool don't.

Re: Streaming. Cannot get socketTextStream to receive anything.

Tobias Pfeiffer
Hi,

I experienced exactly the same problem when using a SparkContext with the "local[1]" master, because one thread is used for receiving data and the remaining threads for processing. With only one thread running, nothing is left for processing, so no processing takes place. Once you shut down the connection, the receiver thread is freed and used for processing.
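The effect Tobias describes can be illustrated with a plain-JDK analogy; this is not Spark code, and Spark's local scheduler is more involved than a thread pool. A receiver is a long-running task that keeps one thread busy, so if it is the only thread, the batch jobs never get to run. Note that a bare "local" master, as used in #1 of the first post, also means a single thread; the Spark Streaming guide recommends "local[n]" with n greater than the number of receivers. The class and method names below are hypothetical.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Analogy only: a blocking "receiver" task pins one thread of a fixed-size pool.
class ReceiverStarvationDemo {

    /** True if a "processing" task runs while a "receiver" task occupies one pool thread. */
    static boolean processingRuns(int threads, long waitMs) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        CountDownLatch stopReceiver = new CountDownLatch(1);
        CountDownLatch processed = new CountDownLatch(1);
        try {
            // Long-running receiver: keeps its thread busy until told to stop.
            pool.submit(() -> {
                stopReceiver.await();
                return null;
            });
            // Batch processing: needs a free thread to run at all.
            pool.submit(processed::countDown);
            return processed.await(waitMs, TimeUnit.MILLISECONDS);
        } finally {
            stopReceiver.countDown(); // like closing the connection: the receiver's thread is released
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS);
        }
    }
}
```

With one thread, processingRuns(1, 300) returns false because the processing task waits behind the receiver; with two threads it returns true.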

Any chance you are running into the same issue?

Tobias



In reply to a post by kytay, Mon, Jul 14, 2014 at 11:45 AM

Re: Streaming. Cannot get socketTextStream to receive anything.

kytay
Hi Tobias

I have been using "local[4]" to test.
My problem is likely caused by the TCP host server I am trying to emulate, i.e. a host that sends out messages (although I am not sure at the moment :D).

The first way I tried was a TCP tool called Hercules.

The second way was to write simple socket code that sends a message at intervals, like the one shown in #2 of my first post. I suspect it doesn't work because the messages are not "flushed", so Spark Streaming never receives them.

I will need to do more testing to understand the behavior. I am currently not sure why "nc -lk" works while the other tools and code I am testing with don't.

Regards.

Re: Streaming. Cannot get socketTextStream to receive anything.

Tathagata Das
When you send messages with your simple socket code, are they "\n" delimited? If not, the socketTextStream receiver won't identify them as separate records and will keep buffering them.
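The buffering can be reproduced without Spark. As far as I know, socketTextStream's receiver splits incoming bytes into lines with a BufferedReader-style readLine() (treat that as an assumption about Spark internals), so the plain-JDK sketch below shows how such a reader behaves. Text without a trailing "\n" stays buffered even after flush(), and comes out as a line only when the sender closes the stream, which would match what kytay saw when closing the Hercules connection. LineBufferingDemo and its methods are hypothetical names.

```java
import java.io.BufferedReader;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Plain-JDK illustration, not Spark code: readLine() returns only after it sees
// a line terminator or the end of the stream.
class LineBufferingDemo {

    /** True if readLine() yields a line within timeoutMs after `payload` is written and flushed. */
    static boolean lineArrivesWithin(String payload, long timeoutMs) throws Exception {
        PipedOutputStream sender = new PipedOutputStream();
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new PipedInputStream(sender), StandardCharsets.UTF_8));
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            sender.write(payload.getBytes(StandardCharsets.UTF_8));
            sender.flush(); // flushing delivers the bytes but does not end the line
            Callable<String> readTask = reader::readLine;
            Future<String> line = pool.submit(readTask);
            try {
                line.get(timeoutMs, TimeUnit.MILLISECONDS);
                return true;
            } catch (TimeoutException stillBuffering) {
                return false; // readLine() is still waiting for "\n"
            }
        } finally {
            sender.close(); // end of stream: a blocked readLine() now returns the partial text
            pool.shutdownNow();
        }
    }

    /** At end of stream, unterminated text comes out as the last line. */
    static String lineAtEndOfStream(String payload) throws IOException {
        BufferedReader reader = new BufferedReader(new InputStreamReader(
                new ByteArrayInputStream(payload.getBytes(StandardCharsets.UTF_8)),
                StandardCharsets.UTF_8));
        return reader.readLine();
    }
}
```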

TD


In reply to a post by kytay, Sun, Jul 13, 2014 at 10:49 PM

Re: Streaming. Cannot get socketTextStream to receive anything.

kytay
Hi TD

You are right, I did not include "\n" to delimit the strings I sent. That was the reason.
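For reference, here is a sketch of the #2 server with that fix applied: each message ends with "\n" and is flushed immediately. Port 9999, the message text and the 3-second interval come from the original code; LineTcpServer and sendMessages are hypothetical names. It writes through an OutputStreamWriter rather than a PrintWriter because PrintWriter silently swallows I/O errors, which would hide a disconnected client.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;

// Variant of TestTcpServer with the fix: every message is "\n"-terminated and flushed.
class LineTcpServer {

    /** Writes `count` newline-terminated messages to `out`, pausing `delayMs` between them. */
    static void sendMessages(OutputStream out, int count, long delayMs)
            throws IOException, InterruptedException {
        // OutputStreamWriter (unlike PrintWriter) throws when the client has gone away.
        Writer writer = new OutputStreamWriter(out, StandardCharsets.UTF_8);
        for (int i = 0; i < count; i++) {
            String sendingStr = "Sending... data... " + i;
            writer.write(sendingStr + "\n"); // the "\n" is what was missing
            writer.flush();
            System.out.println(sendingStr);
            if (delayMs > 0) {
                Thread.sleep(delayMs);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        try (ServerSocket welcomeSocket = new ServerSocket(9999)) {
            while (true) {
                Socket connection = welcomeSocket.accept();
                try {
                    sendMessages(connection.getOutputStream(), Integer.MAX_VALUE, 3000);
                } catch (IOException clientGone) {
                    // The receiver disconnected; go back to accept() for the next one.
                } finally {
                    connection.close();
                }
            }
        }
    }
}
```

With this running, socketTextStream("127.0.0.1", 9999) should see one record per message, provided the master has at least two threads.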

Is there a way for me to define my own delimiter, such as SOH or ETX instead of "\n"?

http://en.wikipedia.org/wiki/Control_character

Regards
kytay

Re: Streaming. Cannot get socketTextStream to receive anything.

Tathagata Das

You will have to define your own stream-to-iterator function and use socketStream. The function should return custom-delimited objects as bytes continuously come in; when there is not yet enough data for the next object, the function should block.
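A minimal sketch of the kind of converter TD describes, in plain Java: it pulls bytes from the stream, emits a record each time it sees the delimiter byte (ETX is 0x03, SOH is 0x01), and blocks inside InputStream.read() while a record is incomplete. The class name DelimitedRecords and the choice to emit trailing undelimited bytes as a final record are my assumptions. In Spark 1.x's Java API, JavaStreamingContext.socketStream takes a hostname, a port, a Function<InputStream, Iterable<T>> converter and a StorageLevel, so this class would presumably be wrapped in such a Function (returning new DelimitedRecords(in, DelimitedRecords.ETX)); please check the signature for your Spark version.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical converter: splits a byte stream into records ended by a delimiter byte.
class DelimitedRecords implements Iterable<String> {
    static final int SOH = 0x01;
    static final int ETX = 0x03;

    private final InputStream in;
    private final int delimiter;

    DelimitedRecords(InputStream in, int delimiter) {
        this.in = in;
        this.delimiter = delimiter;
    }

    @Override
    public Iterator<String> iterator() {
        return new Iterator<String>() {
            private String pending;      // record read ahead by hasNext()
            private boolean endOfStream;

            @Override
            public boolean hasNext() {
                if (pending == null && !endOfStream) {
                    pending = readRecord();
                }
                return pending != null;
            }

            @Override
            public String next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                String record = pending;
                pending = null;
                return record;
            }

            // Blocks in in.read() until a whole record (or the end of the stream) has arrived.
            private String readRecord() {
                ByteArrayOutputStream buf = new ByteArrayOutputStream();
                try {
                    int b;
                    while ((b = in.read()) != -1) {
                        if (b == delimiter) {
                            return new String(buf.toByteArray(), StandardCharsets.UTF_8);
                        }
                        buf.write(b);
                    }
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
                endOfStream = true;
                // Assumption: leftover bytes without a delimiter become one final record.
                return buf.size() > 0 ? new String(buf.toByteArray(), StandardCharsets.UTF_8) : null;
            }
        };
    }
}
```

Because readRecord() reads one byte at a time, wrapping the socket's stream in a java.io.BufferedInputStream first would avoid a system call per byte.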

TD

In reply to a post by kytay, Jul 23, 2014 6:52 PM

Re: [Solved] Streaming. Cannot get socketTextStream to receive anything.

AlexxGreen
This post has NOT been accepted by the mailing list yet.
In reply to this post by kytay
You can also try other software to simulate COM ports, such as Serial Port Emulator (http://www.eltima.com/products/serial-port-emulator/). This product is fully baked, has good developer tech support, and does all it claims to do.