Ordering pushdown for Spark Datasources


Ordering pushdown for Spark Datasources

Kohki Nishio
Hello,

I'm trying to use Spark SQL as a log analytics solution. As you might guess, for most use cases the data is ordered by timestamp and the volume of data is large.

If I want to show the first 100 entries (ordered by timestamp) for a given condition, the Spark executors have to scan all entries to select the top 100 by timestamp.

I understand this behavior; however, some data sources such as JDBC or Lucene support ordering natively, and in my case the target data is large (a couple of million entries). I believe it should be possible to push the ordering down to the data source and let the executors return early.

Here's my ask: I know Spark doesn't do this today, but I'm looking for any pointers or references that might be relevant, and any ideas would be appreciated. So far I have found that some folks are working on aggregation pushdown (SPARK-22390), but I don't see any current activity around ordering pushdown.
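
For what it's worth, here's a rough sketch of what I imagine such an API could look like on the DataSource V2 side, modeled after the existing pushdown mixins like SupportsPushDownFilters and SupportsPushDownRequiredColumns. Everything below is hypothetical, nothing like it exists in Spark today:

// Hypothetical description of a sort key; a real proposal would reuse
// Spark's own connector expression types instead.
case class SortKey(column: String, ascending: Boolean)

// Hypothetical ScanBuilder mixin, by analogy with the existing
// SupportsPushDown* interfaces in org.apache.spark.sql.connector.read.
trait SupportsPushDownOrdering {
  // Offer the query's sort order to the source. Returning true means the
  // source guarantees each partition is emitted in that order, so Spark
  // could replace a full sort with a streaming merge of partitions.
  def pushOrdering(ordering: Seq[SortKey]): Boolean

  // Offer a row limit so the source can stop scanning early once it
  // has produced `limit` rows per partition.
  def pushLimit(limit: Int): Unit
}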

Thanks


--
Kohki Nishio

Re: Ordering pushdown for Spark Datasources

Mich Talebzadeh
Hi,

A couple of clarifications:

  1. How is the log data stored, for example on HDFS?
  2. You stated you want to show the first 100 entries for a given condition. Is that condition itself a predicate?

There are articles on predicate pushdown in Spark that are worth checking. That said, "large" is a relative term, and a couple of million rows is not that large. You can also try some of the following settings in spark-sql:

spark-sql> set spark.sql.adaptive.enabled=true;
spark.sql.adaptive.enabled      true
Time taken: 0.011 seconds, Fetched 1 row(s)
spark-sql> set hive.optimize.ppd=true;
hive.optimize.ppd       true
Time taken: 0.011 seconds, Fetched 1 row(s)
spark-sql> set spark.sql.cbo.enabled=true;
spark.sql.cbo.enabled   true
Time taken: 0.01 seconds, Fetched 1 row(s)

Spark SQL is influenced by Hive SQL, so you may also be able to leverage Hive's pushdown settings.
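
As a quick sanity check, you can see whether a predicate actually reached the source by printing the physical plan; pushed predicates show up under PushedFilters. A minimal sketch in Scala (the path and the level column are just made-up names):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("ppd-check").getOrCreate()
import spark.implicits._

// Hypothetical Parquet-backed log data; the path and schema are made up.
val logs = spark.read.parquet("/data/logs")

// With pushdown working, the physical plan prints something like
//   PushedFilters: [IsNotNull(level), EqualTo(level,INFO)]
logs.filter($"level" === "INFO").explain()

If PushedFilters comes back empty, the source is not accepting the predicate and Spark filters the rows only after the scan.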

HTH

Re: Ordering pushdown for Spark Datasources

Kohki Nishio
The log data is stored in Lucene, and I have a custom data source to access it. For example, if the condition is log-level = INFO, it brings in a couple of million records per partition, and there are hundreds of partitions involved in a query. Spark has to go through all of the entries just to show the first 100, and that is the problem. But if Spark were aware of the data source's ordering support, it would only need to fetch 100 entries per partition...

I'm wondering if Spark could then do a merge-sort across partitions to make this type of query faster.
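
In the meantime I can approximate that merge by hand, as long as each partition comes back from my data source already ordered by timestamp. A rough sketch (the format name and the ts column are just for illustration):

import org.apache.spark.sql.{Row, SparkSession}

val spark = SparkSession.builder().appName("top-n").getOrCreate()

// Hypothetical: my custom Lucene-backed source; the format name is made up.
val df = spark.read.format("lucene").load()
  .filter("level = 'INFO'")

// Assumes each partition is emitted by the source in ascending timestamp
// order, so the first 100 rows of each partition are the only candidates
// for the global top 100.
val ord: Ordering[Row] = Ordering.by((r: Row) => r.getAs[Long]("ts"))

val top100: Array[Row] = df.rdd
  .mapPartitions(_.take(100)) // early-out: at most 100 rows per partition
  .takeOrdered(100)(ord)      // distributed merge into a global top 100

This still launches a task per partition, but each task stops after 100 rows instead of scanning millions.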

Thanks
-Kohki

--
Kohki Nishio

Re: Ordering pushdown for Spark Datasources

Mich Talebzadeh
Lucene, I came across it years ago.

Does Lucene support JDBC connections at all? How about Solr?

HTH