Spark Streaming

classic Classic list List threaded Threaded
3 messages Options
Reply | Threaded
Open this post in threaded view
|

Spark Streaming

Siva Samraj
Hello All,

I am using Spark 2.3 version and i am trying to write Spark Streaming Join. It is a basic join and it is taking more time to join the stream data. I am not sure any configuration we need to set on Spark. 

Code:
*************************
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

    setEnvironmentVariables(args(0))

    val order_topic = args(1)
    val invoice_topic = args(2)
    val dest_topic_name = args(3)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

    import spark.implicits._


    val order_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", order_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val invoice_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", invoice_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val order_details = order_df
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")


    val invoice_details = invoice_df
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")

      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))



    val order_wm = order_details.withWatermark("tstamp_trans", args(4))
    val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

    val join_df = order_wm
      .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    val final_ids = join_df
      .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost", $"total_cost".cast("Int") as "total_cost",
        $"promotion_cost".cast("Int") as "promotion_cost",
        $"date_of_order",
        $"units_sold".cast("Int") as "units_sold", $"order_id")))
      .dropDuplicates("order_id")
      .select("value")


    val write_df = final_ids
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", dest_topic_name)
      .option("checkpointLocation", checkpoint_path)
      .trigger(Trigger.ProcessingTime("1 second")) 
      .start()

    write_df.awaitTermination()

  }

}
****************************

Let me know, it is taking more than a minute for every run. The waiting time is keep on increasing as the data grows. 

Please let me know, any thing we need to configure to make it fast. I tried increase the parallesim. 

Executor: tried from <1 to 4> and memory i gave is 3GB. The data flow is very less. Even for the single data it is taking time. 


Reply | Threaded
Open this post in threaded view
|

Re: Spark Streaming

Jungtaek Lim
You may need to put efforts on triage how much time is spent on each part. Without such information you are only able to get general tips and tricks. Please check SQL tab and see DAG graph as well as details (logical plan, physical plan) to see whether you're happy about these plans.

General tip on quick look of query: avoid using withColumn repeatedly and try to put them in one select statement. If I'm not mistaken, it is known as a bit costly since each call would produce a new Dataset. Defining schema and using "from_json" will eliminate all the call of withColumn"s" and extra calls of "get_json_object".

- Jungtaek Lim (HeartSaVioR)

2018년 11월 27일 (화) 오후 2:44, Siva Samraj <[hidden email]>님이 작성:
Hello All,

I am using Spark 2.3 version and i am trying to write Spark Streaming Join. It is a basic join and it is taking more time to join the stream data. I am not sure any configuration we need to set on Spark. 

Code:
*************************
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

    setEnvironmentVariables(args(0))

    val order_topic = args(1)
    val invoice_topic = args(2)
    val dest_topic_name = args(3)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

    import spark.implicits._


    val order_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", order_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val invoice_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", invoice_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val order_details = order_df
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")


    val invoice_details = invoice_df
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")

      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))



    val order_wm = order_details.withWatermark("tstamp_trans", args(4))
    val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

    val join_df = order_wm
      .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    val final_ids = join_df
      .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost", $"total_cost".cast("Int") as "total_cost",
        $"promotion_cost".cast("Int") as "promotion_cost",
        $"date_of_order",
        $"units_sold".cast("Int") as "units_sold", $"order_id")))
      .dropDuplicates("order_id")
      .select("value")


    val write_df = final_ids
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", dest_topic_name)
      .option("checkpointLocation", checkpoint_path)
      .trigger(Trigger.ProcessingTime("1 second")) 
      .start()

    write_df.awaitTermination()

  }

}
****************************

Let me know, it is taking more than a minute for every run. The waiting time is keep on increasing as the data grows. 

Please let me know, any thing we need to configure to make it fast. I tried increase the parallesim. 

Executor: tried from <1 to 4> and memory i gave is 3GB. The data flow is very less. Even for the single data it is taking time. 


Reply | Threaded
Open this post in threaded view
|

Re: Spark Streaming

Siva Samraj
My joindf is taking 14 sec in the first run and i have commented out the withcolumn still it is taking more time.



On Tue, Nov 27, 2018 at 12:08 PM Jungtaek Lim <[hidden email]> wrote:
You may need to put efforts on triage how much time is spent on each part. Without such information you are only able to get general tips and tricks. Please check SQL tab and see DAG graph as well as details (logical plan, physical plan) to see whether you're happy about these plans.

General tip on quick look of query: avoid using withColumn repeatedly and try to put them in one select statement. If I'm not mistaken, it is known as a bit costly since each call would produce a new Dataset. Defining schema and using "from_json" will eliminate all the call of withColumn"s" and extra calls of "get_json_object".

- Jungtaek Lim (HeartSaVioR)

2018년 11월 27일 (화) 오후 2:44, Siva Samraj <[hidden email]>님이 작성:
Hello All,

I am using Spark 2.3 version and i am trying to write Spark Streaming Join. It is a basic join and it is taking more time to join the stream data. I am not sure any configuration we need to set on Spark. 

Code:
*************************
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.types.TimestampType

object OrderSalesJoin {
  def main(args: Array[String]): Unit = {

    setEnvironmentVariables(args(0))

    val order_topic = args(1)
    val invoice_topic = args(2)
    val dest_topic_name = args(3)

    val spark = SparkSession.builder().appName("SalesStreamingJoin").getOrCreate()

    val checkpoint_path = HDFS_HOST + "/checkpoints/" + dest_topic_name

    import spark.implicits._


    val order_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", order_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val invoice_df = spark
      .readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("subscribe", invoice_topic)
      .option("startingOffsets", "latest")
      .option("failOnDataLoss", "false")
      .option("kafka.replica.fetch.max.bytes", "15728640")
      .load()


    val order_details = order_df
      .withColumn("s_order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("s_customer_id", get_json_object($"value".cast("String"), "$.customer_id"))
      .withColumn("s_promotion_id", get_json_object($"value".cast("String"), "$.promotion_id"))
      .withColumn("s_store_id", get_json_object($"value".cast("String"), "$.store_id"))
      .withColumn("s_product_id", get_json_object($"value".cast("String"), "$.product_id"))
      .withColumn("s_warehouse_id", get_json_object($"value".cast("String"), "$.warehouse_id"))
      .withColumn("unit_cost", get_json_object($"value".cast("String"), "$.unit_cost"))
      .withColumn("total_cost", get_json_object($"value".cast("String"), "$.total_cost"))
      .withColumn("units_sold", get_json_object($"value".cast("String"), "$.units_sold"))
      .withColumn("promotion_cost", get_json_object($"value".cast("String"), "$.promotion_cost"))
      .withColumn("date_of_order", get_json_object($"value".cast("String"), "$.date_of_order"))
      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))
      .select($"s_customer_id", $"s_order_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("integer") as "unit_cost",
        $"total_cost".cast("integer") as "total_cost", $"promotion_cost".cast("integer") as "promotion_cost",
        $"date_of_order", $"tstamp_trans", $"TIMESTAMP", $"units_sold".cast("integer") as "units_sold")


    val invoice_details = invoice_df
      .withColumn("order_id", get_json_object($"value".cast("String"), "$.order_id"))
      .withColumn("invoice_status", get_json_object($"value".cast("String"), "$.invoice_status"))
      .where($"invoice_status" === "Success")

      .withColumn("tstamp_trans", current_timestamp())
      .withColumn("TIMESTAMP", unix_timestamp($"tstamp_trans", "yyyyMMddHHmmss").cast(TimestampType))



    val order_wm = order_details.withWatermark("tstamp_trans", args(4))
    val invoice_wm = invoice_details.withWatermark("tstamp_trans", args(5))

    val join_df = order_wm
      .join(invoice_wm, order_wm.col("s_order_id") === invoice_wm.col("order_id"))
      .select($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost", $"total_cost",
        $"promotion_cost",
        $"date_of_order",
        $"units_sold" as "units_sold", $"order_id")

    val final_ids = join_df
      .withColumn("value", to_json(struct($"s_customer_id", $"s_promotion_id", $"s_store_id", $"s_product_id",
        $"s_warehouse_id", $"unit_cost".cast("Int") as "unit_cost", $"total_cost".cast("Int") as "total_cost",
        $"promotion_cost".cast("Int") as "promotion_cost",
        $"date_of_order",
        $"units_sold".cast("Int") as "units_sold", $"order_id")))
      .dropDuplicates("order_id")
      .select("value")


    val write_df = final_ids
      .writeStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BROKERS)
      .option("topic", dest_topic_name)
      .option("checkpointLocation", checkpoint_path)
      .trigger(Trigger.ProcessingTime("1 second")) 
      .start()

    write_df.awaitTermination()

  }

}
****************************

Let me know, it is taking more than a minute for every run. The waiting time is keep on increasing as the data grows. 

Please let me know, any thing we need to configure to make it fast. I tried increase the parallesim. 

Executor: tried from <1 to 4> and memory i gave is 3GB. The data flow is very less. Even for the single data it is taking time.