Spark Streaming join taking long to process

4 messages

Spark Streaming join taking long to process

Abhijeet Kumar
Hi All,

I'm just practicing Spark Streaming by joining two different streams. I noticed that it's taking around 15 seconds per record. Let me share the details and the code:

If you look at stage id 2, it's taking 15 s. Isn't that strange?

Code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType
import org.apache.log4j.{Level, Logger}

object StreamJoin {

  val kafkaTopic1 = "demo2"
  val kafkaTopic2 = "demo3"
  val bootstrapServer = "localhost:9092"

  def main(args: Array[String]): Unit = {
    val checkPointDir = "hdfs://localhost:8020/checkpo"

    val spark = SparkSession.builder
      .appName("Argoid_Realtime_Pipeline")
      .master("local")
      .getOrCreate()

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)

    import spark.implicits._

    val df1 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic1)
      .option("failOnDataLoss", "false")
      .load()

    val df2 = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServer)
      .option("subscribe", kafkaTopic2)
      .option("failOnDataLoss", "false")
      .load()

    val order_details = df1
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")

    val invoice_details = df2
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))

    val join_df = order_details
      .join(invoice_details, order_details.col("s_order_id") === invoice_details.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    join_df.writeStream
      .format("console")
      .option("truncate", false)
      .start()
      .awaitTermination()
  }
}

Thanks,
Abhijeet Kumar


Re: Spark Streaming join taking long to process

Srikanth Sriram
Hello Abhijeet,
I'm not sure my answer will solve your problem, but consider the master configuration set for your Spark application:
val spark = SparkSession.builder
  .appName("Argoid_Realtime_Pipeline")
  .master("local")
  .getOrCreate()
I see you have set it as "local", not as "local[*]".

From another blog, I found this explanation, which I'm sharing in full:
"We are going to work locally, running the application straight from our IDE, so we are setting the master to local[*], meaning we are creating as many threads as there are cores on the machine."

Check whether this reduces the processing time: with local[*], all available cores are used, not just one.
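For illustration, a minimal sketch of the change (only the master URL differs; the rest of the builder is unchanged):

```scala
import org.apache.spark.sql.SparkSession

// local[*] starts one worker thread per logical core on the machine;
// local[N] would pin it to exactly N threads, and plain "local" uses one.
val spark = SparkSession.builder
  .appName("Argoid_Realtime_Pipeline")
  .master("local[*]")
  .getOrCreate()
```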

Regards,
Sriram Srikanth

On Tue, Nov 27, 2018 at 1:46 PM Abhijeet Kumar <[hidden email]> wrote:
[quoted original message trimmed]



--
Regards,
Srikanth Sriram



Screenshot 2018-11-27 at 1.24.39 PM.png (242K) Download Attachment
Screenshot 2018-11-27 at 1.24.47 PM.png (258K) Download Attachment

Re: Spark Streaming join taking long to process

Abhijeet Kumar
Yes, it did.
Thanks for the solution. That fixed it locally, but I'm worried about how to do this when I'm using YARN, because the same 15 s is being taken on YARN too :)

On 27-Nov-2018, at 4:42 PM, Srikanth Sriram <[hidden email]> wrote:

[quoted message trimmed]


Re: Spark Streaming join taking long to process

Shixiong(Ryan) Zhu
If you are using the same code to run on YARN, I believe it's still running in local mode, as the master URL hard-coded in the application overwrites the one set by the CLI. You can check the "Executors" tab in the Spark UI to see how many executors are running, and verify that it matches your config.
On Tue, Nov 27, 2018 at 6:17 AM Abhijeet Kumar <[hidden email]> wrote:
[quoted message trimmed]