Spark streaming questions

Spark streaming questions

Sourav Chandra
Hi,

I have a couple of questions:

1. While going through the Spark Streaming code, I found a configuration in JobScheduler/Generator (spark.streaming.concurrentJobs) which is set to 1. There is no documentation for this parameter. After setting it to 1000 in the driver program, our streaming application's performance improved.

What is this variable used for? Is it safe to use/tweak this parameter?

2. Can someone explain the usage of the MapOutputTracker and BlockManager components? I have gone through Matei's YouTube video about Spark internals, but this was not covered in detail.

3. Can someone explain the usage of cache with respect to Spark Streaming? For example, if we do stream.cache(), will the cache remain constant, with all the partitions of the stream's RDDs staying on the nodes, or will it be regularly updated as each new batch comes in?

Thanks,
--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Spark streaming questions

Tathagata Das
Answers inline. Hope these answer your questions.

TD


On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <[hidden email]> wrote:
Hi,

I have a couple of questions:

1. While going through the Spark Streaming code, I found a configuration in JobScheduler/Generator (spark.streaming.concurrentJobs) which is set to 1. There is no documentation for this parameter. After setting it to 1000 in the driver program, our streaming application's performance improved.

That is a parameter that allows Spark Streaming to launch multiple Spark jobs simultaneously. While it can improve performance in many scenarios (as it has in your case), it can actually increase the processing time of each batch and increase end-to-end latency in certain scenarios. So it is something that needs to be used with caution. That said, we should definitely have exposed it in the documentation.
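For reference, a minimal sketch of how the parameter can be set from the driver program; the property name comes from the discussion above, everything else (app name, batch interval, the value 4) is purely illustrative:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    // spark.streaming.concurrentJobs controls how many streaming jobs the
    // JobScheduler may run at the same time. Values above 1 can raise
    // throughput, but may also increase per-batch processing time and latency.
    val conf = new SparkConf()
      .setAppName("streaming-app")
      .set("spark.streaming.concurrentJobs", "4") // illustrative value, not a recommendation

    val ssc = new StreamingContext(conf, Seconds(1))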
 
What is this variable used for? Is it safe to use/tweak this parameter?

2. Can someone explain the usage of the MapOutputTracker and BlockManager components? I have gone through Matei's YouTube video about Spark internals, but this was not covered in detail.

I am not sure there is a detailed document anywhere that explains this, but I can give you a high-level overview of both.

BlockManager is like a distributed key-value store for large blobs of data, called blocks. It has a master-worker architecture (loosely, like the HDFS file system): the BlockManagers on the workers store the data blocks, and the BlockManagerMaster stores the metadata about which blocks are stored where. All cached RDD partitions and shuffle data are stored and managed by the BlockManager. It also transfers blocks between workers as needed (shuffles and so on all happen through the BlockManager). Specifically for Spark Streaming, the data received from outside is stored in the BlockManagers of the worker nodes, and the IDs of the blocks are reported to the BlockManagerMaster.

MapOutputTracker is a simpler component that keeps track of the locations of the output of the map stage, so that workers running the reduce stage know which machines to pull the data from. It also has a master-worker structure: the master has full knowledge of the map output, and the worker component pulls that knowledge from the master on demand when reduce tasks are executed on the worker.

 

3. Can someone explain the usage of cache with respect to Spark Streaming? For example, if we do stream.cache(), will the cache remain constant, with all the partitions of the stream's RDDs staying on the nodes, or will it be regularly updated as each new batch comes in?

If you call DStream.persist (cache() is just persist() with the default storage level), then all RDDs generated by the DStream will be persisted in the cache (in the BlockManager). As new RDDs are generated and persisted, old RDDs from the same DStream will fall out of memory, either by LRU or explicitly if spark.streaming.unpersist is set to true.
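A rough sketch of what this looks like in user code, assuming a simple text stream; the input source and all names are placeholders:

    import org.apache.spark.SparkConf
    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.{Seconds, StreamingContext}

    val conf = new SparkConf().setAppName("persist-example")
      .set("spark.streaming.unpersist", "true") // eagerly drop old RDDs instead of waiting for LRU eviction
    val ssc = new StreamingContext(conf, Seconds(1))

    // Placeholder input stream; any DStream behaves the same way.
    val lines = ssc.socketTextStream("localhost", 9999)

    // cache() is persist() with the default storage level; persist(level)
    // picks the level explicitly. Each RDD generated by this DStream is then
    // kept by the BlockManager until it is unpersisted or evicted.
    val words = lines.flatMap(_.split(" ")).persist(StorageLevel.MEMORY_ONLY_SER)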
 

Re: Spark streaming questions

Sourav Chandra
Thanks TD.

One more question:

We are building real-time analytics using Spark Streaming: we read from Kafka, process the stream (flatMap -> map -> reduceByKeyAndWindow -> filter), and then save to Cassandra (using DStream.foreachRDD).
Initially I used one machine with 32 cores and 32 GB and performed load testing with 1 master and 1 worker on the same box. Later I added one more box (32 cores, 16 GB) and launched a worker on it. I set spark.executor.memory=10G in the driver program.
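For concreteness, a rough sketch of a pipeline with that shape; the Kafka parameters, window sizes, and the output action are placeholders, not the actual application code:

    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.StreamingContext._   // pair-DStream operations (needed in older Spark versions)
    import org.apache.spark.streaming.kafka.KafkaUtils

    val conf = new SparkConf().setAppName("kafka-analytics").set("spark.executor.memory", "10g")
    val ssc  = new StreamingContext(conf, Seconds(1))

    // Placeholder Kafka source: (ZooKeeper quorum, consumer group, topic -> receiver threads).
    val messages = KafkaUtils.createStream(ssc, "zk-host:2181", "analytics-group", Map("events" -> 1))

    val counts = messages
      .flatMap { case (_, value) => value.split(" ") }          // flatMap
      .map(word => (word, 1L))                                  // map
      .reduceByKeyAndWindow((a: Long, b: Long) => a + b,        // reduceByKeyAndWindow
                            Seconds(60), Seconds(1))
      .filter { case (_, count) => count > 0 }                  // filter

    // Placeholder output action; the real code writes each record to Cassandra here.
    counts.foreachRDD { rdd => rdd.foreach(println) }

    ssc.start()
    ssc.awaitTermination()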

I expected the performance to increase roughly linearly, as mentioned in the Spark Streaming video, but adding the second worker did not help.

Can you please explain why that is? Also, how can we increase it?

Thanks,
Sourav






Re: Spark streaming questions

Tathagata Das
Can you tell me more about the structure of your program? As in:
1) What storage level are you using for the input stream?
2) How many reducers are you using for the reduceByKeyAndWindow?
3) What batch interval and processing times do you see with one machine vs. two machines?

A good place to start debugging is the Spark web UI for the Spark Streaming application. It should be running on the master at port 4040. If you look at the stages there, you should see the same pattern of stages repeating. You can figure out the number of tasks in each stage, which stage is taking the most time (and is therefore the bottleneck), etc. You can also drill down and see where the tasks are running, and whether it is using the 32 slots on the new machine or not.

TD



Re: Spark streaming questions

Sourav Chandra
1. We are not setting any storage level explicitly, so I assume it is using the default policy for streaming, i.e. MEMORY_ONLY_SER, and since it is a network stream it should be replicated. Correct me if I am wrong.
2. reduceByKeyAndWindow is called with 8 reducers (we have an 8-core machine per worker).
3. The batch duration for the streaming context is set to 1 second. I tried setting it to 500 ms but it did not help.

In the UI, only two types of stages are present: combineByKey and foreach, and combineByKey is taking much more time than foreach.

Looking at the stage UI as you suggested, I can see that although the foreach stage has 8 tasks, combineByKey has only 3-4 tasks. I assume tasks correspond to cores, which implies combineByKey is not utilizing all the cores.
What could be the reason for this?

I have attached the stage UI sorted by the duration column.





Attachment: 2.jpg (209K)

Re: Spark streaming questions

Tathagata Das



On Thu, Feb 13, 2014 at 10:27 PM, Sourav Chandra <[hidden email]> wrote:
1. We are not setting any storage level explicitly, so I assume it is using the default policy for streaming, i.e. MEMORY_ONLY_SER, and since it is a network stream it should be replicated. Correct me if I am wrong.
No, it won't. Use MEMORY_ONLY_2.
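For example, given an existing StreamingContext ssc, a replicated level can be passed explicitly when the input stream is created (a sketch with placeholder Kafka parameters):

    import org.apache.spark.storage.StorageLevel
    import org.apache.spark.streaming.kafka.KafkaUtils

    // Pass the storage level explicitly so that received blocks are
    // replicated to a second node (placeholder Kafka parameters).
    val messages = KafkaUtils.createStream(
      ssc, "zk-host:2181", "analytics-group", Map("events" -> 1),
      StorageLevel.MEMORY_ONLY_2)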
 
2. reduceByKeyAndWindow is called with 8 reducers (we have an 8-core machine per worker).
You have to play around with this, but it should be comparable to the number of cores in the cluster. If this number is too big, performance may go down, so there is a sweet spot that you have to figure out by testing.
 

3. The batch duration for the streaming context is set to 1 second. I tried setting it to 500 ms but it did not help.

In the UI, only two types of stages are present: combineByKey and foreach, and combineByKey is taking much more time than foreach.

Looking at the stage UI as you suggested, I can see that although the foreach stage has 8 tasks, combineByKey has only 3-4 tasks. I assume tasks correspond to cores, which implies combineByKey is not utilizing all the cores.
What could be the reason for this?

I have attached the stage UI sorted by the duration column.

Well, it is clear that combineByKey is taking the most time, around 7 seconds. So you need to increase the number of reducers in the reduceByKeyAndWindow operation. That should distribute the computation across more tasks, use all the cores, and therefore speed up the processing of each batch. However, you have to set the batch interval such that batch interval > processing time of each batch. Otherwise the system cannot process batches as fast as they accumulate, so it constantly gets backlogged. So try increasing the number of reducers as well as increasing the batch interval.
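A sketch of what passing an explicit reducer count looks like, assuming a hypothetical pairs DStream of (key, count) pairs; the window, slide, and partition count are illustrative:

    import org.apache.spark.streaming.Seconds

    // The fourth argument is the number of reduce tasks (partitions) used
    // for the windowed shuffle; more partitions spread the work over more cores.
    val windowedCounts = pairs.reduceByKeyAndWindow(
      (a: Long, b: Long) => a + b, // reduce function
      Seconds(60),                 // window duration
      Seconds(2),                  // slide duration (a multiple of the batch interval)
      64)                          // number of reducers / partitions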

Also, you can monitor the batch processing times and end-to-end delay using the StreamingListener interface (see StreamingContext.addStreamingListener in Spark 0.9). If the batch interval is not large enough, you will find that the latency reported by the streaming listener keeps growing.
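A minimal sketch of such a listener; the class and the logging are illustrative, and the exact field names on BatchInfo should be checked against the Spark version in use:

    import org.apache.spark.streaming.scheduler.{StreamingListener, StreamingListenerBatchCompleted}

    // Prints per-batch processing time and end-to-end delay. If totalDelay keeps
    // growing, batches are arriving faster than they can be processed.
    class DelayMonitor extends StreamingListener {
      override def onBatchCompleted(batch: StreamingListenerBatchCompleted): Unit = {
        val info = batch.batchInfo
        println(s"batch ${info.batchTime}: " +
          s"processing=${info.processingDelay.getOrElse(-1L)} ms, " +
          s"total=${info.totalDelay.getOrElse(-1L)} ms")
      }
    }

    // Register it on the StreamingContext (ssc assumed to exist).
    ssc.addStreamingListener(new DelayMonitor)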

Hope this helps.

TD
 





Re: Spark streaming questions

Sourav Chandra
I increased the reduceByKeyAndWindow numPartitions parameter to 12, but it still shows 3-4 tasks for the combineByKey stage, though I can see the foreach stage has 12 tasks.

I am still unable to understand why this is so.

Also, I see an apply stage in the UI. I have attached both the stage details and the stage UI snapshots for your reference. Can you explain what this stage is?

Thanks,
Sourav


Attachments: apply_stage.jpg (29K), apply_stage_details.jpg (151K)

Re: Spark streaming questions

Sourav Chandra
Also, regarding the storage policy: since the storage level is set to MEMORY_ONLY_2, if an RDD cannot fit in memory, will it be spilled to disk or not?



Re: Spark streaming questions

Pankaj Mittal
In reply to this post by Tathagata Das
Hi TD,
There is no persist method that accepts a boolean; there is only persist(StorageLevel) or the default persist().
I have a question: RDDs remain in the cache for some remember duration, which is initialized to the slide duration. Is it possible to set this to, say, an hour without changing the slide duration?
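One related knob, sketched here under the assumption of an existing StreamingContext ssc (whether it covers this exact use case is a separate question), is StreamingContext.remember, which asks the context to keep generated RDDs around longer without changing any window or slide duration:

    import org.apache.spark.streaming.Minutes

    // Keep the RDDs generated by this context's DStreams for at least an hour,
    // independent of the batch, window, or slide durations.
    ssc.remember(Minutes(60))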

Thanks
Pankaj




Re: Spark streaming questions

Sourav Chandra
Hi TD,

I kept the streaming application running for about an hour with no messages present in Kafka, just to check the memory usage, and then found that stages started failing (with java.io.FileNotFoundException: http://10.10.127.230:57124/broadcast_1) and that there are 1000 active stages.

Questions:
 1. Why did it suddenly start failing, unable to find the broadcast_1 file? Is there some background cleanup that causes this? How can we overcome it?
 2. Are the 1000 active stages because of the spark.streaming.concurrentJobs parameter?
 3. Why are these stages hanging (the UI shows no tasks started)? Shouldn't they also fail? What is the logic behind this?
 4. Why does Tasks: Succeeded/Total in the failed stages show something like (0/12) (30 failed)? I understand it has 12 tasks in total and none succeeded, but where does the 30 failed come from? Is it an internal retry? If so, why is it not the same for all the other failed stages?

I have attached the snapshots.
 



failure.jpg (14K) Download Attachment
failure2.jpg (156K) Download Attachment
failure3.jpg (185K) Download Attachment

Re: Spark streaming questions

Sourav Chandra
Hi TD,

I think the FileNotFound is due to the spark.cleaner.ttl parameter, which is set to 3600 sec, i.e. 1 hour. That is why the temporary metadata files are deleted.

Please correct me if I am wrong. Also, if that is the case, why was the file not downloaded and created again? Is it because our application is doing nothing, i.e. there are no messages coming from Kafka?

Will it be downloaded again once the application starts receiving data?

Thanks,
Sourav




Re: Spark streaming questions

Tathagata Das
Okay, that's a lot of mails to respond to! Let me try to do it point by point. I hope I cover all of the raised concerns.

1. STAGE PARALLELISM: I was confused about the stages. Yes, increasing the number of reducers to 12 should increase the number of tasks for the stage marked as "foreach" (that is the reduce stage, bad naming). To increase the parallelism of the map stage, you can do two things:
  (i) First repartition the data to a larger number of partitions and then apply the rest of the computation. For example, if you were doing kafkaStream.map(....).reduceByKeyAndWindow(....), you can do kafkaStream.repartition(20).map(...).reduceByKeyAndWindow(...).
 (ii) You can also try setting the spark.streaming.blockInterval configuration. This configuration decides how many blocks the received data is divided into every second. The default is 200ms, so it makes 4-5 blocks per second. You can either increase the batch interval or reduce the block interval.

2. APPLY STAGE: I am not entirely sure what that stage is without looking at all the Spark and Spark Streaming operations that you are doing in your program, and a larger snapshot of the stages UI.

3. PERSIST LEVEL: DStream has two functions - persist(), which uses the default StorageLevel of MEMORY_ONLY_SER, and persist(StorageLevel...) where you can specify the storage level. When you use StorageLevel.MEMORY_ONLY_SER or MEMORY_ONLY_SER_2 (that is, without disk in it), data won't fall back to disk; it will just be dropped. To fall back to disk you have to use MEMORY_AND_DISK_SER or MEMORY_AND_DISK_SER_2. Note that SER = keep data serialized, which is good for GC behavior (see the programming guide), and _2 = replicate twice.

4. BROADCAST FAILURE: When the cleaner ttl is set, everything gets cleaned, including broadcast variables. Hence the file backing the broadcast variable is getting deleted, and the tasks are failing. If you are using the same broadcast variable for all batches, it is probably a good idea to re-broadcast the data (that is, create new broadcast variables with the necessary data) periodically. The period should obviously be less than the ttl. See the sketch after this list.

5. ACTIVE STAGES: Yes, 1000 means it can run 1000 jobs in parallel. I am not sure what your use case actually is that requires running 1000 jobs in parallel. Are you generating 1000 jobs EVERY batch? If you are generating N jobs every batch, then it makes sense to set concurrentJobs to around N, maybe up to 2 * N.

6. 30 FAILED: this probably counts the multiple attempts for each failed task.
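
Here is a rough, untested sketch of what I mean by periodic re-broadcasting in point 4 (loadMetadata() and the Map[String, String] payload are just placeholders for whatever metadata you actually broadcast). The idea is to keep the current broadcast in a driver-side reference, refresh it on a timer well within the cleaner ttl, and look up the latest reference per batch inside transform/foreachRDD (which run on the driver), so the closures shipped to the executors always carry a live broadcast:

  import java.util.concurrent.{Executors, TimeUnit}
  import org.apache.spark.broadcast.Broadcast
  import org.apache.spark.streaming.StreamingContext

  object MetadataBroadcast {
    @volatile private var current: Broadcast[Map[String, String]] = _

    // Create the first broadcast and schedule re-broadcasts at half the cleaner ttl,
    // so the files backing the broadcast in use are never the cleaned-up ones.
    def start(ssc: StreamingContext, ttlSeconds: Long): Unit = {
      current = ssc.sparkContext.broadcast(loadMetadata())
      val timer = Executors.newSingleThreadScheduledExecutor()
      timer.scheduleAtFixedRate(new Runnable {
        def run(): Unit = { current = ssc.sparkContext.broadcast(loadMetadata()) }
      }, ttlSeconds / 2, ttlSeconds / 2, TimeUnit.SECONDS)
    }

    // Called on the driver, once per batch, from inside transform/foreachRDD.
    def get: Broadcast[Map[String, String]] = current

    private def loadMetadata(): Map[String, String] = Map.empty  // placeholder
  }

  // Usage: resolve the broadcast per batch on the driver, then use bc.value inside the tasks.
  //   kafkaStream.transform { rdd =>
  //     val bc = MetadataBroadcast.get
  //     rdd.map(record => (record, bc.value))
  //   }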



Hope this helps.


TD













Re: Spark streaming questions

Sourav Chandra
Hi TD,

Thanks a lot for going through all the questions scattered across the mails and answering each one of them. Much appreciated.

I will get back with more details of the code and the stage UI once I am in the office on Monday.

BTW, if I re-broadcast, i.e. create the broadcast variables again in some timer thread, will this be reflected in the closures passed inside the transformations? I read somewhere that Spark does some closure cleanup before actually sending them to the other components.

Thanks,
Sourav




Re: Spark streaming questions

Tathagata Das
Depends on how you are using the broadcast variables. Can you give a high-level overview of what DStream operations you are using and where the broadcast variable gets used?

TD





Re: Spark streaming questions

Sourav Chandra
Hi TD,

Hope you have had a nice weekend. 

I am giving you a brief overview of who we are and what we are trying to achieve using Spark Streaming.

We are building a realtime analytics application using Spark Streaming.
We are an internet video broadcasting company, and the realtime analytics should show the number of likes/comments/concurrent viewers per broadcast.

Below is an overview of what we are doing:

Spark properties:
- batch interval is set to 1 second
- spark.executor.memory = 10g
- spark.streaming.concurrentJobs = 1000
- spark.streaming.blockInterval = 100
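
In code this corresponds roughly to the following (the application name is just a placeholder):

  import org.apache.spark.SparkConf
  import org.apache.spark.streaming.{Seconds, StreamingContext}

  val conf = new SparkConf()
    .setAppName("realtime-analytics")              // placeholder name
    .set("spark.executor.memory", "10g")
    .set("spark.streaming.concurrentJobs", "1000")
    .set("spark.streaming.blockInterval", "100")   // milliseconds
  val ssc = new StreamingContext(conf, Seconds(1)) // 1 second batch interval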

We create a couple of broadcast variables to be used inside Step 2 below.

1. We read the analytics trigger messages from Kafka using the Kafka input stream and then repartition as per your suggestion:
   val kafkaStream = KafkaUtils.createStream(...).repartition(12)

2. We process each message read from Kafka and generate a bunch of related messages for analysis. In this step we use the previously created broadcast variables to get metadata about the incoming message, like which device it was generated on, which country, etc.
   val processedStream = kafkaStream.flatMap(...).map(s => (s,1)) // include count = 1 for each generated message

3. We reduce the stream over the last 1 second:
   val reducedStream = processedStream.reduceByKeyAndWindow((a:Int,b:Int) => a + b, Seconds(1), Seconds(1), 12).checkpoint(Seconds(10))

4. We filter the above reducedStream to get 3 streams out of it - second, minute and hour resolution:
   val secStream  = reducedStream.filter(_._1.resolution.label == "second")
   val minStream  = reducedStream.filter(_._1.resolution.label == "minute")
   val hourStream = reducedStream.filter(_._1.resolution.label == "hour")

5. We save each stream to Cassandra in different tables (for example secStream goes to the sec table, minStream goes to the min table, and so on):
   secStream.foreachRDD(rdd => rdd.foreach(saveToCassandra()))
   ...
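
Put together, the pipeline looks roughly like the sketch below (AnalyticsEvent, parseMessage and saveToCassandra are simplified placeholders for our real classes, and the Kafka connection details are made up):

  import org.apache.spark.streaming.{Seconds, StreamingContext}
  import org.apache.spark.streaming.StreamingContext._   // pair DStream operations
  import org.apache.spark.streaming.kafka.KafkaUtils

  object AnalyticsPipeline extends Serializable {
    // Simplified placeholder for the real message type; resolution is "second"/"minute"/"hour".
    case class AnalyticsEvent(resolution: String, key: String)

    def parseMessage(msg: String): Seq[AnalyticsEvent] = Seq.empty            // placeholder
    def saveToCassandra(table: String, row: (AnalyticsEvent, Int)): Unit = () // placeholder

    def buildPipeline(ssc: StreamingContext): Unit = {
      val kafkaStream = KafkaUtils
        .createStream(ssc, "zkhost:2181", "analytics-group", Map("triggers" -> 1))
        .repartition(12)                                                      // step 1

      val processedStream = kafkaStream
        .flatMap { case (_, msg) => parseMessage(msg) }                       // step 2
        .map(event => (event, 1))

      val reducedStream = processedStream
        .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(1), Seconds(1), 12)
        .checkpoint(Seconds(10))                                              // step 3

      val secStream  = reducedStream.filter(_._1.resolution == "second")      // step 4
      val minStream  = reducedStream.filter(_._1.resolution == "minute")
      val hourStream = reducedStream.filter(_._1.resolution == "hour")

      secStream.foreachRDD(rdd => rdd.foreach(row => saveToCassandra("sec", row)))   // step 5
      minStream.foreachRDD(rdd => rdd.foreach(row => saveToCassandra("min", row)))
      hourStream.foreachRDD(rdd => rdd.foreach(row => saveToCassandra("hour", row)))
    }
  }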


Now a couple of open points:

1. Once repartition is called, I observed the following, which I need clarification about:
   - Why does foreach have so much shuffle read/write now? It takes 4-5 seconds more if I use repartition(20).cache() than earlier when I did not use repartition, even though I can see the combineByKey stage has 12 tasks. If I use repartition only, it takes almost 1.5 times longer than with no repartition.

2. How should we use the broadcast variables? How can we re-submit/re-create them? Can you give some example?

3. I can still see the apply stage on List.scala. What could be the reason?

4. Regarding storage level: since we are using the Kafka DStream, it is MEMORY_AND_DISK_SER_2 instead of MEMORY_ONLY_2 as per the code. Can you confirm this? I got a bit confused as you had mentioned this is MEMORY_ONLY_2.

5. There is still no improvement in performance even though I start more worker processes.

I have attached all the relevant snapshots from the stages UI for your reference.

Thanks,
Sourav




after_repartition.jpg (260K) Download Attachment
foreach_shuffle_read_write.jpg (279K) Download Attachment
most_time_consuming_operations.jpg (309K) Download Attachment

Re: Spark streaming questions

Sourav Chandra
I did not see any improvement when we set spark.streaming.blockInterval = 100, and performance degrades if I use repartition as mentioned.


On Fri, Feb 14, 2014 at 2:35 PM, Pankaj Mittal <[hidden email]> wrote:
Hi TD,
There is no persist method which accepts boolean. There is only persist(MEMORY_LEVEL) or default persist.
I have a question, RDDs remain in cache for some remember time which is initialised to slide duration, but is it possible to set this to let's say an hour without changing slide duration ?

Thanks
Pankaj



On Fri, Feb 14, 2014 at 7:50 AM, Tathagata Das <[hidden email]> wrote:
Answers inline. Hope these answer your questions.

TD


On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <[hidden email]> wrote:
HI,

I have couple of questions:

1. While going through the spark-streaming code, I found out there is one configuration in JobScheduler/Generator (spark.streaming.concurrentJobs) which is set to 1. There is no documentation for this parameter. After setting this to 1000 in driver program, our streaming application's performance is improved.

That is a parameter that allows Spark Stremaing to launch multiple Spark jobs simultaneously. While it can improve the performance in many scenarios (as it has in your case), it can actually increase the processing time of each batch and increase end-to-end latency in certain scenarios. So it is something that needs to be used with caution. That said, we should have definitely exposed it in the documentation.  
 
What is this variable used for? Is it safe to use/tweak this parameter?

2. Can someone explain the usage of MapOutputTracker, BlockManager component. I have gone through the youtube video of Matei about spark internals but this was not covered in detail.

I am not sure if there is a detailed document anywhere that explains but I can give you a high level overview of the both.

BlockManager is like a distributed key-value store for large blobs (called blocks) of data. It has a master-worker architecture (loosely it is like the HDFS file system) where the BlockManager at the workers store the data blocks and BlockManagerMaster stores the metadata for what blocks are stored where. All the cached RDD's partitions and shuffle data are stored and managed by the BlockManager. It also transfers the blocks between the workers as needed (shuffles etc all happen through the block manager). Specifically for spark streaming, the data received from outside is stored in the BlockManager of the worker nodes, and the IDs of the blocks are reported to the BlockManagerMaster.

MapOutputTrackers is a simpler component that keeps track of the location of the output of the map stage, so that workers running the reduce stage knows which machines to pull the data from. That also has the master-worker component - master has the full knowledge of the mapoutput and the worker component on-demand pulls that knowledge from the master component when the reduce tasks are executed on the worker.

 

3. Can someone explain the usage of cache w.r.t spark streaming? For example if we do stream.cache(), will the cache remain constant with all the partitions of RDDs present across the nodes for that stream, OR will it be regularly updated as in while new batch is coming?

If you call DStream.persist (persist == cache = true), then all RDDs generated by the DStream will be persisted in the cache (in the BlockManager). As new RDDs are generated and persisted, old RDDs from the same DStream will fall out of memory. either by LRU or explicitly if spark.streaming.unpersist is set to true. 
 
Thanks,
--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com






--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com





--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com





--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com


Re: Spark streaming questions

Sourav Chandra
One more question regarding checkpointing:

 - What is the cleanup mechanism of the checkpoint directory for a streaming application? Will older files be deleted automatically by Spark, or do we need to set up a scheduled task? If so, what is the strategy for safely removing checkpoint files without disrupting the ongoing process or filling up the disk?
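For context, the two checkpoint settings the question refers to are wired up roughly as below (the HDFS path is a placeholder, and conf/processedStream are the ones from the application described above); whether Spark prunes old files in that directory is exactly the open question.

   import org.apache.spark.streaming.{Seconds, StreamingContext}

   val ssc = new StreamingContext(conf, Seconds(1))
   ssc.checkpoint("hdfs://namenode:8020/checkpoints/analytics")  // where checkpoint files are written

   val reducedStream = processedStream
     .reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(1), Seconds(1), 12)
     .checkpoint(Seconds(10))  // how often this stream's RDDs are checkpointed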

Thanks,
Sourav


On Mon, Feb 17, 2014 at 3:38 PM, Sourav Chandra <[hidden email]> wrote:
I did not see any improvement when we set spark.streaming.blockInterval = 100, and it degrades if I use repartition as mentioned.


On Mon, Feb 17, 2014 at 3:31 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

Hope you have had a nice weekend. 

I am giving you a brief overview if what we are and we are trying to achieve using spark streaming

We are building a realtime analytics application using spark streaming.
We are internet video broadcasting company and reltime analytics should show no. of likes/comments/concuurent viewers per broadcast happened.

Below is the overview of what we are doing:

Spark properties :
- batch interval is set as 1 second
- spark.executor.memory = 10g
- spark.streaming.concurrentJobs = 1000
- spark.streaming.blockInterval = 100

Create couple of broadcast variable to be used inside the Step 2 below

1. We are reading the analytics trigger messages from kafka using kafkainputstream and then reparitioning as per your suggestion
   val kafkaStream = KafkaUtils.createStream(...).repartition(12)

2. Process the message read form kafka and generates a bunch of related messages for analysis. In this step we use previously created broadcast variables to get metadata about incoming message like - which device it was generated, which country etc.
   val processedStream = kafkaStream.flatMap(...).map(s => (s,1)) // include count = 1 for each of generated message

3. Reducing the stream for last 1 second
   val reducedStream = processedStream.reduceByKeyAndWindow((a:Int,b:Int) => a + b, Seconds(1), Seconds(1), 12).checkpoint(Seconds(10))

4. Filtring out the above reducedStream to get 3 streams out of it - second, muinute and hour resolutioned
   val secStream  = reducedStream.filter(_._1.resolution.label == "second")
   val minStream  = reducedStream.filter(_._1.resolution.label == "minute")
   val hourStream = reducedStream.filter(_._1.resolution.label == "hour")

5. Saving each of stream in cassandra in different tables (for example secStream goes to sec table, minStream goes to min table and so on)
   secStream.foreachRDD(rdd => rdd.foreach(saveToCassandra()))
   ...


Now couple of open points:

1. Once repartiotn is called I observed below things which I need clarification about:
   - Why foreach requires has so many shuffle read/write now? It takes 4-5 seconds more if i use repartition(20).cache() than earlier where I did not use repartition though ican see combineByKey stage has 12 tasks. If I use repartition only it takes almost 1.5 times more than no repartiton. 

2. How can we use broadcast variable? How can we re-submit/re-create the variables. Can you give some example?

3. Still I can see the apply stage on List.scala. What could the reason?

4. Regarding storage level as we are using kafka dstream it is MEMORY_AND_DISK_SER_2 instead of MEMORY_ONLY_2 as per code. Can you confirm this? I got bit cionfused as you jhad mentioned this is MEMORY_ONLY_2

5. Still there is no improvement in performance even thogugh I start more worker process.

I have attached all the relevant snapshots from stage ui for your reference.

Thanks,
Sourav


On Sat, Feb 15, 2014 at 3:55 PM, Tathagata Das <[hidden email]> wrote:
Depends on how you are using the broadcast variables. Can you give a high level overview of what DStream operations you are using and where does the broadcast variable get used?

TD


On Fri, Feb 14, 2014 at 7:22 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

Thanks a lot for going through all the questions scatted across the mails and answering each one of them. Much appreciated

I will get back with more details of code, stage ui once I am in office on Monday.

BTW, if I re-broadcast i.e. creating broadcast variables again in some timer thread will this be reflected in the closures passed inside the transformations? As i read somewhere spark will do some closure cleanup before actually sending them to other components?

Thanks,
Sourav


On Sat, Feb 15, 2014 at 5:31 AM, Tathagata Das <[hidden email]> wrote:
Okay, thats a lots of mails to respond to! Let me try to do it point by point. I hope I cover all of the raised concerns.

1. STAGE PARALLELISM: I was confused about the stages. Yes, increasing the number of reducers to 12 should increase the tasks for the stage marked as "foreach" (thats the reduce stage, bad naming). To increase the parallelism of the map stage, you can do two things
  (i) First repartition the data to larger number of partitions and then apply rest of the computation. For example if you were doing kafkaStream.map(....).reduceByKeyAndWindow(....), you can do kafkaStream.repartition(20).map(...).reduceByKeyAndWindow(...). 
 (ii) You can also try setting the spark.streaming.blockInterval configuration. This configuration decides how many blocks of data is created with received data every second. Default is 200ms, so it makes 4-5 blocks per second. You can either increase the batch interval or reduce the block interval. 

2. APPLY STAGE: I am not entirely sure what that stage is without looking at all Spark and Spark Streaming the operations that you are doing in your program. And a large snapshot of the stages UI.

3. PERSIST LEVEL: DStream has two functions - persist(), which has the default StorageLevel of MEMORY_ONLY_SER, and persist(StorageLevel...... ) where you can specify the storage level. When you use StorageLevel.MEMORY_ONLY_SER or MEMORY_ONLY_SER_2 (that is without disk in it), it wont fall off to disk. It will just be lost. To fall of to disk you have to use MEMORY_AND_DISK_SER or MEMORY_AND_DISK_SER_2. Note that, SER = keep data serialized, good for GC behavior (see programming guide), and _2 = replicate twice.

4. BROADCAST FAILURE: 
When the cleaner ttl is set, everything gets cleaned, including broadcast variables. Hence the file backing the broadcast variable is getting delete, and the tasks are failing. If you are using the same broadcast variable for all batches, it is probably a good idea to re-broadcast the data (thatis, create new broadcast variables with the necessary data) periodically. The period should obviously be less than the ttl. 

5. ACTIVE STAGES: Yes, 1000 means, it can run 1000 jobs in parallel. I am not sure what your usecase actually is that requires running 1000 jobs in parallel? Are you generating 1000 jobs EVERY batch? If you are generating N jobs every batch, then makes sense to have the concurrentJobs set to around N, maybe up to 2 * N. 

6: 30 failed: probably considers the multiple attempts for each failed tasks.



Hope this helps.


TD










On Fri, Feb 14, 2014 at 2:08 AM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

I think the FileNotFound is due to spark.cleaner.ttl parameter which is set to 3600 sec i.e. 1 hour. Thats why the temp metadata files are deleted. 

Please correct me if I am wrong. Also If that is the case why it did not download again and create the file? Is is because our application is doing nothing i.e. no messages from kafka?

Will it be downloaded if application again start receiving data?

Thanks,
Sourav


On Fri, Feb 14, 2014 at 2:55 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

I have kept running the streaming application for ~1hr though there is no messages present in Kafka , just to check the memory usage and all and then found out the stages have started failing (with exception java.io.FileNotFoundException (java.io.FileNotFoundException: http://10.10.127.230:57124/broadcast_1)) and there are 1000 active stages

Questions:
 1. Why it suddenly started failing and not able to find broadcast _1 file? Is there any background cleanup causes this? How can we overcome this?
 2. Is the 1000 actve stages are because of spark.streaming.concurrentJobs parameter?
 3. Why these stages are in hanging state (the ui showing no tasks started)?
     Shouldn't these also fail? what is the logic behind this?
 4. Why taks:Succeed:Total in failed stages showing like (0/12)(30 failed)  I can understand it has total 12 tasks and none succeeded. From where its getting the 30 failed? Is it internal retry. If so why it is not same for all other failed stages/
 
I have attached the snapshots.
 



On Fri, Feb 14, 2014 at 2:35 PM, Pankaj Mittal <[hidden email]> wrote:
Hi TD,
There is no persist method which accepts boolean. There is only persist(MEMORY_LEVEL) or default persist.
I have a question, RDDs remain in cache for some remember time which is initialised to slide duration, but is it possible to set this to let's say an hour without changing slide duration ?

Thanks
Pankaj



On Fri, Feb 14, 2014 at 7:50 AM, Tathagata Das <[hidden email]> wrote:
Answers inline. Hope these answer your questions.

TD


On Thu, Feb 13, 2014 at 5:49 PM, Sourav Chandra <[hidden email]> wrote:
HI,

I have couple of questions:

1. While going through the spark-streaming code, I found out there is one configuration in JobScheduler/Generator (spark.streaming.concurrentJobs) which is set to 1. There is no documentation for this parameter. After setting this to 1000 in driver program, our streaming application's performance is improved.

That is a parameter that allows Spark Stremaing to launch multiple Spark jobs simultaneously. While it can improve the performance in many scenarios (as it has in your case), it can actually increase the processing time of each batch and increase end-to-end latency in certain scenarios. So it is something that needs to be used with caution. That said, we should have definitely exposed it in the documentation.  
 
What is this variable used for? Is it safe to use/tweak this parameter?

2. Can someone explain the usage of MapOutputTracker, BlockManager component. I have gone through the youtube video of Matei about spark internals but this was not covered in detail.

I am not sure if there is a detailed document anywhere that explains but I can give you a high level overview of the both.

BlockManager is like a distributed key-value store for large blobs (called blocks) of data. It has a master-worker architecture (loosely it is like the HDFS file system) where the BlockManager at the workers store the data blocks and BlockManagerMaster stores the metadata for what blocks are stored where. All the cached RDD's partitions and shuffle data are stored and managed by the BlockManager. It also transfers the blocks between the workers as needed (shuffles etc all happen through the block manager). Specifically for spark streaming, the data received from outside is stored in the BlockManager of the worker nodes, and the IDs of the blocks are reported to the BlockManagerMaster.

MapOutputTrackers is a simpler component that keeps track of the location of the output of the map stage, so that workers running the reduce stage knows which machines to pull the data from. That also has the master-worker component - master has the full knowledge of the mapoutput and the worker component on-demand pulls that knowledge from the master component when the reduce tasks are executed on the worker.

 

3. Can someone explain the usage of cache w.r.t spark streaming? For example if we do stream.cache(), will the cache remain constant with all the partitions of RDDs present across the nodes for that stream, OR will it be regularly updated as in while new batch is coming?

If you call DStream.persist (persist == cache = true), then all RDDs generated by the DStream will be persisted in the cache (in the BlockManager). As new RDDs are generated and persisted, old RDDs from the same DStream will fall out of memory. either by LRU or explicitly if spark.streaming.unpersist is set to true. 
 
Thanks,
--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com






--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com





--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: <a href="tel:%2B91%2080%204121%208723" value="+918041218723" target="_blank">+91 80 4121 8723

m: <a href="tel:%2B91%20988%20699%203746" value="+919886993746" target="_blank">+91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com





--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com




--

Sourav Chandra

Senior Software Engineer

· · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · · ·

[hidden email]

o: +91 80 4121 8723

m: +91 988 699 3746

skype: sourav.chandra

Livestream

"Ajmera Summit", First Floor, #3/D, 68 Ward, 3rd Cross, 7th C Main, 3rd Block, Koramangala Industrial Area,

Bangalore 560034

www.livestream.com

Reply | Threaded
Open this post in threaded view
|

Re: Spark streaming questions

Sourav Chandra
I have a couple of questions below:

1. What are the memory/CPU requirements for the Master, Worker and Driver processes? As per my understanding they should not be any higher than the defaults, at least for the Master and Worker. As the Driver does the actual DAG scheduling and so on, should it be a faster process?
Please correct me if I am wrong. Also let me know the system requirements for all 3 processes.

2. If we run the Worker and Master on the same node, is spilling of RDDs to disk or heavy memory usage harmful for the Master? As per my understanding it should not have an impact, as the Master and Worker do very little (at least from what is seen in the logs); it is the executor whose performance would be degraded. Please correct me if I am wrong.

3. I was going through KafkaInputDStream and found out it only writes the Kafka message and partitioning key into the block generator, not other info like partition and offset. Is there any way to incorporate these, or do we have to create our own DStream for this?

Thanks,
Sourav



Re: Spark streaming questions

Sourav Chandra
Waiting for response :)



Re: Spark streaming questions

Andrew Ash
In my experience, you don't need much horsepower on the master or worker nodes. If you're bringing large data back to the driver (e.g. with .take or .collect) you can cause OOMs on the driver, so bump the heap if that's the case. But the majority of your memory requirements will be in the executors, which are JVMs that the Worker spins up for each application (in standalone cluster mode).
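
As a rough illustration (the stream name is made up): collect() concentrates a whole RDD in the driver JVM, while take(n) bounds what comes back, so you only need to raise the driver heap (e.g. a larger -Xmx on the driver process) if you genuinely must pull a lot of data back:

   stream.foreachRDD { rdd =>
     // rdd.collect() would ship every record of the batch to the driver and can OOM it
     val preview = rdd.take(10)   // bounded: at most 10 records come back to the driver
     preview.foreach(println)
   }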

Andrew


On Tue, Feb 18, 2014 at 8:07 PM, Sourav Chandra <[hidden email]> wrote:
Waiting for response :)


On Tue, Feb 18, 2014 at 1:09 PM, Sourav Chandra <[hidden email]> wrote:
I have couple of questions below:

1. What are the memory/CPU requirements for the Master, Worker and Driver processes? As per my understanding they should not be any higher than the default settings, at least for the Master and Worker. Since the Driver does the actual DAG scheduling and so on, it should be a fast process?
Please correct me if I am wrong. Also let me know the system requirements for all three processes.

2. If we run the Worker and Master on the same node, is spilling of RDDs to disk, or heavy memory usage, harmful for the Master? As per my understanding it should not have an impact, as the Master and Worker do very little (at least judging from the logs); it is the executor whose performance would be degraded? Please correct me if I am wrong.

3. I was going through KafkaInputDStream and found out it only writes the Kafka message and the partitioning key into the block generator, not other info like partition and offset. Is there any way to incorporate these, or do we have to create our own DStream for this?

Thanks,
Sourav


On Mon, Feb 17, 2014 at 5:16 PM, Sourav Chandra <[hidden email]> wrote:
One more question regarding check-pointing:

 - What is the cleanup mechanism for the checkpoint directory of a streaming application? Will older files be deleted automatically by Spark, or do we need to set up a scheduled task? If so, what is the strategy to safely remove checkpoint files without disrupting the ongoing process or running out of disk space?

Thanks,
Sourav


On Mon, Feb 17, 2014 at 3:38 PM, Sourav Chandra <[hidden email]> wrote:
I did not see any improvement when we set spark.streaming.blockInterval = 100, and it degrades if I use repartition as mentioned.


On Mon, Feb 17, 2014 at 3:31 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

Hope you have had a nice weekend. 

I am giving you a brief overview of what we are doing and what we are trying to achieve using Spark Streaming.

We are building a realtime analytics application using Spark Streaming.
We are an internet video broadcasting company, and the realtime analytics should show the number of likes/comments/concurrent viewers per broadcast.

Below is the overview of what we are doing:

Spark properties:
- batch interval is set as 1 second
- spark.executor.memory = 10g
- spark.streaming.concurrentJobs = 1000
- spark.streaming.blockInterval = 100
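
For reference, this is roughly how these settings are wired together when we create the context (the app name here is illustrative; the values are the ones listed above):

   import org.apache.spark.SparkConf
   import org.apache.spark.streaming.{Seconds, StreamingContext}

   val sparkConf = new SparkConf()
     .setAppName("realtime-analytics")
     .set("spark.executor.memory", "10g")
     .set("spark.streaming.concurrentJobs", "1000")
     .set("spark.streaming.blockInterval", "100")   // milliseconds

   val ssc = new StreamingContext(sparkConf, Seconds(1))   // 1 second batch interval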

We create a couple of broadcast variables to be used inside Step 2 below.

1. We are reading the analytics trigger messages from Kafka using KafkaInputDStream and then repartitioning as per your suggestion:
   val kafkaStream = KafkaUtils.createStream(...).repartition(12)

2. Process the messages read from Kafka and generate a bunch of related messages for analysis. In this step we use the previously created broadcast variables to get metadata about the incoming message, like which device it was generated on, which country, etc.
   val processedStream = kafkaStream.flatMap(...).map(s => (s,1)) // include count = 1 for each generated message

3. Reducing the stream over the last 1 second:
   val reducedStream = processedStream.reduceByKeyAndWindow((a:Int,b:Int) => a + b, Seconds(1), Seconds(1), 12).checkpoint(Seconds(10))

4. Filtering the above reducedStream to get 3 streams out of it, at second, minute and hour resolution:
   val secStream  = reducedStream.filter(_._1.resolution.label == "second")
   val minStream  = reducedStream.filter(_._1.resolution.label == "minute")
   val hourStream = reducedStream.filter(_._1.resolution.label == "hour")

5. Saving each stream into Cassandra in different tables (for example secStream goes to the sec table, minStream goes to the min table, and so on):
   secStream.foreachRDD(rdd => rdd.foreach(saveToCassandra()))
   ...


Now a couple of open points:

1. Once repartition is called I observed the things below, which I need clarification about:
   - Why does foreach now require so much shuffle read/write? It takes 4-5 seconds more if I use repartition(20).cache() than earlier when I did not use repartition, though I can see the combineByKey stage has 12 tasks. If I use repartition only, it takes almost 1.5 times longer than with no repartition.

2. How can we use broadcast variables? How can we re-submit/re-create the variables? Can you give some example?

3. I can still see the apply stage on List.scala. What could be the reason?

4. Regarding storage level: as we are using the Kafka DStream, it is MEMORY_AND_DISK_SER_2 instead of MEMORY_ONLY_2 as per the code. Can you confirm this? I got a bit confused as you had mentioned this is MEMORY_ONLY_2.

5. There is still no improvement in performance even though I start more worker processes.

I have attached all the relevant snapshots from the stage UI for your reference.

Thanks,
Sourav


On Sat, Feb 15, 2014 at 3:55 PM, Tathagata Das <[hidden email]> wrote:
Depends on how you are using the broadcast variables. Can you give a high level overview of what DStream operations you are using and where the broadcast variable gets used?

TD


On Fri, Feb 14, 2014 at 7:22 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

Thanks a lot for going through all the questions scattered across the mails and answering each one of them. Much appreciated.

I will get back with more details of code, stage ui once I am in office on Monday.

BTW, if I re-broadcast, i.e. create broadcast variables again in some timer thread, will this be reflected in the closures passed inside the transformations? I read somewhere that Spark does some closure cleanup before actually sending them to other components.

Thanks,
Sourav


On Sat, Feb 15, 2014 at 5:31 AM, Tathagata Das <[hidden email]> wrote:
Okay, that's a lot of mails to respond to! Let me try to do it point by point. I hope I cover all of the raised concerns.

1. STAGE PARALLELISM: I was confused about the stages. Yes, increasing the number of reducers to 12 should increase the tasks for the stage marked as "foreach" (that's the reduce stage, bad naming). To increase the parallelism of the map stage, you can do two things:
  (i) First repartition the data to a larger number of partitions and then apply the rest of the computation. For example, if you were doing kafkaStream.map(...).reduceByKeyAndWindow(...), you can do kafkaStream.repartition(20).map(...).reduceByKeyAndWindow(...).
 (ii) You can also try setting the spark.streaming.blockInterval configuration. This configuration decides how many blocks are created from the received data every second. The default is 200 ms, so it makes 4-5 blocks per second. You can either increase the batch interval or reduce the block interval.
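
For example, with a 1 second batch interval and spark.streaming.blockInterval set to 100 (ms), each receiver produces roughly 1000 / 100 = 10 blocks per batch, so the map stage of each batch gets roughly 10 tasks per receiver instead of the default 4-5.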

2. APPLY STAGE: I am not entirely sure what that stage is without looking at all the Spark and Spark Streaming operations that you are doing in your program, and a large snapshot of the stages UI.

3. PERSIST LEVEL: DStream has two persist functions: persist(), which uses the default StorageLevel of MEMORY_ONLY_SER, and persist(StorageLevel), where you can specify the storage level. When you use StorageLevel.MEMORY_ONLY_SER or MEMORY_ONLY_SER_2 (that is, without disk in it), data won't fall off to disk; it will just be dropped. To fall off to disk you have to use MEMORY_AND_DISK_SER or MEMORY_AND_DISK_SER_2. Note that SER = keep data serialized, which is good for GC behavior (see the programming guide), and _2 = replicate twice.
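
For instance (the stream name is illustrative):

   import org.apache.spark.storage.StorageLevel

   // serialized, memory only: blocks that don't fit are dropped, not spilled to disk
   stream.persist(StorageLevel.MEMORY_ONLY_SER)

   // or: serialized, spills to disk under memory pressure, replicated to a second node
   // stream.persist(StorageLevel.MEMORY_AND_DISK_SER_2)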

4. BROADCAST FAILURE:
When the cleaner ttl is set, everything gets cleaned, including broadcast variables. Hence the file backing the broadcast variable is getting deleted, and the tasks are failing. If you are using the same broadcast variable for all batches, it is probably a good idea to re-broadcast the data (that is, create new broadcast variables with the necessary data) periodically, as in the sketch below. The period should obviously be less than the ttl.
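
A rough sketch of that pattern (everything here is illustrative; loadMetadata() stands in for whatever builds the broadcast data). Keep a reference to the current broadcast on the driver and refresh it from inside foreachRDD, whose outer body runs on the driver once per batch:

   import org.apache.spark.broadcast.Broadcast

   var metadataBc: Broadcast[Map[String, String]] = ssc.sparkContext.broadcast(loadMetadata())
   var lastRefresh = System.currentTimeMillis()
   val refreshMs = 30 * 60 * 1000L              // re-broadcast every 30 min, well under the 3600 s ttl

   kafkaStream.foreachRDD { rdd =>
     if (System.currentTimeMillis() - lastRefresh > refreshMs) {   // driver-side check, once per batch
       metadataBc = ssc.sparkContext.broadcast(loadMetadata())     // create a fresh broadcast variable
       lastRefresh = System.currentTimeMillis()
     }
     val bc = metadataBc                        // capture the current broadcast for this batch's closures
     rdd.foreach { record =>
       val meta = bc.value                      // read the broadcast value on the executor
       // ... use meta to enrich record ...
     }
   }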

5. ACTIVE STAGES: Yes, 1000 means it can run 1000 jobs in parallel. I am not sure what your use case actually is that requires running 1000 jobs in parallel; are you generating 1000 jobs EVERY batch? If you are generating N jobs every batch, then it makes sense to set concurrentJobs to around N, maybe up to 2 * N.

6. 30 FAILED: that probably counts the multiple attempts for each failed task.



Hope this helps.


TD










On Fri, Feb 14, 2014 at 2:08 AM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

I think the FileNotFound is due to the spark.cleaner.ttl parameter, which is set to 3600 sec, i.e. 1 hour. That's why the temp metadata files are deleted.

Please correct me if I am wrong. Also, if that is the case, why did it not download again and recreate the file? Is it because our application is doing nothing, i.e. no messages from Kafka?

Will it be downloaded if the application starts receiving data again?

Thanks,
Sourav


On Fri, Feb 14, 2014 at 2:55 PM, Sourav Chandra <[hidden email]> wrote:
Hi TD,

I have kept the streaming application running for ~1 hr even though there are no messages present in Kafka, just to check the memory usage, and then found that the stages have started failing (with java.io.FileNotFoundException: http://10.10.127.230:57124/broadcast_1) and that there are 1000 active stages.

Questions:
 1. Why did it suddenly start failing, unable to find the broadcast_1 file? Is there any background cleanup that causes this? How can we overcome this?
 2. Are the 1000 active stages because of the spark.streaming.concurrentJobs parameter?
 3. Why are these stages in a hanging state (the UI shows no tasks started)? Shouldn't these also fail? What is the logic behind this?
 4. Why is Tasks: Succeeded/Total in the failed stages showing something like (0/12) (30 failed)? I can understand it has 12 tasks in total and none succeeded. Where is it getting the 30 failed from? Is it an internal retry? If so, why is it not the same for all the other failed stages?
 
I have attached the snapshots.
 



On Fri, Feb 14, 2014 at 2:35 PM, Pankaj Mittal <[hidden email]> wrote:
Hi TD,
There is no persist method which accepts a boolean; there is only persist(StorageLevel) or the default persist().
I have a question: RDDs remain in the cache for some remember duration, which is initialised to the slide duration, but is it possible to set this to, let's say, an hour without changing the slide duration?
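
(For reference, a sketch of the knob that controls this, assuming the streaming context is called ssc: StreamingContext.remember lets you keep generated RDDs around longer without touching the slide duration.)

   import org.apache.spark.streaming.Minutes

   ssc.remember(Minutes(60))   // ask every DStream in this context to keep its generated RDDs for an hour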

Thanks
Pankaj


