What are the alternatives to nested DataFrames?


Yeikel

Hi community,

As shown in other answers online, Spark does not support nesting DataFrames, so what are the options?

I have the following scenario:

dataFrame1 = a list of cities

dataFrame2 = created by searching Elasticsearch for each city in dataFrame1

I've tried:

 

val cities = sc.parallelize(Seq("New York")).toDF()

cities.foreach(r => {
  val companyName = r.getString(0)
  println(companyName)

  // Returns a DataFrame consisting of all the cities matching the entry in `cities`
  val dfs = sqlContext.esDF("cities/docs", "?q=" + companyName)
})

 

This triggers the expected NullPointerException:

 

java.lang.NullPointerException
    at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:53)
    at org.elasticsearch.spark.sql.EsSparkSQL$.esDF(EsSparkSQL.scala:51)
    at org.elasticsearch.spark.sql.package$SQLContextFunctions.esDF(package.scala:37)
    at Main$$anonfun$main$1.apply(Main.scala:43)
    at Main$$anonfun$main$1.apply(Main.scala:39)
    at scala.collection.Iterator$class.foreach(Iterator.scala:742)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1194)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.rdd.RDD$$anonfun$foreach$1$$anonfun$apply$28.apply(RDD.scala:921)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2067)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

2018-12-28 02:01:00 ERROR TaskSetManager:70 - Task 7 in stage 0.0 failed 1 times; aborting job
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 0.0 failed 1 times, most recent failure: Lost task 7.0 in stage 0.0 (TID 7, localhost, executor driver): java.lang.NullPointerException

 

What options do I have?

 

Thank you.


Re: What are the alternatives to nested DataFrames?

Shahab Yunus
Can you have a DataFrame with a column that stores JSON (as a string)? Alternatively, you could use an array-type column that stores all the cities matching your query.
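
[A minimal sketch of the array-column shape, not from the thread; the flat (city, match) pairs, column names, and values are illustrative, however the pairs are actually produced:]

import sqlContext.implicits._
import org.apache.spark.sql.functions.collect_list

// Hypothetical input: one row per (city, matching document) pair.
val flat = Seq(
  ("New York", "New York City"),
  ("New York", "West New York")
).toDF("city", "matched_doc")

// Fold the matches for each city into a single array-type column,
// keeping everything distributed.
val nested = flat.groupBy("city")
  .agg(collect_list("matched_doc").as("matches")) // matches: array<string>

nested.show(false)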





RE: What are the alternatives to nested DataFrames?

Yeikel

Shahab, I am not sure what you are trying to say. Could you please give me an example? The result of the query is a DataFrame that is created while iterating, so I am not sure how I could map that to a column without iterating and getting the values.

 

I have a DataFrame that contains a list of cities which I would like to iterate over and search in Elasticsearch. This list is stored in a DataFrame because it contains hundreds of thousands of elements with multiple properties that would not fit on a single machine.

 

The issue is that the elasticsearch-spark connector returns a DataFrame as well, which leads to creating a DataFrame within a DataFrame.

 

The only solution I have found is to store the list of cities in a regular Scala Seq and iterate over that, but as far as I know this would make the Seq centralized instead of distributed (running on the driver only?).

 

Full example:

 

// Works: iterating over a plain Scala Seq on the driver
val cities = Seq("New York", "Michigan")

cities.foreach(r => {
  val qb = QueryBuilders.matchQuery("name", r).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // Returns a DataFrame for each city

  dfs.show() // Works as expected: prints the individual DataFrame with the result of the query
})

// Fails: the same logic, but iterating over a DataFrame
val cities = Seq("New York", "Michigan").toDF()

cities.foreach(r => {
  val city = r.getString(0)

  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
  print(qb.toString)

  val dfs = sqlContext.esDF("cities/docs", qb.toString) // NullPointerException

  dfs.show()
})

 

 



Re: What are the alternatives to nested DataFrames?

Andrew Melo
Could you join() the DFs on a common key?
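
[One way the join could look (a sketch only, assuming the index can be read as one DataFrame and that the documents expose a "name" field; neither is confirmed in the thread):]

import sqlContext.implicits._
import org.elasticsearch.spark.sql._

// Read the index once as a single distributed DataFrame instead of
// issuing one esDF call per city inside foreach.
val esDocs = sqlContext.esDF("cities/docs")

val cities = sc.parallelize(Seq("New York", "Michigan")).toDF("name")

// Inner join keeps only documents whose name equals a city in `cities`.
// Note: an equality join is stricter than the Elasticsearch match query,
// so analyzed or fuzzy matching would need a different join key.
val matched = cities.join(esDocs, Seq("name"))

matched.show()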


--
It's dark in this basement.

RE: What are the alternatives to nested DataFrames?

Yeikel

I could, but only if I had it beforehand. I do not know what the second DataFrame is until I pass the query parameter and receive the resulting DataFrame inside the iteration.

 

The steps are:

 

Original DF -> iterate -> pass every element to a function that takes the element of the original DF and returns a new DataFrame including all the matching terms.

 

 



Re: What are the alternatives to nested DataFrames?

Shahab Yunus
Two options I can think of:

1- Can you perform a union of the DataFrames returned by the Elasticsearch queries? It would still be distributed, but I don't know whether you will hit a limit on how many union operations you can perform at a time (see the sketch after these options).

2- Can you use some other Elasticsearch API method than the one that returns a DataFrame?
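
[A rough sketch of option 1, reusing the thread's own query-building snippet: the loop runs on the driver over a plain Seq of search terms, but each esDF result is still a distributed DataFrame. Import paths are an assumption; the thread does not show them.]

import org.elasticsearch.index.query.{Operator, QueryBuilders}
import org.elasticsearch.spark.sql._

// Build one (distributed) DataFrame per city on the driver.
val cities = Seq("New York", "Michigan")

val perCity = cities.map { city =>
  val qb = QueryBuilders.matchQuery("name", city).operator(Operator.AND)
  sqlContext.esDF("cities/docs", qb.toString)
}

// Union them into a single DataFrame. reduce builds a long lineage,
// so for many cities consider persisting or checkpointing along the way.
val all = perCity.reduce(_ union _)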



RE: What are the alternatives to nested DataFrames?

Yeikel

1 - I am not sure how I can do what you suggest in #1, because I use the entries in the initial DF to build the query, and from that I get the second DF. Could you explain more?

 

2 - I also thought about what you suggest in #2, but if I am not mistaken, using regular Scala data structures means the work won't be distributed and it might run out of memory?

 

 

I also tried collecting the second DataFrame to a Seq, but that also produced the null pointer exception.

 
