[Spark2.1] SparkStreaming to Cassandra performance problem


Saulo Sobreiro

Hi all,


I am implementing a use case in which I read sensor data from Kafka through the Spark Streaming interface (KafkaUtils.createDirectStream) and, after some transformations, write the output (RDD) to Cassandra.

Everything works, but I am having trouble with performance. My Kafka topic receives around 2000 messages per second. For a 4-minute test, the Spark Streaming app takes 6 to 7 minutes to process the data and write it to Cassandra, which is not acceptable for longer runs.
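A quick back-of-the-envelope check of these figures shows how far behind each micro-batch falls. It assumes messages arrive at a constant rate, the whole backlog drains at a constant effective rate, and it uses the 2-second batch interval set in the code at the end of this message. `backlog_stats` is an illustrative helper, not part of any library:

```python
def backlog_stats(rate_msgs_per_s, test_duration_s, drain_duration_s, batch_interval_s):
    """Back-of-the-envelope throughput check for a micro-batch streaming job.

    Assumes a constant arrival rate and a constant effective processing
    rate over the whole run (a simplification, not a measurement).
    """
    total_msgs = rate_msgs_per_s * test_duration_s          # messages produced during the test
    effective_rate = total_msgs / drain_duration_s           # messages actually processed per second
    records_per_batch = rate_msgs_per_s * batch_interval_s   # records arriving in one micro-batch
    # Time needed to process one micro-batch at the effective rate.
    processing_time_per_batch = records_per_batch / effective_rate
    return {
        "total_msgs": total_msgs,
        "effective_rate": effective_rate,
        "records_per_batch": records_per_batch,
        "processing_time_per_batch": processing_time_per_batch,
        # A streaming job only keeps up if each batch finishes within its interval.
        "keeps_up": processing_time_per_batch <= batch_interval_s,
    }


if __name__ == "__main__":
    for drain_minutes in (6, 7):
        stats = backlog_stats(2000, 4 * 60, drain_minutes * 60, 2)
        print(drain_minutes, round(stats["effective_rate"]),
              round(stats["processing_time_per_batch"], 2), stats["keeps_up"])
```

With 2000 messages per second for 4 minutes (480,000 messages) drained in 6 to 7 minutes, the effective rate is roughly 1,143 to 1,333 messages per second, so each 4,000-record batch needs about 3 to 3.5 seconds against a 2-second interval, and the scheduling delay keeps growing.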

I am running this application in a "sandbox" with 12 GB of RAM, 2 cores and 30 GB of SSD space.
Versions: Spark 2.1, Cassandra 3.0.9 (cqlsh 5.0.1).

I would like to know if you have any suggestions to improve performance (other than getting more resources :) ).

My code (PySpark) is posted at the end of this email so you can take a look. I tried some different Cassandra configurations following this link: http://www.snappydata.io/blog/snappydata-memsql-cassandra-a-performance-benchmark (recommended on Stack Overflow for similar questions).


Thank you in advance,

Best Regards,
Saulo



=============== # CODE # =================================
####
# run command:
# spark2-submit --packages org.apache.spark:spark-streaming-kafka_2.11:1.6.3,anguenot:pyspark-cassandra:0.7.0,org.apache.spark:spark-core_2.11:1.5.2  --conf spark.cassandra.connection.host='localhost' --num-executors 2 --executor-cores 2 SensorDataStreamHandler.py localhost:6667 test_topic2
##

# Run Spark imports
import sys  # sys.argv is read below
from pyspark import SparkConf # SparkContext, SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

# Run Cassandra imports
import pyspark_cassandra
from pyspark_cassandra import CassandraSparkContext, saveToCassandra

def recordHandler(record):
    (mid, tt, in_tt, sid, mv) = parseData( record )
    return processMetrics(mid, tt, in_tt, sid, mv)

def process(time, rdd):
    rdd2 = rdd.map( lambda w: recordHandler(w[1]) )
    if rdd2.count() > 0:
        return rdd2

def casssave(time, rdd):
    rdd.saveToCassandra( "test_hdpkns", "measurement" )

# ...
brokers, topic = sys.argv[1:]

# ...

sconf = SparkConf() \
        .setAppName("SensorDataStreamHandler") \
        .setMaster("local[*]") \
        .set("spark.default.parallelism", "2")

sc = CassandraSparkContext(conf = sconf)
batchIntervalSeconds = 2
ssc = StreamingContext(sc, batchIntervalSeconds)

kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(process) \
    .foreachRDD(casssave)

ssc.start()
ssc.awaitTermination()

================================================





Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Javier Pareja
Hi Saulo,

I'm no expert but I will give it a try.
I would remove the rdd2.count(): I can't see the point of it, and you will gain performance right away. Because of this, I would not use transform either; just apply map directly.
I have not used Python, but in Scala the Spark Cassandra connector can save a DStream directly to Cassandra without a foreachRDD.

Finally, I would use the Spark UI to find which stage is the bottleneck.
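The cost of the extra rdd2.count() comes from laziness: Spark transformations such as map are only evaluated when an action runs, and unless the RDD is persisted with cache(), every action recomputes its lineage. A count() followed by a save therefore runs recordHandler over the whole batch twice. The class below is a plain-Python toy model of that rule, not the PySpark API; `ToyLazyDataset` and `run_batch` are hypothetical names used only for illustration:

```python
class ToyLazyDataset:
    """Plain-Python model of a lazily evaluated, optionally cached dataset.

    This is NOT PySpark: it only mimics the rule that each action re-runs
    the transformation unless the result has been cached.
    """

    def __init__(self, source, fn, cached=False):
        self.source = source          # input records (e.g. one micro-batch)
        self.fn = fn                  # the "map" function, e.g. a record handler
        self.cached = cached
        self._materialized = None     # filled on the first action when cached

    def cache(self):
        return ToyLazyDataset(self.source, self.fn, cached=True)

    def _compute(self):
        if self._materialized is not None:
            return self._materialized  # cached: no recomputation
        rows = [self.fn(record) for record in self.source]
        if self.cached:
            self._materialized = rows
        return rows

    # "Actions": each one triggers evaluation.
    def count(self):
        return len(self._compute())

    def save(self, sink):
        sink.extend(self._compute())


def run_batch(records, guard_with_count, use_cache):
    """Process one batch and report how many times the handler ran."""
    calls = {"handler": 0}

    def handler(record):
        calls["handler"] += 1
        return record.strip().upper()

    ds = ToyLazyDataset(records, handler)
    if use_cache:
        ds = ds.cache()
    sink = []
    # Mirrors `if rdd2.count() > 0:` followed by a save.
    if not guard_with_count or ds.count() > 0:
        ds.save(sink)
    return calls["handler"], sink


if __name__ == "__main__":
    batch = [" a", "b ", " c "]
    print(run_batch(batch, guard_with_count=True, use_cache=False))
    print(run_batch(batch, guard_with_count=False, use_cache=False))
    print(run_batch(batch, guard_with_count=True, use_cache=True))
```

In this model the guarded version calls the handler 6 times for 3 records, while dropping the count() (or caching before it) brings it back to 3. If an emptiness guard is still wanted in the real job, PySpark's rdd.isEmpty() usually does less work than count(), since it only needs to find one element.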

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <[hidden email]> wrote:


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Saulo Sobreiro
Hi Javier, 


Following your suggestion, I now use "map" directly instead of transform. However, the kafkaStream is created with KafkaUtils, which does not have a method to save to Cassandra directly.

Do you know of any workaround for this?


Thank you for the suggestion. 

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja <[hidden email]> wrote:


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Javier Pareja
Hi Saulo,

I meant using this to save:

But it might be slow in a different area.
Another point is that Cassandra and Spark running on the same machine might compete for resources, which will slow down the inserts. You can check the machine's CPU usage while the job is running. The design of the table schema can also make a big difference.


On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <[hidden email]> wrote:

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Saulo Sobreiro
Hi Javier,

I will try to implement this in Scala, then. As far as I can see in the documentation, there is no saveToCassandra in the Python interface unless you are working with DataFrames, and the kafkaStream instance does not provide methods to convert an RDD into a DataFrame.

Regarding my table, it is very simple (see below). Can I change something to make it write faster?
CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage; 
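In CQL, when the partition key is not wrapped in its own parentheses, the first column of the PRIMARY KEY is the partition key and the remaining columns are clustering columns. For this table, mid alone decides which partition (and therefore which replicas) a row goes to, while tt, sensor_id, in_tt and out_tt only order rows inside that partition. A minimal plain-Python sketch for checking how a micro-batch spreads over partitions, assuming rows are dicts keyed by column name; the helper names and sample values are made up for illustration:

```python
from collections import Counter


def split_primary_key(primary_key):
    """Split a CQL PRIMARY KEY column list into (partition key, clustering columns).

    PRIMARY KEY (a, b, c) -> partition key (a,), clustering (b, c).
    A composite partition key, PRIMARY KEY ((a, b), c), is passed here
    as a nested tuple: (("a", "b"), "c").
    """
    first, *rest = primary_key
    partition_key = tuple(first) if isinstance(first, (tuple, list)) else (first,)
    return partition_key, tuple(rest)


def rows_per_partition(rows, partition_key):
    """Count how many rows of a batch fall into each partition."""
    return Counter(tuple(row[col] for col in partition_key) for row in rows)


if __name__ == "__main__":
    pk, clustering = split_primary_key(("mid", "tt", "sensor_id", "in_tt", "out_tt"))
    print(pk, clustering)
    # Hypothetical sample rows; only the key columns matter here.
    batch = [
        {"mid": 1, "tt": "t0", "sensor_id": 7},
        {"mid": 1, "tt": "t1", "sensor_id": 7},
        {"mid": 2, "tt": "t0", "sensor_id": 9},
    ]
    print(rows_per_partition(batch, pk))
```

This is only a diagnostic: if mid is unique per message, every row becomes its own partition, and if many rows share a mid, they all land on the same replicas. Which layout suits the workload depends on how the table is read.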

While the demo is running, system CPU usage is almost always at 100% on both cores.


Thank you.

Best Regards,

On 29/04/2018 20:46:30, Javier Pareja <[hidden email]> wrote:


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Javier Pareja
Hi Saulo,

If the CPU is close to 100%, then you are hitting the limit. I don't think moving to Scala will make a difference. Both Spark and Cassandra are CPU-hungry, and your setup is small in terms of CPUs. Try running Spark on another (physical) machine so that the 2 cores are dedicated to Cassandra.

Kind Regards
Javier



On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <[hidden email]> wrote:

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Saulo Sobreiro
Hi Javier,

Thank you very much for the feedback.
Indeed, the CPU is a huge limitation. I had a lot of trouble trying to run this use case in yarn-client mode; I only managed to run it in standalone (local master) mode.

I do not have the hardware available to run this setup on a cluster yet, so I decided to dig a little deeper into the implementation to see what I could improve. I have just finished evaluating some results.
If you find anything wrong or odd, please let me know.

Following your suggestion to use "saveToCassandra" directly, I decided to try Scala. I implemented everything as similarly as possible, and I was surprised by the results: the Scala implementation is much faster.

My current implementation is slightly different from the Python code I shared some emails ago, but to compare the influence of the language as fairly as possible I used the following snippets:

# Scala implementation ------------------

val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
kstream
           .map( x => parse(x.value) )
           .saveToCassandra("hdpkns", "batch_measurement")

# Python implementation ----------------
# Adapted from the previously shared code. However instead of calculating the metrics, it is just parsing the messages.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(parse) \
    .foreachRDD(casssave)


For the same streaming input, the Scala app took an average of ~1.5 seconds to handle each event, while the Python implementation took an average of ~80 seconds per event (and I ran into a lot of pickle concurrent-access issues along the way).

Note that I measured this time as the difference between the event's generation (before it is published to Kafka) and the moment just before the saveToCassandra call.
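The latency definition above can be sketched as follows. This assumes each Kafka message is JSON carrying its generation timestamp in a field called gen_ts (a hypothetical name), and that the producer and the Spark job read the same clock, as they do on a single sandbox machine; the clock is injectable so the arithmetic can be checked deterministically:

```python
import json
import time


def make_event(sensor_id, value, clock=time.time):
    """Producer side: stamp the event with its generation time before publishing."""
    return json.dumps({"sensor_id": sensor_id, "value": value, "gen_ts": clock()})


def latency_seconds(raw_message, clock=time.time):
    """Consumer side: call just before the write to Cassandra."""
    event = json.loads(raw_message)
    return clock() - event["gen_ts"]


def mean_latency(raw_messages, clock=time.time):
    """Average end-to-end latency over a batch of raw messages."""
    latencies = [latency_seconds(m, clock) for m in raw_messages]
    return sum(latencies) / len(latencies)


if __name__ == "__main__":
    # Fake clocks: events generated at t=100.0 and t=101.0, measured at t=102.5.
    msgs = [make_event(1, 20.5, clock=lambda: 100.0),
            make_event(2, 21.0, clock=lambda: 101.0)]
    print(mean_latency(msgs, clock=lambda: 102.5))
```

Measured this way, the figure includes time spent waiting in Kafka and in Spark's batch queue, not only processing time, so a growing backlog shows up directly as growing latency.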

The problem in the Python implementation seems to be the delay introduced by the foreachRDD(casssave) call, which only runs rdd.saveToCassandra( "test_hdpkns", "measurement" ).

Honestly, I was not expecting such a difference between these two implementations. Do you understand why this is happening?

Again, thank you very much for your help,

Best Regards


I am sharing my current Scala code below:
# Scala Snippet =========================
val sparkConf = new SparkConf(). // ...
val ssc = new StreamingContext(sparkConf, Seconds(1))
val sc = ssc.sparkContext
//...
val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
//...
// handle Kafka messages in a parallel fashion
val ckstream = kstream.map( x => parse(x.value) ).cache()
ckstream
              .foreachRDD( rdd => {
                    rdd.foreach(metrics)
              } )
ckstream
              .saveToCassandra("hdpkns", "microbatch_raw_measurement")
#=========================

On 30/04/2018 14:57:50, Javier Pareja <[hidden email]> wrote:


Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Alonso
Spark was developed mainly in Scala, so new features arrive first in Scala, then Java, and finally Python. I'm not surprised by the results; we've seen this at Stratio since the first versions of Spark. Early in development, some of our engineers build the prototype in Python, but when it comes down to it, if it goes into production, it has to be rewritten in Scala or Java, usually Scala.



On Mon, 21 May 2018 at 4:34, Saulo Sobreiro (<[hidden email]>) wrote:
Hi Javier,

Thank you very much for the feedback.
Indeed, the CPU is a huge limitation. I had a lot of trouble trying to run this use case in yarn-client mode and only managed to run it in standalone (local master) mode.

I do not have the hardware available to run this setup on a cluster yet, so I decided to dig a little deeper into the implementation to see what I could improve. I have just finished evaluating some results.
If you find something wrong or odd, please let me know.

Following your suggestion to use saveToCassandra directly, I decided to try Scala. I implemented everything as similarly as possible and was surprised by the results: the Scala implementation is much faster.

My current implementation is slightly different from the Python code I shared a few emails ago, but to compare the influence of the language as fairly as possible I used the following snippets:

# Scala implementation ------------------

val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
kstream
           .map( x => parse(x.value) )
           .saveToCassandra("hdpkns", "batch_measurement")

# Python implementation ----------------
# Adapted from the previously shared code; however, instead of calculating the metrics, it only parses the messages.
kafkaStream = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})

kafkaStream \
    .transform(parse) \
    .foreachRDD(casssave)


For the same streaming input, the Scala app took an average of ~1.5 seconds to handle each event, while the Python implementation took an average of ~80 seconds (and that was after a lot of pickle concurrent-access issues).

Note that I measured this time as the difference between the event generation (before it is published to Kafka) and the moment just before saveToCassandra.

The problem in the Python implementation seems to be the delay introduced by the foreachRDD(casssave) call, which only runs rdd.saveToCassandra( "test_hdpkns", "measurement" ).
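The latency definition used here (time just before saving minus event generation time) can be computed as below. This is a small pure-Python sketch; the timestamp pairs are hypothetical, and how the two timestamps are captured in the real application is left open. Comparing the first and last values alongside the average helps tell a constant cost apart from a backlog that keeps growing.

```python
from statistics import mean


def end_to_end_latencies(events):
    """Per-event latency: 'just before save' time minus generation time.

    `events` holds (generated_at, before_save_at) pairs in seconds, e.g. epoch
    timestamps; how they are captured is up to the application (assumption).
    """
    return [before_save - generated for generated, before_save in events]


def summarize(latencies):
    """Average plus first/last values: a rising tail hints at a growing backlog."""
    return {"mean": mean(latencies), "first": latencies[0], "last": latencies[-1]}


# Hypothetical timestamps for three events (not data from this thread).
sample = [(100.0, 101.5), (100.5, 102.0), (101.0, 104.5)]
stats = summarize(end_to_end_latencies(sample))
print(stats)
```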


Honestly, I was not expecting such a difference between these two implementations. Do you understand why this is happening?



Again, thank you very much for your help,

Best Regards


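Sharing my current Scala code below:
# Scala Snippet =========================
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import com.datastax.spark.connector.streaming._ // adds saveToCassandra to DStreams
val sparkConf = new SparkConf(). // ...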
val ssc = new StreamingContext(sparkConf, Seconds(1))
val sc = ssc.sparkContext
//...
val kstream = KafkaUtils.createDirectStream[String, String](
                 ssc,
                 LocationStrategies.PreferConsistent,
                 ConsumerStrategies.Subscribe[String, String](topic, kafkaParams))
//...
// handle Kafka messages in a parallel fashion
val ckstream = kstream.map( x => parse(x.value) ).cache()
ckstream
              .foreachRDD( rdd => {
                    rdd.foreach(metrics)
              } )
ckstream
              .saveToCassandra("hdpkns", "microbatch_raw_measurement")
#=========================

On 30/04/2018 14:57:50, Javier Pareja <[hidden email]> wrote:

Hi Saulo,

If the CPU is close to 100%, then you are hitting the limit. I don't think that moving to Scala will make a difference. Both Spark and Cassandra are CPU-hungry, and your setup is small in terms of CPUs. Try running Spark on another (physical) machine so that the 2 cores are dedicated to Cassandra.

Kind Regards
Javier



On Mon, 30 Apr 2018, 14:24 Saulo Sobreiro, <[hidden email]> wrote:
Hi Javier,

I will try to implement this in Scala then. As far as I can see in the documentation, there is no saveToCassandra in the Python interface unless you are working with DataFrames, and the kafkaStream instance does not provide methods to convert an RDD into a DataFrame.

Regarding my table, it is very simple (see below). Can I change something to make it write faster?
CREATE TABLE test_hdpkns.measurement (
  mid bigint,
  tt timestamp,
  in_tt timestamp,
  out_tt timestamp,
  sensor_id int,
  measure double,
  PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt)
) with compact storage; 

While the demo is running, the system CPU is almost always at 100% on both cores.
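With PRIMARY KEY (mid, tt, sensor_id, in_tt, out_tt), mid alone is the partition key and the other four columns are clustering columns, so every distinct mid value is a separate partition. How well that spreads writes depends on how mid values repeat within a micro-batch, which the thread does not say. A minimal pure-Python sketch for checking how many rows a batch puts in each partition under the current key or a candidate alternative; the parsed tuples and their column order are hypothetical, and the dict form is only for readability.

```python
from collections import Counter

# Column names of test_hdpkns.measurement, in the order of the CREATE TABLE.
COLUMNS = ("mid", "tt", "in_tt", "out_tt", "sensor_id", "measure")


def to_row(values):
    """Map a parsed tuple (hypothetical, same order as COLUMNS) to a dict."""
    if len(values) != len(COLUMNS):
        raise ValueError(f"expected {len(COLUMNS)} values, got {len(values)}")
    return dict(zip(COLUMNS, values))


def rows_per_partition(rows, partition_key=("mid",)):
    """Count rows per partition for a candidate partition key."""
    return Counter(tuple(row[col] for col in partition_key) for row in rows)


# Hypothetical parsed records; timestamps simplified to integers.
rows = [
    to_row((1, 10, 9, 11, 7, 0.5)),
    to_row((2, 10, 9, 11, 7, 1.5)),
    to_row((1, 12, 11, 13, 8, 2.5)),
]
print(rows_per_partition(rows))                   # grouped by mid (current key)
print(rows_per_partition(rows, ("sensor_id",)))   # grouped by a different key
```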


Thank you.

Best Regards,

On 29/04/2018 20:46:30, Javier Pareja <[hidden email]> wrote:

Hi Saulo,

I meant using this to save:

But it might be slow in a different area.
Another point is that Cassandra and Spark running on the same machine might compete for resources, which will slow down the inserts. You can check the machine's CPU usage at the time. The design of the table schema can also make a big difference.


On Sun, 29 Apr 2018, 19:02 Saulo Sobreiro, <[hidden email]> wrote:
Hi Javier, 


I removed the map and used "map" directly instead of transform, but the kafkaStream is created with KafkaUtils, which does not have a method to save to Cassandra directly.

Do you know of any workaround for this?


Thank you for the suggestion. 

Best Regards,

On 29/04/2018 17:03:24, Javier Pareja <[hidden email]> wrote:

Hi Saulo,

I'm no expert, but I will give it a try.
I would remove the rdd2.count(); I can't see the point of it, and you will gain performance right away. Because of this, I would not use transform, just map directly.
I have not used Python, but in Scala the Cassandra-Spark connector can save directly to Cassandra without a foreachRDD.

Finally, I would use the Spark UI to find which stage is the bottleneck here.
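The reason rdd2.count() costs performance is that RDD transformations such as map are lazy: unless the RDD is cached, each action (count(), then the save) recomputes the map from the source. The class below is a pure-Python stand-in, not Spark, and parse is a hypothetical placeholder for recordHandler; it only counts how often the map function runs when an action is followed by a second action.

```python
class LazyMapped:
    """Tiny stand-in for an uncached RDD: every action re-runs the map."""

    def __init__(self, source, fn):
        self.source = source
        self.fn = fn
        self.calls = 0  # total number of times fn has been applied

    def _compute(self):
        for item in self.source:
            self.calls += 1
            yield self.fn(item)

    def count(self):
        return sum(1 for _ in self._compute())

    def collect(self):
        return list(self._compute())


def parse(raw):  # hypothetical parser standing in for recordHandler
    sensor_id, value = raw.split(",")
    return int(sensor_id), float(value)


batch = ["1,0.5", "2,1.5", "3,2.5"]
rdd2 = LazyMapped(batch, parse)
if rdd2.count() > 0:        # first action: parses every record
    saved = rdd2.collect()  # second action (like the save): parses them again
print(rdd2.calls)
```

Each of the three records is parsed twice, so the counter ends at 6; dropping the count() check halves the parsing work in this model.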

On Sun, 29 Apr 2018, 01:17 Saulo Sobreiro, <[hidden email]> wrote:

Re: [Spark2.1] SparkStreaming to Cassandra performance problem

Russell Spitzer
The answer is most likely that when you mix Java and Python code, you incur a penalty for every object that is transformed from a Java object into a Python object (and then back again into a Java object) as data is passed in and out of your functions. A way around this would probably have been to use the DataFrame API if possible, which would have compiled the interactions in Java and skipped the Python-Java serialization. Using Scala from the start, though, is a great idea. I would also probably remove the cache from your stream, since it is probably only hurting (it adds an additional serialization that is only used once).
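To make the per-object cost concrete: in PySpark, each record crossing the JVM/Python boundary is serialized on one side and rebuilt as an object on the other, in both directions. The stand-in below uses only Python's standard pickle module (the JVM side of PySpark does not use it), so it is an illustration rather than a model of PySpark's serializer. It shows that a round trip costs time roughly proportional to the number of records, a cost paid again on every micro-batch. The record layout is hypothetical.

```python
import pickle
import time


def jvm_python_roundtrip(records):
    """Stand-in for one crossing each way: serialize, rebuild, and back again."""
    to_python = pickle.loads(pickle.dumps(records))  # "JVM -> Python worker"
    back = pickle.loads(pickle.dumps(to_python))     # "Python worker -> JVM"
    return back


def time_roundtrip(n):
    # Hypothetical sensor records: (mid, timestamp, sensor_id, measure)
    records = [(i, 1_500_000_000.0 + i, i % 10, i * 0.1) for i in range(n)]
    start = time.perf_counter()
    result = jvm_python_roundtrip(records)
    elapsed = time.perf_counter() - start
    assert result == records  # data survives the round trip unchanged
    return elapsed


for n in (1_000, 10_000, 100_000):
    print(f"{n:>7} records: {time_roundtrip(n):.4f} s")
```

Exact timings depend on the machine; the point is the growth with record count, which DataFrame operations avoid by keeping the work inside the JVM.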

On Mon, May 21, 2018 at 5:01 AM Alonso Isidoro Roman <[hidden email]> wrote: