How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch


How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

Aakash Basu-2
Hi all,

The requirement is to process a file with Spark Streaming, fed from a Kafka topic, and once all the transformations are done, turn it into a static batch DataFrame and pass it to Spark ML model tuning.

So far, I have been doing it in the following fashion (a rough sketch follows the list):

1) Read the file through Kafka
2) Consume it in Spark as a streaming DataFrame
3) Run Spark transformation jobs on the streaming data
4) Append and write the results to HDFS
5) Read the transformed data back into Spark as a batch
6) Run the Spark ML model
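
A minimal sketch of this flow, assuming Structured Streaming (the broker, topic, paths and the model-tuning step are placeholders):

// 2) consume the Kafka topic as a streaming DataFrame
val streamDF = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "input-topic")
  .load()
  .selectExpr("CAST(value AS STRING) AS line")

// 3) run the transformations, then 4) append the result to HDFS
val transformedDF = streamDF   // ... transformations go here ...
transformedDF.writeStream
  .format("parquet")
  .option("path", "hdfs:///data/transformed")
  .option("checkpointLocation", "hdfs:///data/ckpt")
  .outputMode("append")
  .start()

// 5) later, read the transformed data back as a static batch and 6) run the ML model
val batchDF = spark.read.parquet("hdfs:///data/transformed")
// e.g. crossValidator.fit(batchDF)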

But the requirement is to avoid using HDFS, since it may not be installed on certain clusters. So we have to avoid the disk I/O and do it on the fly: consume from Kafka, append into a static Spark DataFrame, and then pass that DataFrame to the ML model.

How to go about it?

Thanks,
Aakash.

Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

maasg
Hi Aakash,

In Spark Streaming, foreachRDD gives you access to the data in each micro-batch.
You can transform that RDD into a DataFrame and implement the flow you describe.

e.g.:

import org.apache.spark.rdd.RDD
import spark.implicits._   // needed for .toDF, assuming a SparkSession named spark

var historyRDD: RDD[MyType] = sparkContext.emptyRDD   // MyType: a case class describing one record

// create the Kafka DStream ...

dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  val df = allData.toDF   // requires the RDD to be of some structured type, i.e. a case class
  // do something with the DataFrame df
  historyRDD = allData    // this needs checkpointing
}
Depending on the volume of data you're dealing with, it might not be possible to hold all the data in memory.
Checkpointing the historyRDD is mandatory to break up the growing lineage (the union keeps a reference to the previous RDDs, and at some point things will blow up).
So, while this trick keeps the data within the Spark boundaries, you still need resilient storage to write the checkpoints to in order to implement a reliable streaming job.
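
A rough sketch of the checkpointing part, continuing the snippet above (the checkpoint directory and the interval are just placeholders):

// set once, before starting the streaming context; must point at resilient storage
sparkContext.setCheckpointDir("hdfs:///checkpoints/history")   // or S3, NFS, ...

var batchCount = 0L
dstream.foreachRDD { rdd =>
  val allData = historyRDD union rdd
  batchCount += 1
  if (batchCount % 10 == 0) {   // every N batches, truncate the growing union lineage
    allData.checkpoint()
    allData.count()             // force an action so the checkpoint is actually materialized
  }
  // ... toDF and model feeding as above ...
  historyRDD = allData
}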

As you are using Kafka, another alternative would be to write the transformed data back to Kafka and have the training job consume that topic, replaying the data from the start.
Confluent has some good resources on how to use "Kafka as storage".
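
A sketch of that approach with Structured Streaming (the broker address and the "transformed" topic are placeholders, and transformedDF stands for your transformed streaming DataFrame):

// 1) the streaming job writes the transformed rows back to Kafka as JSON
transformedDF
  .selectExpr("to_json(struct(*)) AS value")
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("topic", "transformed")
  .option("checkpointLocation", "/tmp/transformed-ckpt")   // the Kafka sink still needs a checkpoint dir
  .start()

// 2) the training job replays the whole topic as a static batch DataFrame
val trainingDF = spark.read
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker:9092")
  .option("subscribe", "transformed")
  .option("startingOffsets", "earliest")
  .load()
  .selectExpr("CAST(value AS STRING) AS json")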

I hope this helps.

kr, Gerard.

PS: I'm also not sure why you are initially writing the files to Kafka. It would be easier to read the files directly from Spark Streaming or Structured Streaming.
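
For example, something like this would pick the files up directly (the schema and the input directory are placeholders):

import org.apache.spark.sql.types._

val schema = new StructType()
  .add("feature1", DoubleType)
  .add("label", DoubleType)

val fileStreamDF = spark.readStream
  .schema(schema)        // file sources require an explicit schema
  .csv("/landing/dir")   // picks up new files dropped into the directory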


Re: How to convert Spark Streaming to Static Dataframe on the fly and pass it to a ML Model as batch

Gourav Sengupta
Hi,



Regards,
Gourav Sengupta
