Loading objects only once


Loading objects only once

Naveen Swamy
Hello all,

I am a new user of Spark, so please bear with me if this has been discussed earlier.

I am trying to run batch inference with pre-trained models from DL frameworks on Spark. Basically, I want to download a model (usually ~500 MB) onto the workers, load it, and run inference on images fetched from a source such as S3, something like this:
rdd = sc.parallelize(load_from_s3)
rdd.map(fetch_from_s3).map(read_file).map(predict)

I was able to get it running in local mode on Jupyter. However, I would like to load the model only once, not on every map operation. A setup hook that loads the model once into the JVM would have been nice. I came across this JIRA, https://issues.apache.org/jira/browse/SPARK-650, which suggests using a singleton and static initialization. I tried to do this with a singleton metaclass, following the thread at https://stackoverflow.com/questions/6760685/creating-a-singleton-in-python. That failed miserably, with Spark complaining that it cannot serialize ctypes objects with pointer references.

After a lot of trial and error, I moved the code to a separate file and created a static method for predict that checks whether a class variable is set and loads the model if it is not. This approach does not sound thread-safe to me, so I wanted to reach out and see whether there are established patterns for achieving something like this.
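Roughly, the pattern looks like this (a simplified sketch; `load_model` and the model path are placeholders for the framework-specific loader):

```
class Predictor(object):
    _model = None  # populated at most once per Python worker process

    @staticmethod
    def predict(image_bytes):
        # Lazily load the model the first time predict() is called in this process.
        if Predictor._model is None:
            Predictor._model = load_model("/local/path/to/model")
        return Predictor._model.predict(image_bytes)

# used as: rdd.map(fetch_from_s3).map(read_file).map(Predictor.predict)
```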


Also, I would like to understand the executor -> tasks -> Python process mapping. Does each task get mapped to a separate Python process? The reason I ask is that I want to use the mapPartitions method to load a batch of files and run inference on them separately, for which I need to load the object once per task. Any pointers would be appreciated.
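For example, something along these lines is what I have in mind (again just a sketch, reusing the placeholder `load_model` and the `fetch_from_s3`/`read_file` helpers from above):

```
def predict_partition(keys):
    # Load the model once per partition (i.e. once per task) instead of once per record.
    model = load_model("/local/path/to/model")
    for key in keys:
        image = read_file(fetch_from_s3(key))
        yield model.predict(image)

# used as: rdd.mapPartitions(predict_partition)
```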


Thanks for your time in answering my question.

Cheers, Naveen



Re: Loading objects only once

Eike von Seggern
Hello,

Maybe a broadcast variable can help you here.

You can load the model once on the driver and then broadcast it to the workers with `bc_model = sc.broadcast(model)`. You can then access the model in the map function via `bc_model.value`.
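Roughly like this (an untested sketch; `load_model` and the paths are placeholders):

```
model = load_model("/path/on/driver/model")   # load once, on the driver
bc_model = sc.broadcast(model)                # ship it to the executors once

def predict(image):
    return bc_model.value.predict(image)      # look up the broadcast model

results = rdd.map(predict)
```

Note that the model object has to be picklable for `sc.broadcast` to work, so this may run into the same ctypes serialization problem you mentioned.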

Best

Eike



RE: Loading objects only once

Jean Georges Perrin
In reply to this post by Naveen Swamy

Maybe put the model on each executor's disk and load it from there? Depending on how you use the data/model, using something like Livy and sharing the same connection may also help.

 


Re: Loading objects only once

Vadim Semenov
In reply to this post by Naveen Swamy
As an alternative:
```
spark-submit --files <file you want>
```

The files will be placed in each executor's working directory, so you can then load them from inside your `map` function.

Behind the scenes it uses the `SparkContext.addFile` method, which you can call directly as well.
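In PySpark that would look roughly like this (a sketch; `load_model` is a placeholder for your framework's loader):

```
from pyspark import SparkFiles

# submitted with:  spark-submit --files s3://bucket/path/model-file ...
# or equivalently: sc.addFile("s3://bucket/path/model-file")

def predict_partition(keys):
    # SparkFiles.get resolves the local path of the shipped file on this executor.
    model = load_model(SparkFiles.get("model-file"))
    for key in keys:
        yield model.predict(key)

results = rdd.mapPartitions(predict_partition)
```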


Re: Loading objects only once

Vadim Semenov
Something like this:

```
import org.apache.spark.SparkFiles
import org.apache.spark.sql.SparkSession

object Model {
  // Loaded lazily, at most once per executor JVM, the first time it is accessed.
  // ModelLoader stands in for whatever framework-specific loader you use.
  @transient lazy val modelObject =
    new ModelLoader(SparkFiles.get("model-filename"))

  def get() = modelObject
}

object SparkJob {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("batch-inference").getOrCreate()
    val sc = spark.sparkContext

    // Ship the model file to every executor; SparkFiles.get resolves its local path there.
    sc.addFile("s3://bucket/path/model-filename")

    sc.parallelize(…).map(record => {
      Model.get().use(…)
    })
  }
}
```
