re: streaming, batch / spark 2.2.1

Peter Liu
Hello there,

I'm new to Spark streaming and am having trouble understanding how Spark batches are "composed" (Google searches keep returning the older Spark Streaming concepts). I would appreciate any help and clarification.
I'm using Spark 2.2.1 for a streaming workload (see the code in (a) below). My general question is:

How is the number of records in a Spark batch (as seen in the Spark Jobs UI) determined? (With default settings, the batch interval in Spark 2.2.1 is supposedly zero.)

The issue I'm facing is this: for the same incoming stream (300K msg/sec to a Kafka broker, 220 bytes per message), two different systems processed different numbers of batches (a 2x difference) over the same application/consumer running time (30 min).
- At the batch level, the input data size per batch is the same on both systems (49.9 KB), while the total input data size (under the Spark Executors tab) differs by ~2x, as expected, since one system also processed 2x as many batches.
- On both systems the Spark consumer seems to keep up (the batch processing time does not grow over the 30 min).
See (c) for the real functional concern.
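As a rough sanity check on the figures above, the stated input rate can be turned into a byte rate and a total volume for the 30-minute run. This is a minimal sketch that assumes a perfectly constant 300K msg/sec and exactly 220 bytes per message; real payload sizes and Kafka record overhead will differ, and `input_volume` is a hypothetical helper name.

```python
# Back-of-the-envelope input volume for the workload described above.
# Assumes a constant message rate and a fixed payload size (both idealized).
MSG_PER_SEC = 300_000
BYTES_PER_MSG = 220
RUN_SECONDS = 30 * 60


def input_volume(msg_per_sec: int, bytes_per_msg: int, seconds: int):
    """Return (total messages, bytes per second, total bytes) over the run."""
    total_msgs = msg_per_sec * seconds
    bytes_per_sec = msg_per_sec * bytes_per_msg
    total_bytes = bytes_per_sec * seconds
    return total_msgs, bytes_per_sec, total_bytes


msgs, bps, total = input_volume(MSG_PER_SEC, BYTES_PER_MSG, RUN_SECONDS)
print(f"{msgs:,} messages, {bps / 1e6:.1f} MB/s, {total / 1e9:.1f} GB total")
# prints: 540,000,000 messages, 66.0 MB/s, 118.8 GB total
```

Comparing the MB/s figure with the 49.9 KB per-batch input shown in the UI may help judge whether that UI number reflects the raw Kafka data or something else.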

(b) and (c) below give a bit more context and the real concern, in case they are relevant.

I've been struggling with this. Any comments and help would be very much appreciated.

Thanks!

Regards,

Peter

=============
(a) code in use:
      .selectExpr("CAST(value AS STRING)", "CAST(timestamp AS TIMESTAMP)").as[(String, Timestamp)]
      .select(from_json($"value", mySchema).as("data"), $"timestamp")
      .select("data.*", "timestamp")
      .where($"event_type" === "view")
      .select($"ad_id", $"event_time")
      .join(campaigns.toSeq.toDS().cache(), Seq("ad_id")) 
      .groupBy(millisTime(window($"event_time", "10 seconds").getField("start")) as 'time_window, $"campaign_id")  //original code
      .agg(count("*") as 'count, max('event_time) as 'lastUpdate)
      .select(to_json(struct("*")) as 'value)
      .writeStream
      .format("kafka")
      ...
      .outputMode("update")
      .start()

(b)
The number of records in one batch does not seem to be determined by the batch interval (since it is zero by default in Spark 2.2), but rather, likely (or at least partly), by the time needed to process the previous batch. However, the Spark UI shows that the input data per batch is quite consistent, and the same on both systems (49.9 KB), which suggests there is some strict logic that prepares/caps the data per batch despite the fluctuating batch processing time. What is this logic?
 

(c) 
The main question is a functional one: if one system processes twice as much data as the other, does that indicate that either the first system processed duplicate data or the other system processed only half of the data it should have? Or is it more of a reporting issue?



Re: re: streaming, batch / spark 2.2.1

zakhavan
Hello,

I have a question. Could you point me to a link or tell me how you
calculated figures such as "300K msg/sec to a Kafka broker, 220 bytes per
message"? I'm loading a text file with 36000 records into a Kafka topic and
I'd like to calculate the data rate (# samples per sec) in Kafka.
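One simple way to get a samples-per-second figure is to divide the number of records by the time span they cover. This is a minimal sketch, assuming you have the epoch-millisecond timestamps of the records (for example, the Kafka record timestamps seen by a consumer); `records_per_second` is a hypothetical helper name, and the example data is made up.

```python
from typing import Sequence


def records_per_second(timestamps_ms: Sequence[int]) -> float:
    """Estimate throughput from record timestamps given in epoch milliseconds.

    n records span (n - 1) intervals between the earliest and latest
    timestamp, so an evenly spaced stream yields exactly its rate.
    """
    if len(timestamps_ms) < 2:
        raise ValueError("need at least two timestamps")
    span_ms = max(timestamps_ms) - min(timestamps_ms)
    if span_ms == 0:
        raise ValueError("all timestamps are equal; cannot compute a rate")
    return (len(timestamps_ms) - 1) * 1000.0 / span_ms


# Hypothetical example: 36000 records written evenly, one every 2 ms.
ts = [1_533_000_000_000 + 2 * i for i in range(36_000)]
print(records_per_second(ts))  # prints 500.0
```

If the records carry no usable timestamps, the same idea works with a record count and a wall-clock duration measured around the load or consume loop.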

Thank you,
Zeinab




--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: [External Sender] re: streaming, batch / spark 2.2.1

JayeshLalwani
In reply to this post by Peter Liu
What is different between the two systems? If one system processes records faster than the other simply because it does less processing, then you can expect it to have higher throughput. It's hard to say why one system has double the throughput of the other without knowing what each is doing internally.

"The number of records in one batch does not seem to be determined by the batch interval (since it is zero by default in Spark2.2), but likely (at least influenced) by the time it needs to process the previous batch."
This is expected Spark behavior. If you set the batch interval to 0, Spark will start a micro-batch immediately after it has finished the previous one. Assuming your input arrives at a constant rate of R records per second and one micro-batch takes T1 seconds, the next micro-batch will contain R × T1 records. If the second micro-batch takes T2 seconds, the third will contain R × T2 records. This is why it's important that your throughput is higher than your input rate. If it's not, batches will become bigger and bigger and take longer and longer until the application fails.
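The behavior described above can be reproduced with a toy model. This sketch is not Spark code: it assumes a constant input rate R and a hypothetical per-batch cost of a fixed overhead plus a per-record cost, and with a zero interval each micro-batch picks up the R × T records that arrived while the previous one ran.

```python
def simulate(rate, overhead_s, per_record_s, batches, first_batch=0.0):
    """Toy model of zero-interval micro-batching (not Spark code).

    rate: constant input rate R in records/sec.
    overhead_s, per_record_s: hypothetical cost model, T = overhead_s + n * per_record_s.
    Returns a list of (records_in_batch, batch_duration_s), one per batch.
    """
    history = []
    records = first_batch
    for _ in range(batches):
        duration = overhead_s + records * per_record_s
        history.append((records, duration))
        # The next batch holds everything that arrived meanwhile: R * T.
        records = rate * duration
    return history


# Capacity 1 / 0.0005 = 2000 records/s > R = 1000: batch size settles.
stable = simulate(rate=1000, overhead_s=0.5, per_record_s=0.0005, batches=30)
# Capacity 1 / 0.002 = 500 records/s < R: each batch is bigger than the last.
growing = simulate(rate=1000, overhead_s=0.5, per_record_s=0.002, batches=30)

print(round(stable[-1][0]), round(stable[-1][1], 3))  # prints 1000 1.0
print(all(prev[0] < nxt[0] for prev, nxt in zip(growing, growing[1:])))  # prints True
```

In this model the batch size settles at R·a / (1 − R·b) (a = overhead, b = per-record cost) when R·b < 1, i.e. when processing throughput exceeds the input rate, and grows without bound otherwise.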



On Thu, Aug 2, 2018 at 2:43 PM Peter Liu <[hidden email]> wrote:




The information contained in this e-mail is confidential and/or proprietary to Capital One and/or its affiliates and may only be used solely in performance of work or services for Capital One. The information transmitted herewith is intended only for use by the individual or entity to which it is addressed. If the reader of this message is not the intended recipient, you are hereby notified that any review, retransmission, dissemination, distribution, copying or other use of, or taking of any action in reliance upon this information is strictly prohibited. If you have received this communication in error, please contact the sender and delete the material from your computer.


Re: re: streaming, batch / spark 2.2.1

zakhavan
In reply to this post by Peter Liu
Yes, I am loading a text file from my local machine into a Kafka topic using
the script below, and I'd like to calculate the number of samples per second
consumed by the Kafka consumer.

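import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils  # receiver-based Kafka 0.8 API (Spark 2.x)

if __name__ == "__main__":
    print("hello spark")

    sc = SparkContext(appName="STALTA")
    ssc = StreamingContext(sc, 5)  # 5-second batch interval
    # createStream() takes a ZooKeeper quorum (host:port), not a broker list
    zk_quorum, topic = sys.argv[1:]

    # Connect to Kafka with one receiver thread for the topic
    kvs = KafkaUtils.createStream(ssc, zk_quorum,
                                  "raw-event-streaming-consumer", {topic: 1})

    # At least one output operation is required before start();
    # print the records received per second in each 5 s batch
    kvs.count().map(lambda n: n / 5.0).pprint()

    ssc.start()
    ssc.awaitTermination()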




Re: [External Sender] re: streaming, batch / spark 2.2.1

Peter Liu
In reply to this post by JayeshLalwani
Thanks for the clarification.

The processing time on both systems seems to be fine: (a) judging by the batch processing time charts (attached below), the batch processing time is not getting longer and longer; (b) the input data for each Spark stage remains the same in every batch.
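Whether batch processing time is "getting longer and longer" can also be checked numerically rather than by eye. This is a minimal sketch, assuming you have exported the per-batch durations in seconds (for example, read off the UI chart); `duration_trend` is a hypothetical helper that fits a least-squares slope, and the sample durations are made up.

```python
from typing import Sequence


def duration_trend(durations_s: Sequence[float]) -> float:
    """Least-squares slope of batch duration against batch index (sec per batch).

    A slope close to zero suggests the query is keeping up with its input;
    a persistently positive slope suggests batches keep growing.
    """
    n = len(durations_s)
    if n < 2:
        raise ValueError("need at least two batches")
    mean_x = (n - 1) / 2.0
    mean_y = sum(durations_s) / n
    cov = sum((i - mean_x) * (y - mean_y) for i, y in enumerate(durations_s))
    var = sum((i - mean_x) ** 2 for i in range(n))
    return cov / var


# Hypothetical batch durations (seconds) read off a chart.
print(duration_trend([2.1, 1.9, 2.0, 2.2, 1.8, 2.0]))  # about -0.017
print(duration_trend([1.0, 2.0, 3.0, 4.0]))  # prints 1.0
```

This is a heuristic, not a Spark feature; the slope should be judged against the typical batch duration and the noise in the data.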

The two systems do differ in CPU frequency and cache size, so it is fine that they don't perform at the same level. But I'd like to know whether the slower system is processing all incoming messages, since its total input data in the Spark UI (Executors tab) is only half that of the faster one, due to the different batch processing time. Would this be a functional issue? Or should the Spark stage input data not be interpreted as, or compared directly with, the incoming message data? I seem to be missing something here.
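A more direct check than the Executors-tab input size may be to compare how many rows each system actually read from the source. Structured Streaming reports a `numInputRows` figure per micro-batch in its query progress, and in PySpark `query.recentProgress` returns recent reports as a list of dicts. The sketch below assumes such reports have been collected (only a limited number of recent reports are retained, so a long run would need periodic polling); `summarize_progress` is a hypothetical helper and the reports are made up. Under the zero-interval model described earlier in the thread, a system that keeps up should read about R × runtime rows in total, however many batches it splits them into.

```python
from typing import Iterable, Mapping


def summarize_progress(progress: Iterable[Mapping]) -> dict:
    """Aggregate progress reports shaped like StreamingQueryProgress JSON.

    Only the 'batchId' and 'numInputRows' keys are read. Reports are keyed
    by batchId, so overlapping polls of recentProgress are counted once.
    """
    rows_by_batch = {}
    for report in progress:
        rows_by_batch[report["batchId"]] = report["numInputRows"]
    return {"batches": len(rows_by_batch), "input_rows": sum(rows_by_batch.values())}


# Hypothetical reports: the same 30 s of input at R = 1000 rows/s,
# split into 0.5 s batches on one system and 1 s batches on the other.
fast = [{"batchId": i, "numInputRows": 500} for i in range(60)]
slow = [{"batchId": i, "numInputRows": 1000} for i in range(30)]
print(summarize_progress(fast))  # prints {'batches': 60, 'input_rows': 30000}
print(summarize_progress(slow))  # prints {'batches': 30, 'input_rows': 30000}
```

If the two systems report similar total input rows over the same run, the 2x difference in the Executors tab is more likely a reporting artifact than lost or duplicated data.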

Thank you!

Peter


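The fast system (batch processing time in sec; x is the timeline, ignore the numbers on x):
[chart: batch processing time, fast system]

The slow system:
[chart: batch processing time, slow system]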

On Thu, Aug 2, 2018 at 4:11 PM, Jayesh Lalwani <[hidden email]> wrote: