Spark Streaming on a cluster

amirtuval
Hi

I am a newbie to Spark and Spark Streaming - I just recently became aware of them, but they seem really relevant to what I am trying to achieve.

I am looking into using Spark Streaming with an input stream from ZeroMQ, and I am trying to figure out which machine actually listens on the ZeroMQ socket.

If I have a 10-machine cluster, with one machine acting as the "application machine" - the machine that runs the code I write and submits jobs to the cluster - which of these machines will subscribe to the ZeroMQ socket?
Looking at the code, I see a SUB socket is created, meaning all the data will pass through that single socket. This leads me to believe that the application machine is the one listening on the socket and that it passes all received data on to the cluster, so this single machine might become a bottleneck in a high-throughput use case.
A better approach would be to have each node in the cluster listen on a "fan-out" socket, so that each node receives a part of the data.

I am not sure about any of this, as I am not an expert in ZeroMQ, and definitely not in Spark. If someone can clarify how this works, I'd really appreciate it.
In addition, if there are other input streams that operate in a different manner and do not require all the data to pass through the "application machine", I'd really appreciate knowing that too.

Thank you very much in advance,
Amir

Re: Spark Streaming on a cluster

Tathagata Das
The default ZeroMQ receiver that comes with the Spark repository does not guarantee which machine the ZeroMQ receiver will be launched on; it can be on any of the worker machines in the cluster, NOT the application machine (called the "driver" in our terms). And your understanding of the code and the situation is generally correct (except for the "passing through the application machine" part).

So if you want to do distributed receiving, you have to create multiple ZeroMQ streams, which will create multiple ZeroMQ receivers. The system tries to spread the receivers across multiple machines, but you have to make sure that the data is correctly "fanned out" across these receivers. Other ingestion mechanisms like Kafka support this natively - Kafka lets the user define "partitions" of a data stream, and the Kafka system takes care of partitioning the relevant stream and sending it to multiple receivers without duplicating the records.
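
For example, a rough sketch of distributed receiving with Kafka (the master URL, ZooKeeper address, consumer group, and topic names here are just placeholders):

import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka.KafkaUtils

val ssc = new StreamingContext("spark://master:7077", "MultiReceiver", Seconds(2))

// Each createStream call gets its own receiver, which the scheduler
// tries to place on a different worker machine.
val numReceivers = 4
val streams = (1 to numReceivers).map { _ =>
  KafkaUtils.createStream(ssc, "zkhost:2181", "my-consumer-group", Map("events" -> 1))
}

// Union the partial streams into a single DStream for processing.
val unified = ssc.union(streams)
unified.map(_._2).count().print()

ssc.start()
ssc.awaitTermination()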

TD


Re: Spark Streaming on a cluster

Sourav Chandra
Hi TD,

In the case of multiple streams, will the streaming code look like this:

val ssc = ...
(1 to n).foreach { _ =>
  val nwStream = KafkaUtils.createStream(...)
  nwStream.flatMap(...).map(...).reduceByKeyAndWindow(...).foreachRDD(saveToCassandra())
}
ssc.start()

Will it create any problems in execution (like reading/writing broadcast variables, etc.)?

Thanks,
Sourav



Re: Spark Streaming on a cluster

amirtuval
Hi guys,

Thank you for your answers.

ZeroMQ has the concept of a fan-out socket, where messages are distributed among multiple subscribers.

Is it possible to implement something like that in Spark Streaming?
If you can provide a bit of guidance, I'd be happy to implement something like that and share it.
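
For reference, this is ZeroMQ's PUSH/PULL pipeline pattern, which hands each message to exactly one of the connected pullers, round-robin. A minimal sketch of the idea (assuming the JeroMQ bindings, with placeholder endpoints):

import org.zeromq.ZMQ

val ctx = ZMQ.context(1)

// Producer side: a PUSH socket distributes messages round-robin
// across every connected PULL socket.
val push = ctx.socket(ZMQ.PUSH)
push.bind("tcp://*:5557")

// Worker side (one per cluster node): each PULL socket receives
// only its share of the stream.
val pull = ctx.socket(ZMQ.PULL)
pull.connect("tcp://localhost:5557")

push.send("event-1".getBytes, 0)
val msg = pull.recv(0)  // this worker sees only messages routed to it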

Thanks
Amir

Re: Spark Streaming on a cluster

Sourav Chandra
Hi,

The approach mentioned above did not work for me. It did create multiple NetworkReceivers on the workers, but the DAG scheduler is failing with a NotSerializableException for org.apache.spark.streaming.StreamingContext.

Can anyone help me figure this out?

Thanks,
Sourav





Re: Spark Streaming on a cluster

dachuan
I don't know the final answer, but I'd like to discuss your problem.

How many nodes are you using, and how many NetworkReceivers have you started?

Which specific class is not serializable?

thanks,
dachuan.



Re: Spark Streaming on a cluster

Sourav Chandra
I am using KafkaDStream, and it's saying the StreamingContext class is not serializable.

Code snippet:

val ssc = new StreamingContext(...)
(1 to 4).foreach(i => {
  val stream = KafkaUtils.createStream(...).flatMap(...).reduceByKeyAndWindow(...).filter(...).foreach(saveToCassandra())
})

ssc.start()

I am using 2 nodes

Thanks,
Sourav



Re: Spark Streaming on a cluster

dachuan
I am curious why the StreamingContext needs to be serialized; it's supposed to live on the driver node alone.



Re: Spark Streaming on a cluster

Sourav Chandra
Yes, I do not understand what is wrong in the code that leads to this.



Re: Spark Streaming on a cluster

Tathagata Das
I think there is some subtle issue with the way Scala finds function closures and tries to serialize them. It could be that, in one of your map or reduce operations, the function you have provided references fields outside the function that contains the code snippet you posted. That brings the whole enclosing object and its fields (like the ssc variable) into the closure, and Spark tries to serialize the whole thing. Here are a couple of tricks you can try.

1. Mark ssc as a @transient val. This is a hack; a cleaner solution is option 2. (A short sketch of the @transient hack follows the example below.)
2. Define the functions used in your map and reduce operations as distinct classes or objects of their own, so that their closures are clearly defined and do not get polluted. For example, instead of

val someExternalVariable = ...

dstream.reduceByKey( <function literal using someExternalVariable> )    // this pulls a lot of external state into the closure of the inline function literal

use 

class ReduceFunction(necessaryVariable: Int) extends Serializable {
  def call(a: Int, b: Int): Int = {
    // use necessaryVariable here, and nothing else from the
    // enclosing scope
    a + b + necessaryVariable
  }
}

val reduceFunction = new ReduceFunction(someExternalVariable)

dstream.reduceByKey(reduceFunction.call _)
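
And for reference, the @transient hack from option 1 looks like this (a minimal sketch, assuming ssc is a field of some enclosing class that ends up captured by a closure):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

class StreamingJob(conf: SparkConf) extends Serializable {
  // @transient excludes ssc when the enclosing instance is
  // serialized as part of a captured closure
  @transient val ssc = new StreamingContext(conf, Seconds(2))
}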

Hope this helps!

TD

