getting error: value toDF is not a member of Seq[columns]

getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Hi,

I have a Spark Streaming job that sends data, and I need to put that data into MongoDB for test purposes. The easiest way seems to be to create a DataFrame from the individual columns, as below.

I loop over the individual rows of the RDD and do the following:

    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

    for (line <- pricesRDD.collect.toArray)
    {
      var key        = line._2.split(',').view(0).toString
      var ticker     = line._2.split(',').view(1).toString
      var timeissued = line._2.split(',').view(2).toString
      var price      = line._2.split(',').view(3).toFloat
      val priceToString = line._2.split(',').view(3)
      if (price > 90.0)
      {
        println("price > 90.0, saving to MongoDB collection!")
        // Save prices to MongoDB collection
        var df = Seq(columns(key, ticker, timeissued, price)).toDF

but it fails with the message:

    value toDF is not a member of Seq[columns]

What would be the easiest way of resolving this, please?

thanks

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw

http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


Re: getting error: value toDF is not a member of Seq[columns]

Jungtaek Lim
You may need to import the implicits from your SparkSession, like below:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("Spark SQL basic example")
      .config("spark.some.config.option", "some-value")
      .getOrCreate()

    // For implicit conversions like converting RDDs to DataFrames
    import spark.implicits._
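For completeness, a minimal self-contained sketch of what that suggestion looks like end to end (the object name is hypothetical; the case class is defined at the top level of the file, outside any method, which is generally needed for the implicit Encoder derivation to apply):

    import org.apache.spark.sql.SparkSession

    // Defined at top level, not inside a method or loop
    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

    object ToDFExample {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("toDF example").getOrCreate()
        // Brings the Seq -> DatasetHolder conversion and the case-class Encoders into scope
        import spark.implicits._

        val df = Seq(columns("some-key", "MKS", "2018-09-05T10:10:15", 676.5f)).toDF
        df.show(false)
        spark.stop()
      }
    }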



Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Thanks.

I already do that, as below:

    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    import sqlContext.implicits._

but I am still getting the error!


Re: getting error: value toDF is not a member of Seq[columns]

Jungtaek Lim
I guess you need to have an Encoder for the type returned by columns().

    implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
      DatasetHolder(_sqlContext.createDataset(rdd))
    }

You can see lots of Encoder implementations in the Scala code. If your type doesn't match any of them, the conversion may not apply and you need to provide a custom Encoder.

-Jungtaek Lim (HeartSaVioR)
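A minimal sketch of supplying such an Encoder explicitly for the columns case class (assuming Spark 2.x; Encoders.product works for any case class visible at the point of use):

    import org.apache.spark.sql.{Encoder, Encoders}

    // columns is a case class (a Product), so its Encoder can be derived explicitly
    implicit val columnsEncoder: Encoder[columns] = Encoders.product[columns]

With this implicit in scope, together with the implicits import, Seq(columns(...)).toDF should resolve.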



Re: getting error: value toDF is not a member of Seq[columns]

Jungtaek Lim
Sorry, I pasted the wrong method. The relevant one is:

    implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
      DatasetHolder(_sqlContext.createDataset(s))
    }
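For reference, the same conversion can also be spelled out without relying on that implicit, by calling createDataset directly. A rough sketch (assuming sqlContext.implicits._ is imported so an Encoder for the case class is in scope):

    // Equivalent to Seq(...).toDF, but with the conversion written out;
    // createDataset needs an implicit Encoder[columns], provided for case classes by the implicits import
    val df = sqlContext.createDataset(Seq(columns(key, ticker, timeissued, price))).toDF()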



Re: getting error: value toDF is not a member of Seq[columns]

Jungtaek Lim
You may also find the link below useful (though it looks quite old). A case class is exactly the kind of type for which an Encoder is available, so there may be another reason that prevents the implicit conversion.

Which Spark version do you use?




Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Thanks!

The Spark version is 2.3.0.



Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
With the following:

    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

    var key        = line._2.split(',').view(0).toString
    var ticker     = line._2.split(',').view(1).toString
    var timeissued = line._2.split(',').view(2).toString
    var price      = line._2.split(',').view(3).toFloat

    var df = Seq(columns(key, ticker, timeissued, price))
    println(df)

I get

    List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))

So I just need to convert that Seq into a DataFrame.
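One alternative worth trying here (a sketch, assuming a SparkSession named spark is available) is createDataFrame, which accepts a Seq of case-class instances directly and does not go through the toDF implicit:

    // Schema is derived from the case class fields by reflection
    val df = spark.createDataFrame(Seq(columns(key, ticker, timeissued, price)))
    df.printSchema()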



Re: getting error: value toDF is not a member of Seq[columns]

Deepak Sharma
Try this:

    import spark.implicits._
    df.toDF()


Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Yep, already tried it and it did not work.

Thanks


Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh

I can rebuild the comma-separated list as follows:

    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
    import sqlContext.implicits._

    for (line <- pricesRDD.collect.toArray)
    {
      var key        = line._2.split(',').view(0).toString
      var ticker     = line._2.split(',').view(1).toString
      var timeissued = line._2.split(',').view(2).toString
      var price      = line._2.split(',').view(3).toFloat
      var allInOne   = key + "," + ticker + "," + timeissued + "," + price
      println(allInOne)

and the print shows the columns separated by ",":

    34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89

So I just need to convert that line (one row) into a DataFrame.

I try this conversion to a DF in order to write to a MongoDB document with MongoSpark.save(df, writeConfig):

    var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

but it fails with:

    [error] /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235: value toDF is not a member of org.apache.spark.rdd.RDD[columns]
    [error]             var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

Frustrating! Has anyone come across this?

thanks
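One pattern that commonly produces exactly this symptom is defining the case class inside the method that calls toDF: the implicit Encoder/TypeTag derivation then fails and the conversion does not apply. A rough sketch of two things worth trying (hypothetical object and method names; assuming a SparkSession named spark):

    import org.apache.spark.sql.SparkSession

    // 1) Move the case class to the top level of the file, outside any object or method
    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

    object md_streaming_mongoDB {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("md_streaming_mongoDB").getOrCreate()
        import spark.implicits._

        // 2) Or avoid the implicit altogether and build the DataFrame explicitly from the RDD
        val rdd = spark.sparkContext.parallelize(Seq(columns("some-key", "MKS", "2018-09-05T10:10:15", 676.5f)))
        val df  = spark.createDataFrame(rdd)   // schema derived from the case class by reflection
        df.show(false)
      }
    }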


Re: getting error: value toDF is not a member of Seq[columns]

Manu Zhang
Have you tried adding an Encoder for columns, as suggested by Jungtaek Lim?


Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
I am trying to understand why Spark cannot convert a simple line of comma-separated columns into a DataFrame.

I did a test. I took one line of the printed output and stored it as a one-line CSV file, as below:

    var allInOne = key + "," + ticker + "," + timeissued + "," + price
    println(allInOne)

    cat crap.csv
    6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45

Then, after storing it in HDFS, I read that file back as below:

    import org.apache.spark.sql.functions._
    val location = "hdfs://rhes75:9000/tmp/crap.csv"
    val df1 = spark.read.option("header", false).csv(location)
    case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
    val df2 = df1.map(p => columns(p(0).toString, p(1).toString, p(2).toString, p(3).toString.toDouble))
    df2.printSchema

This is the result I get:

    df1: org.apache.spark.sql.DataFrame = [_c0: string, _c1: string ... 2 more fields]
    defined class columns
    df2: org.apache.spark.sql.Dataset[columns] = [KEY: string, TICKER: string ... 2 more fields]
    root
     |-- KEY: string (nullable = true)
     |-- TICKER: string (nullable = true)
     |-- TIMEISSUED: string (nullable = true)
     |-- PRICE: double (nullable = false)

So in my case the only difference is that the comma-separated line is held in a String rather than in a CSV file.

How can I achieve this simple transformation?

Thanks
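A sketch of doing this transformation in memory, without writing the line to HDFS first (assuming a SparkSession named spark and the columns case class with PRICE: Double defined at the top level): split the string and build the case class directly.

    // allInOne holds the comma-separated line, e.g. from the streaming loop
    val allInOne = "6e84b11d-cb03-44c0-aab6-37e06e06c996,MRW,2018-09-06T09:35:53,275.45"

    val p  = allInOne.split(',')
    val df = spark.createDataFrame(Seq(columns(p(0), p(1), p(2), p(3).toDouble)))
    df.printSchema()
    df.show(false)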

On Thu, 6 Sep 2018 at 03:38, Manu Zhang <[hidden email]> wrote:
Have you tried adding Encoder for columns as suggested by Jungtaek Lim ?

On Thu, Sep 6, 2018 at 6:24 AM Mich Talebzadeh <[hidden email]> wrote:

I can rebuild the comma separated list as follows:


   case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
    val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
    import sqlContext.implicits._
 

         for(line <- pricesRDD.collect.toArray)
         {
           var key = line._2.split(',').view(0).toString
           var ticker =  line._2.split(',').view(1).toString
           var timeissued = line._2.split(',').view(2).toString
           var price = line._2.split(',').view(3).toFloat
           var allInOne = key+","+ticker+","+timeissued+","+price
           println(allInOne)

and the print shows the columns separated by ","


34e07d9f-829a-446a-93ab-8b93aa8eda41,SAP,2018-09-05T23:22:34,56.89


So I just need to convert that one-row line into a DataFrame.

I tried this conversion to a DF in order to write to a MongoDB document with MongoSpark.save(df, writeConfig):

var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF

[error] /data6/hduser/scala/md_streaming_mongoDB/src/main/scala/myPackage/md_streaming_mongoDB.scala:235: value toDF is not a member of org.apache.spark.rdd.RDD[columns]
[error]             var df = sparkContext.parallelize(Seq(columns(key, ticker, timeissued, price))).toDF
[

 

frustrating!

Has anyone come across this?

thanks
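For the RDD variant specifically, a hedged sketch of two forms that normally compile once the session implicits and a top-level case class are in place (names illustrative; assumes a SparkSession called spark):

import spark.implicits._

case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

val rdd = spark.sparkContext.parallelize(
  Seq(Columns("34e07d9f-829a-446a-93ab-8b93aa8eda41", "SAP", "2018-09-05T23:22:34", 56.89f)))

// Relies on the implicit RDD-to-Dataset conversion discussed later in the thread.
val df1 = rdd.toDF()

// Or ask the session to do it explicitly; this path does not need the implicits.
val df2 = spark.createDataFrame(rdd)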

On Wed, 5 Sep 2018 at 13:30, Mich Talebzadeh <[hidden email]> wrote:
Yep, already tried it and it did not work.

thanks

On Wed, 5 Sep 2018 at 10:10, Deepak Sharma <[hidden email]> wrote:
Try this:

import spark.implicits._
df.toDF()

On Wed, Sep 5, 2018 at 2:31 PM Mich Talebzadeh <[hidden email]> wrote:
With the following

case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

 var key = line._2.split(',').view(0).toString
 var ticker =  line._2.split(',').view(1).toString
 var timeissued = line._2.split(',').view(2).toString
 var price = line._2.split(',').view(3).toFloat

  var df = Seq(columns(key, ticker, timeissued, price))
 println(df)

I get

List(columns(ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5))

So just need to convert that list to DF

On Wed, 5 Sep 2018 at 09:49, Mich Talebzadeh <[hidden email]> wrote:
Thanks! 

The Spark version is 2.3.0

On Wed, 5 Sep 2018 at 09:41, Jungtaek Lim <[hidden email]> wrote:
You may also find the link below useful (though it looks quite old). A case class is exactly the kind of type for which an Encoder is available, so there may be another reason that prevents the implicit conversion.

And which Spark version do you use?


On Wed, 5 Sep 2018 at 5:32 PM, Jungtaek Lim <[hidden email]> wrote:
Sorry, I guess I pasted another method. The code is...
implicit def localSeqToDatasetHolder[T : Encoder](s: Seq[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(s))
}

On Wed, 5 Sep 2018 at 5:30 PM, Jungtaek Lim <[hidden email]> wrote:
I guess you need to have an Encoder for the result type of columns().


implicit def rddToDatasetHolder[T : Encoder](rdd: RDD[T]): DatasetHolder[T] = {
  DatasetHolder(_sqlContext.createDataset(rdd))
}
You can see lots of Encoder implementations in the Scala code. If your type doesn't match any of them it may not work, and you need to provide a custom Encoder.

-Jungtaek Lim (HeartSaVioR)
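To make that Encoder requirement visible, a small sketch that passes one explicitly instead of relying on the implicit conversion (assumes a SparkSession called spark and a top-level case class; names illustrative):

import org.apache.spark.sql.{Encoder, Encoders}

case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

// This is the Encoder the implicit conversion has to be able to find.
val enc: Encoder[Columns] = Encoders.product[Columns]

val ds = spark.createDataset(
  Seq(Columns("key", "MKS", "2018-09-05T10:10:15", 676.5f)))(enc)

val df = ds.toDF()
df.printSchema()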

On Wed, 5 Sep 2018 at 5:24 PM, Mich Talebzadeh <[hidden email]> wrote:
Thanks 

I already do that as below

    val sqlContext= new org.apache.spark.sql.SQLContext(sparkContext)
  import sqlContext.implicits._

but still getting the error!


Re: getting error: value toDF is not a member of Seq[columns]

Jungtaek Lim
This code works with Spark 2.3.0 via spark-shell.

scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)
defined class columns

scala> import spark.implicits._
import spark.implicits._

scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
18/09/06 18:02:23 WARN ObjectStore: Failed to get database global_temp, returning NoSuchObjectException
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2 more fields]

scala> df
res0: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2 more fields]

Maybe we need to know the actual types of your key, ticker, timeissued and price variables.

Jungtaek Lim (HeartSaVioR)
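In that spirit, a defensive parsing sketch (illustrative only) that makes the runtime types explicit before the case class is built:

import scala.util.Try

case class Columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Float)

// `line` stands in for one record taken off the stream.
val line = "ac11a78d-82df-4b37-bf58-7e3388aa64cd,MKS,2018-09-05T10:10:15,676.5"

val fields = line.split(",", -1).map(_.trim)       // keep empty trailing fields, strip whitespace
require(fields.length == 4, s"expected 4 fields, got ${fields.length}: $line")

val price: Float = Try(fields(3).toFloat)
  .getOrElse(sys.error(s"PRICE is not numeric: '${fields(3)}'"))

val row = Columns(fields(0), fields(1), fields(2), price)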


Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Thanks. If you define the columns class as below:


scala> case class columns(KEY: String, TICKER: String, TIMEISSUED: String, PRICE: Double)
defined class columns
scala> var df = Seq(columns("key", "ticker", "timeissued", 1.23f)).toDF
df: org.apache.spark.sql.DataFrame = [KEY: string, TICKER: string ... 2 more fields]
scala> df.printSchema
root
 |-- KEY: string (nullable = true)
 |-- TICKER: string (nullable = true)
 |-- TIMEISSUED: string (nullable = true)
 |-- PRICE: double (nullable = false)

it looks better


Re: getting error: value toDF is not a member of Seq[columns]

Mich Talebzadeh
Ok somehow this worked!

             // Save prices to mongoDB collection
             val document = sparkContext.parallelize((1 to 1).
                            map(i => Document.parse(s"{key:'$key',ticker:'$ticker',timeissued:'$timeissued',price:$price,CURRENCY:'$CURRENCY',op_type:$op_type,op_time:'$op_time'}")))
             //
             // Writing document to Mongo collection
             //
             MongoSpark.save(document, writeConfig)

Note that all non-numeric columns are enclosed in quotes, as '$column'.

I just created a dummy collection with a single element, (1 to 1), to map over.

These are the resulting documents in the MongoDB collection:

{
    "_id" : ObjectId("5b915796d3c6071e82fdca2b"),
    "key" : "23c39917-08a9-4845-ba74-51997707d374",
    "ticker" : "IBM",
    "timeissued" : "2018-09-06T17:51:21",
    "price" : 207.23,
    "CURRENCY" : "GBP",
    "op_type" : NumberInt(1),
    "op_time" : "1536251798114"
}
{
    "_id" : ObjectId("5b915796d3c6071e82fdca2c"),
    "key" : "22f353f9-9b28-463c-9f1c-64213ded7cd5",
    "ticker" : "TSCO",
    "timeissued" : "2018-09-06T17:51:21",
    "price" : 179.52,
    "CURRENCY" : "GBP",
    "op_type" : NumberInt(1),
    "op_time" : "1536251798162"
}
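The same write can also be expressed without assembling the JSON by hand. A hedged sketch using the typed Document API, keeping the MongoSpark.save call from above and assuming the surrounding variables (key, ticker, timeissued, price, CURRENCY, op_type, op_time, sparkContext, writeConfig) already exist:

import org.bson.Document
import com.mongodb.spark.MongoSpark

// Build the document field by field: numeric values stay numeric and string
// values need no manual quoting or escaping.
val doc = new Document()
  .append("key", key)
  .append("ticker", ticker)
  .append("timeissued", timeissued)
  .append("price", price)
  .append("CURRENCY", CURRENCY)
  .append("op_type", op_type)
  .append("op_time", op_time)

val documents = sparkContext.parallelize(Seq(doc))
MongoSpark.save(documents, writeConfig)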


