Orc predicate pushdown with Spark Sql


Orc predicate pushdown with Spark Sql

Siva Gudavalli-2

Hello,

I am working with Spark SQL to query a Hive managed table stored in ORC format.

The data is organized by partitions, and the table is configured to create a row index entry every 50,000 rows by setting ('orc.row.index.stride'='50000').

After partition pruning, the selected partition holds around 50 files.

Each file contains data for one specific "cat" value, and I have set up a bloom filter on cat.
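
For reference, a minimal sketch of the kind of table definition I am describing (table name and column list trimmed to the relevant ones; the exact DDL is assumed, not copied from my script):

// Sketch only: a trimmed-down version of the ORC table I am querying.
// Column list and table name are simplified; the two TBLPROPERTIES are the
// ones mentioned above (row index stride and a bloom filter on cat).
sqlContext.sql("""
  CREATE TABLE logs (
    id  STRING,
    cat INT,
    usr STRING
  )
  PARTITIONED BY (cdt INT, catpartkey STRING, usrpartkey STRING)
  STORED AS ORC
  TBLPROPERTIES (
    'orc.row.index.stride'     = '50000',
    'orc.bloom.filter.columns' = 'cat'
  )
""")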

 

My Spark SQL query looks like this:

select * from logs where cdt = 20171002 and catpartkey = 'others' and usrpartkey = 'logUsers' and cat = 24;

I have set the following property on my SQLContext, assuming it would push the filters down:

sqlContext.setConf("spark.sql.orc.filterPushdown", "true")
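
As a sanity check (the API exists in 1.6), I also read the flag back from the same SQLContext that runs the query:

// Sanity check: confirm the flag is actually set before running the query.
println(sqlContext.getConf("spark.sql.orc.filterPushdown"))   // prints "true"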

 

My filters are never pushed down, and only partition pruning seems to happen; within the partition, all files are still read. I don't understand why, no matter what my query is, it triggers 50 tasks and reads all the files.

 

Here are my debug logs:

 

17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
17/10/23 17:26:43 DEBUG OrcInputFormat: No ORC pushdown predicate
17/10/23 17:26:43 INFO OrcRawRecordMerger: min key = null, max key = null
17/10/23 17:26:43 INFO ReaderImpl: Reading ORC rows from maprfs:///apps/spark/logs/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0 with {include: [true, true, false, false, false, false, true, false, false, false, false, false, false, false, false, false, false, false], offset: 0, length: 9223372036854775807}
17/10/23 17:26:43 DEBUG MapRClient: Open: path = /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0
17/10/23 17:26:43 DEBUG Inode: >Inode Open file: /apps/spark/auditlogsv5/cdt=20171002/catpartkey=others/usrpartkey=logUsers/000026_0, size: 401517212, chunkSize: 268435456, fid: 2052.225472.4786362
17/10/23 17:26:43 DEBUG RecordReaderImpl: chunks = [range start: 67684 end: 15790993, range start: 21131541 end: 21146035]
17/10/23 17:26:43 DEBUG RecordReaderImpl: merge = [data range [67684, 15790993), size: 15723309 type: array-backed, data range [21131541, 21146035), size: 14494 type: array-backed]
17/10/23 17:26:43 DEBUG Utilities: Hive Conf not found or Session not initiated, use thread based class loader instead
17/10/23 17:26:43 DEBUG HadoopTableReader: org.apache.hadoop.hive.ql.io.orc.OrcStruct$OrcStructInspector<org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableStringObjectInspector@7c1bb45,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableIntObjectInspector@3fedb2fd,org.apache.hadoop.hive.serde2.objectinspector.primitive.WritableBinaryObjectInspector@e8220d5>
17/10/23 17:26:43 DEBUG GeneratePredicate: Generated predicate '(input[1, IntegerType] = 27)':

 

And here is my execution plan:

== Parsed Logical Plan ==
'Limit 1000
+- 'Sort ['id DESC], true
   +- 'Project [unresolvedalias('id)]
      +- 'Filter (((('cdt = 20171002) && ('catpartkey = others)) && ('usrpartkey = logUsers)) && ('cat = 27))
         +- 'UnresolvedRelation `auditlogsv5`, None

== Analyzed Logical Plan ==
id: string
Limit 1000
+- Sort [id#165 DESC], true
   +- Project [id#165]
      +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
         +- MetastoreRelation default, auditlogsv5, None

== Optimized Logical Plan ==
Limit 1000
+- Sort [id#165 DESC], true
   +- Project [id#165]
      +- Filter ((((cdt#162 = 20171002) && (catpartkey#163 = others)) && (usrpartkey#164 = logUsers)) && (cat#170 = 27))
         +- MetastoreRelation default, auditlogsv5, None

== Physical Plan ==
TakeOrderedAndProject(limit=1000, orderBy=[id#165 DESC], output=[id#165])
+- ConvertToSafe
   +- Project [id#165]
      +- Filter (cat#170 = 27)
         +- HiveTableScan [id#165,cat#170], MetastoreRelation default, logs, None, [(cdt#162 = 20171002),(catpartkey#163 = others),(usrpartkey#164 = logUsers)]

 

 

Am I missing something here? I am on Spark 1.6.1 and Hive 1.2.0.

Please correct me. Thank you.


Re: Orc predicate pushdown with Spark Sql

Siva Gudavalli

Hello,

I have an update here.

Spark SQL pushes the predicates down when I load the ORC files directly into the SQLContext, but not when I query the Hive table. Please let me know if I am missing something here.

Is this supported in Spark?

When I load the files directly:
scala> val hhhhhlogsv5 = sqlContext.read.format("orc").load("/user/hive/warehouse/hhhhhlogsv5")
17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5 on driver
17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003 on driver
17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others on driver
17/10/24 16:11:15 INFO OrcRelation: Listing maprfs:///user/hive/warehouse/hhhhhlogsv5/cdt=20171003/catpartkey=others/usrpartkey=hhhUsers on driver
hhhhhlogsv5: org.apache.spark.sql.DataFrame = [id: string, chn: string, ht: string, br: string, rg: string, cat: int, scat: int, usr: string, org: string, act: int, ctm: int, c1: string, c2: string, c3: string, d1: int, d2: int, doc: binary, cdt: int, catpartkey: string, usrpartkey: string]
scala> hhhhhlogsv5.registerTempTable("tempo")
scala> sqlContext.sql ( "selecT id from tempo where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
17/10/24 16:11:22 INFO ParseDriver: Parsing command: selecT id from tempo where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
17/10/24 16:11:22 INFO ParseDriver: Parse Completed
17/10/24 16:11:22 INFO DataSourceStrategy: Selected 1 partitions out of 1, pruned 0.0% partitions.
17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6 stored as values in memory (estimated size 164.5 KB, free 468.0 KB)
17/10/24 16:11:22 INFO MemoryStore: Block broadcast_6_piece0 stored as bytes in memory (estimated size 18.3 KB, free 486.4 KB)
17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_6_piece0 in memory on 172.21.158.61:43493 (size: 18.3 KB, free: 511.4 MB)
17/10/24 16:11:22 INFO SparkContext: Created broadcast 6 from explain at <console>:33
17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7 stored as values in memory (estimated size 170.2 KB, free 656.6 KB)
17/10/24 16:11:22 INFO MemoryStore: Block broadcast_7_piece0 stored as bytes in memory (estimated size 18.8 KB, free 675.4 KB)
17/10/24 16:11:22 INFO BlockManagerInfo: Added broadcast_7_piece0 in memory on 172.21.158.61:43493 (size: 18.8 KB, free: 511.4 MB)
17/10/24 16:11:22 INFO SparkContext: Created broadcast 7 from explain at <console>:33
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#145 DESC], output=[id#145])
+- ConvertToSafe
   +- Project [id#145]
      +- Filter (usr#152 = AA0YP)
         +- Scan OrcRelation[id#145,usr#152] InputPaths: maprfs:///user/hive/warehouse/hhhhhlogsv5, PushedFilters: [EqualTo(usr,AA0YP)]
 
When I read this as a Hive table:
 
scala> sqlContext.sql ( "selecT id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10" ).explain
17/10/24 16:11:32 INFO ParseDriver: Parsing command: selecT id from hhhhhlogsv5 where cdt=20171003 and usrpartkey = 'hhhUsers' and usr='AA0YP' order by id desc limit 10
17/10/24 16:11:32 INFO ParseDriver: Parse Completed
17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8 stored as values in memory (estimated size 399.1 KB, free 1074.6 KB)
17/10/24 16:11:32 INFO MemoryStore: Block broadcast_8_piece0 stored as bytes in memory (estimated size 42.7 KB, free 1117.2 KB)
17/10/24 16:11:32 INFO BlockManagerInfo: Added broadcast_8_piece0 in memory on 172.21.158.61:43493 (size: 42.7 KB, free: 511.4 MB)
17/10/24 16:11:32 INFO SparkContext: Created broadcast 8 from explain at <console>:33
== Physical Plan ==
TakeOrderedAndProject(limit=10, orderBy=[id#192 DESC], output=[id#192])
+- ConvertToSafe
   +- Project [id#192]
      +- Filter (usr#199 = AA0YP)
         +- HiveTableScan [id#192,usr#199], MetastoreRelation default, hhhhhlogsv5, None, [(cdt#189 = 20171003),(usrpartkey#191 = hhhUsers)]
 
 
Please let me know if I am missing anything here. Thank you.





Re: Orc predicate pushdown with Spark Sql

Jörn Franke
Well, the meta information is in the file, so I am not surprised that it opens the file; but it should not read all of the content, and that is probably not happening either.




Re: Orc predicate pushdown with Spark Sql

Siva Gudavalli

I found a workaround: when I create the Hive table using Spark's "saveAsTable", I see the filters being pushed down.

Other approaches I tried, where the filters are NOT pushed down:

1) creating the Hive table upfront and loading ORC into it using Spark SQL
2) creating ORC files with Spark SQL and then creating a Hive external table over them

If my understanding is correct, when I use saveAsTable, Spark registers the table in the Hive metastore with its own data source serde, so reads go through Spark's ORC relation and filters can be pushed down.
Please correct me.
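
For completeness, a rough sketch of the workaround (the path and partition columns are taken from the earlier messages; the new table name is just an example):

// Sketch of the workaround: write the data out with saveAsTable so Spark
// registers it in the metastore as a data source (ORC) table, which is what
// enables pushdown on read. Path, partition columns and the new table name
// are illustrative assumptions.
val logs = sqlContext.read.format("orc").load("/apps/spark/auditlogsv5")
logs.write
  .format("orc")
  .partitionBy("cdt", "catpartkey", "usrpartkey")
  .saveAsTable("auditlogsv5_ds")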

Another question:

When I am writing ORC to Hive using "saveAsTable", is there any way I can provide details about the ORC files, for instance stripe size, or create bloom filters, etc.?
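
The only place I can see to pass anything is the data source writer options; I have not verified whether ORC-specific properties are honored there in 1.6.1, so the keys below are assumptions rather than a confirmed answer:

// Assumption, not verified: whether the 1.6.1 ORC writer picks up these ORC
// table properties through .option() is exactly what I am asking about.
val logs = sqlContext.read.format("orc").load("/apps/spark/auditlogsv5")
logs.write
  .format("orc")
  .option("orc.compress", "SNAPPY")
  .option("orc.bloom.filter.columns", "cat")
  .partitionBy("cdt", "catpartkey", "usrpartkey")
  .saveAsTable("auditlogsv5_ds2")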


Regards
Shiv


