How to retrieve data from nested JSON using DataFrame


How to retrieve data from nested JSON using DataFrame

阎志涛

Hi, All,

I am using Spark 2.1 and want to do a data transformation over a nested JSON dataset. I tried to read it with a DataFrame but failed. The schema of the DataFrame (as printed by printSchema) is as follows:

root
 |-- deviceid: string (nullable = true)
 |-- app: struct (nullable = true)
 |    |-- appList: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- appName: string (nullable = true)
 |    |    |    |-- appVersion: string (nullable = true)
 |    |    |    |-- pkgName: string (nullable = true)
 |    |-- appName: string (nullable = true)
 |    |-- appVersion: string (nullable = true)
 |    |-- firstUseTime: string (nullable = true)
 |    |-- installTime: string (nullable = true)
 |    |-- pkgName: string (nullable = true)

 

I want to retrieve the data under appList and merge it. What I did was define a case class:

case class AppInfo(appName: String, appVersion: String, pkgName: String)

Then I read the column using getList[AppInfo].

It compiles successfully, but at runtime I get a ClassCastException. The exception is as follows:

java.lang.ClassCastException: org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema cannot be cast to com.zhishu.data.etl.ParquetTest$AppInfo
        at com.zhishu.data.etl.ParquetTest$$anonfun$2.apply(ParquetTest.scala:75)
        at com.zhishu.data.etl.ParquetTest$$anonfun$2.apply(ParquetTest.scala:56)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:409)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.agg_doAggregateWithoutKey$(Unknown Source)
        at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
        at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
        at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:408)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:126)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:99)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
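
For reference, the kind of call that triggers this error looks roughly like the sketch below (a reconstruction, not the original code; the DataFrame name df is assumed):

// Reconstructed sketch, not the original code (the name df is assumed).
// For an array-of-struct column, Row.getList returns the elements as Rows
// (GenericRowWithSchema). The cast to AppInfo is inserted by type erasure
// at the point of use, which is where the ClassCastException surfaces.
val firstAppNames = df.select("app.appList").rdd.map { row =>
  val apps = row.getList[AppInfo](0)  // compiles fine...
  apps.get(0).appName                 // ...but fails here: the element is really a Row
}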

 

So is there any easy way I can implement what I want to do?

 

Thanks and Regards,

Tony

Re: How to retrieve data from nested JSON using DataFrame

Mich Talebzadeh

Hi Tony,

Try something like the example below, with two case class definitions:

import spark.implicits._   // assuming the SparkSession is called spark; needed for toDF()

case class Address(building: String, coord: Array[Double], street: String, zipcode: String)
case class Restaurant(address: Address, borough: String, cuisine: String, name: String)

val dfRestaurants = Seq(
  Restaurant(Address("1480", Array(-73.9557413, 40.7720266), "2 Avenue", "10075"), "Manhattan", "Italian", "Vella"),
  Restaurant(Address("1007", Array(-73.856077, 40.848447), "Morris Park Ave", "10462"), "Bronx", "Bakery", "Morris Park Bake Shop")
).toDF().coalesce(1)
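
Once the nested structure is in a DataFrame, the struct fields can be read back with dotted column paths, e.g. dfRestaurants.select($"name", $"address.street"). For the appList question specifically, a common Spark 2.x pattern is to explode the array and map the resulting columns onto the case class with an encoder, so no Rows ever need to be cast by hand. A sketch, assuming a DataFrame df with the schema from the original post and a SparkSession named spark:

import org.apache.spark.sql.functions.explode
import spark.implicits._   // for $-columns and the AppInfo encoder

// One output row per element of app.appList; pick the struct's fields
// and convert to a typed Dataset.
val appInfos = df
  .select(explode($"app.appList").as("a"))
  .select($"a.appName", $"a.appVersion", $"a.pkgName")
  .as[AppInfo]

appInfos.show(false)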


HTH

Dr Mich Talebzadeh

LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
http://talebzadehmich.wordpress.com

Disclaimer: Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.

 


