Can spark handle this scenario?


Lian Jiang
Hi,

I have a use case:

I want to download S&P 500 stock data from the Yahoo API in parallel using Spark. I have all the stock symbols in a Dataset, and I use the code below to call the Yahoo API for each symbol:

       

case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// symbolDs is a Dataset[Symbol]; pullSymbolFromYahoo returns a Dataset[Tick]
symbolDs.map { k =>
  pullSymbolFromYahoo(k.symbol, k.sector)
}


This statement does not compile; the error is:


Unable to find encoder for type stored in a Dataset.  Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._  Support for serializing other types will be added in future releases.



My questions are:


1. As you can see, this is not traditional Dataset handling such as counts or SQL queries. Instead, it is more like a UDF that applies an arbitrary operation to each record. Is Spark good at handling such a scenario?


2. Regarding the compilation error, is there a fix? I did not find a satisfactory solution online.
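(For context, the error comes from the lambda's return type: it returns a `Dataset[Tick]`, so Spark would need an encoder for `Dataset[Tick]` itself, which does not exist. A hedged sketch of a version that does compile — assuming `pullSymbolFromYahoo` can be changed to return a plain `Seq[Tick]`, with the body below a placeholder rather than a real HTTP call — is:)

```scala
import org.apache.spark.sql.SparkSession

case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

object YahooPull {
  // Hypothetical stand-in: returns plain Scala values instead of a Dataset,
  // so Spark only needs an encoder for Tick (a case class), which
  // spark.implicits._ provides.
  def pullSymbolFromYahoo(symbol: String, sector: String): Seq[Tick] =
    Seq(Tick(symbol, sector, 0.0, 0.0)) // placeholder, not a real API call

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ticks").getOrCreate()
    import spark.implicits._

    val symbolDs = Seq(Symbol("AAPL", "tech"), Symbol("XOM", "energy")).toDS()

    // flatMap over Seq[Tick] yields a Dataset[Tick]
    val ticks = symbolDs.flatMap(k => pullSymbolFromYahoo(k.symbol, k.sector))
    ticks.show()
    spark.stop()
  }
}
```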


Thanks for the help!





Re: Can spark handle this scenario?

Irving Duran

Do you only want to use Scala? Otherwise, I think you should be able to accomplish what you want with PySpark and pandas table reads.

Thank you,

Irving Duran
On 02/16/2018 06:10 PM, Lian Jiang wrote:

Re: Can spark handle this scenario?

ayan guha
Hi

A couple of suggestions:

1. Do not use Dataset; use DataFrame in this scenario. There is no benefit from Dataset features here. With a DataFrame you can write an arbitrary UDF that does what you want.
2. In fact you do need dataframes here. You would be better off with an RDD: just create an RDD of symbols and use map to do the processing.
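A minimal sketch of the RDD route — hedged, with `fetchTicks` a hypothetical stand-in for the actual Yahoo call:

```scala
import org.apache.spark.sql.SparkSession

case class Tick(symbol: String, sector: String, open: Double, close: Double)

object RddPull {
  // Hypothetical stand-in for the Yahoo API call.
  def fetchTicks(symbol: String, sector: String): Seq[Tick] =
    Seq(Tick(symbol, sector, 0.0, 0.0)) // placeholder values

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rdd-pull").getOrCreate()

    val symbols = Seq(("AAPL", "tech"), ("XOM", "energy"))
    // RDD transformations need no encoder, so arbitrary per-record logic is fine.
    val ticks = spark.sparkContext
      .parallelize(symbols)
      .flatMap { case (sym, sec) => fetchTicks(sym, sec) }

    ticks.collect().foreach(println)
    spark.stop()
  }
}
```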

On Sat, Feb 17, 2018 at 12:40 PM, Irving Duran <[hidden email]> wrote:


--
Best Regards,
Ayan Guha

Re: Can spark handle this scenario?

ayan guha
** You do NOT need dataframes here, I meant to say.

On Sat, Feb 17, 2018 at 3:58 PM, ayan guha <[hidden email]> wrote:



--
Best Regards,
Ayan Guha

Re: Can spark handle this scenario?

Lian Jiang
Thanks Ayan. An RDD may support map better than Dataset/DataFrame. However, it could be hard to serialize a complex operation for Spark to execute in parallel. IMHO, Spark does not fit this scenario. Hope this makes sense.

On Fri, Feb 16, 2018 at 8:58 PM, ayan guha <[hidden email]> wrote:


Re: Can spark handle this scenario?

Holden Karau-2
I'm not sure what you mean by it could be hard to serialize complex operations?

Regardless I think the question is do you want to parallelize this on multiple machines or just one?

On Feb 17, 2018 4:20 PM, "Lian Jiang" <[hidden email]> wrote:


Re: Can spark handle this scenario?

SNEHASISH DUTTA

Hi Lian,

This could be the solution:


case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// symbolDs is a Dataset[Symbol]; pullSymbolFromYahoo returns a Dataset[Tick]
symbolDs.map { k =>
  pullSymbolFromYahoo(k.symbol, k.sector)
}(org.apache.spark.sql.Encoders.kryo[Tick.getClass])


Thanks,

Snehasish




On Sat, Feb 17, 2018 at 1:05 PM, Holden Karau <[hidden email]> wrote:



Re: Can spark handle this scenario?

Lian Jiang
Snehasish,

I got this error in spark-shell (Scala 2.11.8):

case class My(name: String, age: Int)

import spark.implicits._

val t = List(new My("lian", 20), new My("sh", 3)).toDS

t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])

<console>:31: error: type getClass is not a member of object My
       t.map{ k=> print(My) }(org.apache.spark.sql.Encoders.kryo[My.getClass])

Using an RDD can work around this issue, as mentioned in previous emails:

t.rdd.map{ k=> print(k) }
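(For reference, the kryo encoder itself does compile when given the type rather than a `.getClass` value — a sketch, assuming Spark 2.x inside spark-shell, where `spark` is predefined:)

```scala
// In spark-shell (`spark` and its implicits are predefined there):
case class My(name: String, age: Int)
import spark.implicits._

val t = List(My("lian", 20), My("sh", 3)).toDS()

// Encoders.kryo is parameterized by the type itself, so pass My, not My.getClass:
val u = t.map(k => k)(org.apache.spark.sql.Encoders.kryo[My])
```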



Holden,

The remaining problem is: Spark needs every class used in fn() to be serializable for t.rdd.map{ k => fn(k) } to work. This can be hard, since some classes in third-party libraries are not serializable. That restricts the power of using Spark to parallelize an operation across multiple machines. Hope this is clear.



On Sat, Feb 17, 2018 at 12:04 AM, SNEHASISH DUTTA <[hidden email]> wrote:



Re: Can spark handle this scenario?

Jörn Franke
You may want to separate the import step from the processing step. It is not very economical to download all the data again every time you want to calculate something. So download it first and store it on a distributed file system, scheduling a download of the newest information every day or hour. You can store it in a query-optimized format such as ORC or Parquet, and then run queries over it.
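A hedged sketch of that split (the path and values are illustrative): one scheduled job appends the latest ticks as Parquet, and later jobs query the store instead of calling the API again:

```scala
import org.apache.spark.sql.SparkSession

case class Tick(symbol: String, sector: String, open: Double, close: Double)

object IngestThenQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("ingest").getOrCreate()
    import spark.implicits._

    // Step 1 (scheduled daily/hourly): fetch and persist in a columnar format.
    // The values here are illustrative placeholders for the API response.
    val todays = Seq(Tick("AAPL", "tech", 171.0, 172.4)).toDS()
    todays.write.mode("append").parquet("/data/ticks") // illustrative path

    // Step 2 (any time later): query the stored data; no API calls involved.
    val ticks = spark.read.parquet("/data/ticks")
    ticks.createOrReplaceTempView("ticks")
    spark.sql("SELECT sector, avg(close) FROM ticks GROUP BY sector").show()

    spark.stop()
  }
}
```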

On 17. Feb 2018, at 01:10, Lian Jiang <[hidden email]> wrote:




Re: Can spark handle this scenario?

Lian Jiang
Agreed. Thanks.

On Sat, Feb 17, 2018 at 9:53 AM, Jörn Franke <[hidden email]> wrote:


Re: Can spark handle this scenario?

Anastasios Zouzias
Hi Lian,

The remaining problem is:

Spark needs every class used in fn() to be serializable for t.rdd.map{ k => fn(k) } to work. This can be hard, since some classes in third-party libraries are not serializable. That restricts the power of using Spark to parallelize an operation across multiple machines.



This is not entirely true. You can bypass the serialisation issue in most cases, see the link below for an example.


In a nutshell, the non-serialisable code is available to all executors, so there is no need for Spark to serialise from the driver to the executors.
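One common shape of that trick — a sketch, with `YahooHttpClient` standing in for any non-serialisable third-party class — is to hold the client in an object's `lazy val`, so each executor JVM builds its own instance and nothing has to be shipped from the driver:

```scala
import org.apache.spark.sql.SparkSession

// Stand-in for a non-serializable third-party client.
class YahooHttpClient {
  def fetchClose(symbol: String): Double = 0.0 // placeholder, no real network call
}

object ClientHolder {
  // A lazy val in an object is initialized independently on each JVM that
  // touches it (i.e. once per executor), so the client is never serialized.
  lazy val client = new YahooHttpClient
}

object Demo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("demo").getOrCreate()

    val closes = spark.sparkContext
      .parallelize(Seq("AAPL", "XOM"))
      .map(sym => sym -> ClientHolder.client.fetchClose(sym)) // resolved locally on the executor

    closes.collect().foreach(println)
    spark.stop()
  }
}
```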

Best regards,
Anastasios




On Sat, Feb 17, 2018 at 6:13 PM, Lian Jiang <[hidden email]> wrote:

--
-- Anastasios Zouzias

Re: Can spark handle this scenario?

Lian Jiang
Thanks Anastasios. This link is helpful!

On Sat, Feb 17, 2018 at 11:05 AM, Anastasios Zouzias <[hidden email]> wrote:

Re: Can spark handle this scenario?

vijay.bvp
I am assuming the pullSymbolFromYahoo function opens a connection to the Yahoo API with some token passed. In the code provided so far, if you have 2000 symbols, it will make 2000 new connections (and 2000 API calls)!
Connection objects can't/shouldn't be serialized and sent to executors; they should rather be created at the executors.

The philosophy given below is nicely documented in the Spark Streaming guide; look at
"Design Patterns for using foreachRDD":
https://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations-on-dstreams


case class Symbol(symbol: String, sector: String)
case class Tick(symbol: String, sector: String, open: Double, close: Double)

// Assume symbolRdd is an RDD[Symbol]; a Dataset/DataFrame can be converted with .rdd
symbolRdd.foreachPartition { partition =>
  // this code runs at the executor
  // open one connection per partition here
  val connectionToYahoo = new HTTPConnection()

  partition.foreach { k =>
    pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
  }
}

With the above code, if the dataset has 10 partitions (2000 symbols), only 10
connections will be opened, though it still makes 2000 API calls.
You should also think about how you send and receive results for a large number
of symbols: because of the amount of parallelism that Spark provides, you might
run into rate limits on the APIs. If you can bulk-send symbols, the above
pattern is also very useful.
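As a rough illustration of the rate-limit point: the crudest client-side throttle is to sleep between calls inside each partition. The sketch below assumes a hypothetical `maxCallsPerSecond` budget and reuses the thread's partition-scoped connection; real code would want retries and backoff as well.

```scala
// Hypothetical budget: stay under maxCallsPerSecond per concurrently
// running partition (total rate = partitions in flight * this value).
val maxCallsPerSecond = 5
val delayMs = 1000L / maxCallsPerSecond

symbolRdd.foreachPartition { partition =>
  val connectionToYahoo = new HTTPConnection() // one connection per partition, as above
  partition.foreach { k =>
    pullSymbolFromYahoo(k.symbol, k.sector, connectionToYahoo)
    Thread.sleep(delayMs) // crude client-side rate limiting
  }
}
```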

thanks
Vijay







--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: Can spark handle this scenario?

Lian Jiang
Thanks Vijay! This is very clear.

On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp <[hidden email]> wrote:



Re: Can spark handle this scenario?

Lian Jiang
In reply to this post by vijay.bvp
Hi Vijay,

Does HTTPConnection() (or any other object created per partition) need to be serializable for your code to work? If so, the usage seems limited.

Sometimes the error caused by a non-serializable object can be very misleading (e.g. "Return statements aren't allowed in Spark closures") instead of the expected "Task not serializable".

The post shared by Anastasios helps but does not completely resolve the "need serialization" problem. For example, if I need to create a per-partition class object that
relies on other objects which may not be serializable, then wrapping the object creation in an object (making it a static function) does not help, not to mention that
the programming model becomes unintuitive.

I have been playing with this scenario for some time and am still frustrated. Thanks
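For context, the static-object wrapping Lian refers to typically looks like the sketch below (using the thread's hypothetical HTTPConnection and pullSymbolFromYahoo names). Scala objects are initialized lazily once per JVM, so each executor builds its own connection and nothing crosses the serialization boundary; the limitation Lian describes arises when the object's own dependencies are not available or constructible on the executor.

```scala
// One lazily initialized connection per JVM (driver or executor).
object YahooClient {
  lazy val connection: HTTPConnection = new HTTPConnection()
}

symbolRdd.foreachPartition { partition =>
  // First reference on an executor initializes the object locally there;
  // the closure only carries a static reference, not the connection itself.
  partition.foreach(s => pullSymbolFromYahoo(s.symbol, s.sector, YahooClient.connection))
}
```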






On Tue, Feb 20, 2018 at 12:47 AM, vijay.bvp <[hidden email]> wrote:



Re: Can spark handle this scenario?

vijay.bvp
When an HTTP connection is opened, you are opening a connection between one specific
machine (with its IP and NIC) and another specific machine, so it can't be
serialized and used on a different machine.

This isn't a Spark limitation.

I made a simple diagram if it helps. Objects created at the driver and
passed to the workers need to be serialized. Objects created at the workers do
not.

In the diagram, you have to create the HTTPConnection on the executors
independently of the driver.
The HTTPConnection created at Executor-1 can be used for partitions P1-P3 of
the RDD available on that executor.

Spark is tolerant and does allow passing objects from the driver to the workers, but
if it reports "Task not serializable", it does indicate that some object
is having an issue. Mark the class as Serializable if you think its objects
can be serialized. As I said at the beginning, not everything is
serializable, particularly HTTP connections, JDBC connections, etc.

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8878/Picture1.png>
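To make the diagram concrete, here is a minimal sketch (with the thread's hypothetical HTTPConnection and pullSymbolFromYahoo) contrasting a closure that captures a driver-side connection, which fails with "Task not serializable", against one that creates the connection on the executor:

```scala
// WRONG: the connection is created on the driver and captured by the
// closure, so Spark must serialize it to ship the task -> "Task not serializable".
val driverConn = new HTTPConnection()
symbolRdd.map(s => pullSymbolFromYahoo(s.symbol, s.sector, driverConn))

// RIGHT: the connection is created inside the task, on the executor,
// so it never crosses the serialization boundary.
symbolRdd.mapPartitions { partition =>
  val conn = new HTTPConnection()
  partition.map(s => pullSymbolFromYahoo(s.symbol, s.sector, conn))
}
```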
















Re: Can spark handle this scenario?

Lian Jiang
Thanks Vijay. After changing the programming model (creating a context class for the workers), it finally worked for me. Cheers.
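For readers landing on this thread: one way to implement a "context class for the workers" is a serializable wrapper whose non-serializable members are `@transient lazy val`s, so they are rebuilt locally on each executor JVM instead of being shipped from the driver. A sketch, under the same hypothetical HTTPConnection/pullSymbolFromYahoo names used throughout the thread:

```scala
// Serializable shell; the connection itself is never serialized.
class YahooContext extends Serializable {
  // @transient: excluded from serialization; lazy: constructed on first
  // use, on whichever JVM (i.e. each executor) touches it.
  @transient lazy val connection: HTTPConnection = new HTTPConnection()
}

val ctx = new YahooContext() // created on the driver; cheap to ship

val ticks = symbolRdd.mapPartitions { partition =>
  partition.map(s => pullSymbolFromYahoo(s.symbol, s.sector, ctx.connection))
}
```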

On Fri, Feb 23, 2018 at 5:42 PM, vijay.bvp <[hidden email]> wrote:
when HTTP connection is opened you are opening a connection between specific
machine (with IP and NIC card) to another specific machine, so this can't be
serialized and used on other machine right!!

This isn't spark limitation.

I made a simple diagram if it helps. The Objects created at driver and
passed to worker need to be serialized. The objects created at workers need
not.

In the diagram you have to create HTTPConnection on the executors
independently of the driver.
The HTTPConnection created at Executor-1 can be used for partitions P1-P3 of
RDD available on that executor.

Spark is tolerant and does allow passing objects from driver to workers, but
in case if it reports "Task not serializable"  it does indicate some object
is having issue. mark the class as Serializable if you think if the object
of it can be serialized. As I said in the beginning not everything could
serializable particularly http connections, JDBC connections etc..

<http://apache-spark-user-list.1001560.n3.nabble.com/file/t8878/Picture1.png>














--
Sent from: http://apache-spark-user-list.1001560.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]