Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti
Hello,
      
   Spark SQL is generating the query plan with all of the table's partitions even when we apply partition filters in the query. Because each table has a very large number of partitions, this causes the Spark driver / Hive metastore to hit OOM errors.

We can confirm from the Hive audit logs that it tries to fetch all partitions from the Hive metastore:

 2016-12-28 07:18:33,749 INFO  [pool-4-thread-184]: HiveMetaStore.audit (HiveMetaStore.java:logAuditEvent(371)) - ugi=rajub    ip=/x.x.x.x   cmd=get_partitions : db=xxxx tbl=xxxxx


We configured the following parameters in the Spark conf to fix the above issue (source: a Spark JIRA and its corresponding GitHub pull request):
    spark.sql.hive.convertMetastoreParquet   false
    spark.sql.hive.metastorePartitionPruning   true
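
For reference, a minimal sketch of how these could be set from a Scala shell on the SQLContext that plans the query (the query below is a placeholder modeled on the plan shown next):

    // sketch only: apply the two properties and re-check the plan
    sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
    sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")

    val rdf = sqlContext.sql(
      "SELECT rejection_reason FROM dbname.tablename " +
      "WHERE year = 2016 AND month = 12 AND day = 28 AND hour = 2 AND venture = 'DEFAULT'")
    rdf.explain()  // expect a HiveTableScan that carries the partition predicates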

   Plan (rdf.explain):
   == Physical Plan ==
       HiveTableScan [rejection_reason#626], MetastoreRelation dbname, tablename, None,   [(year#314 = 2016),(month#315 = 12),(day#316 = 28),(hour#317 = 2),(venture#318 = DEFAULT)]

    With these settings, the get_partitions_by_filter method is called and only the required partitions are fetched.

    But after this change we frequently see Parquet decode errors in our applications. These decoding errors seem to be caused by the serde changing from Spark's built-in Parquet reader to the Hive serde.

I feel that fixing query plan generation in Spark SQL is the right approach, rather than forcing users to use the Hive serde.

Is there any workaround or way to fix this issue? I would like to hear more thoughts on this :)


On Mon, Jan 16, 2017 at 12:53 PM, Raju Bairishetti <[hidden email]> wrote:
> Waiting for suggestions/help on this...

------
Thanks,
Raju Bairishetti,

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti
I had a high-level look into the code. It seems the getHiveQlPartitions method in HiveMetastoreCatalog is called irrespective of the metastorePartitionPruning conf value.

It should not fetch all partitions if we set metastorePartitionPruning to true (the default value is false):

    def getHiveQlPartitions(predicates: Seq[Expression] = Nil): Seq[Partition] = {
      val rawPartitions = if (sqlContext.conf.metastorePartitionPruning) {
        table.getPartitions(predicates)
      } else {
        allPartitions
      }
      ...

    def getPartitions(predicates: Seq[Expression]): Seq[HivePartition] =
      client.getPartitionsByFilter(this, predicates)

    lazy val allPartitions = table.getAllPartitions

But somehow getAllPartitions is still getting called even after setting metastorePartitionPruning to true.
Am I missing something, or am I looking in the wrong place?
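
As a sanity check, the effective values as seen by the planning SQLContext can be printed (a sketch; a value set somewhere else would leave the allPartitions branch in effect):

    // sketch only: print the settings the planning SQLContext actually sees
    println(sqlContext.getConf("spark.sql.hive.metastorePartitionPruning", "<unset>"))
    println(sqlContext.getConf("spark.sql.hive.convertMetastoreParquet", "<unset>"))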

------
Thanks,
Raju Bairishetti,

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman
Hi Raju,

I'm sorry this isn't working for you. I helped author this functionality and will try my best to help.

First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false? Can you link specifically to the jira issue or spark pr you referred to? The first thing I would try is setting spark.sql.hive.convertMetastoreParquet to true. Setting that to false might also explain why you're getting parquet decode errors. If you're writing your table data with Spark's parquet file writer and reading with Hive's parquet file reader, there may be an incompatibility accounting for the decode errors you're seeing. 

Can you reply with your table's Hive metastore schema, including partition schema? Where are the table's files located? If you do a "show partitions <dbname>.<tablename>" in the spark-sql shell, does it show the partitions you expect to see? If not, run "msck repair table <dbname>.<tablename>".
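
In a Scala shell with Hive support those checks might look like this (a sketch; dbname.tablename is a placeholder, as above):

    // sketch only: list the partitions the metastore knows about
    sqlContext.sql("SHOW PARTITIONS dbname.tablename").show()

    // if expected partitions are missing, re-register them from the file system
    sqlContext.sql("MSCK REPAIR TABLE dbname.tablename")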

Cheers,

Michael



Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti
Thanks Michael for the response.


On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <[hidden email]> wrote:
> Hi Raju,
>
> I'm sorry this isn't working for you. I helped author this functionality and will try my best to help.
>
> First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false?
I had set it as suggested in SPARK-6910 and the corresponding pull requests. It did not work for me without setting the spark.sql.hive.convertMetastoreParquet property.

> Can you link specifically to the jira issue or spark pr you referred to? The first thing I would try is setting spark.sql.hive.convertMetastoreParquet to true. Setting that to false might also explain why you're getting parquet decode errors. If you're writing your table data with Spark's parquet file writer and reading with Hive's parquet file reader, there may be an incompatibility accounting for the decode errors you're seeing.
https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is to avoid fetching all the partitions. We reverted the spark.sql.hive.convertMetastoreParquet setting back to true because of the decoding errors. After reverting it, Spark is again fetching all partitions of the table.

> Can you reply with your table's Hive metastore schema, including partition schema?
     col1 string
     col2 string
     year int
     month int
     day int
     hour int

# Partition Information
# col_name            data_type           comment
year int
month int
day int
hour int
venture string

> Where are the table's files located?
In Hadoop, under a user directory.

> If you do a "show partitions <dbname>.<tablename>" in the spark-sql shell, does it show the partitions you expect to see? If not, run "msck repair table <dbname>.<tablename>".
Yes, it is listing the partitions.
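
For reference, a DDL sketch consistent with the schema above (the storage format, names, and location here are placeholder assumptions, not details from this table):

    // sketch only: a partitioned Parquet table shaped like the schema listed above
    sqlContext.sql("""
      CREATE EXTERNAL TABLE IF NOT EXISTS dbname.tablename (
        col1 STRING,
        col2 STRING)
      PARTITIONED BY (year INT, month INT, day INT, hour INT, venture STRING)
      STORED AS PARQUET
      LOCATION '/user/someuser/tablename'""")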

------
Thanks,
Raju Bairishetti,

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman
What is the physical query plan after you set spark.sql.hive.convertMetastoreParquet to true?

Michael

Reply | Threaded
Open this post in threaded view
|

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti

On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <[hidden email]> wrote:
> What is the physical query plan after you set spark.sql.hive.convertMetastoreParquet to true?
The physical plan contains all the partition locations.


------
Thanks,
Raju Bairishetti,

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman
Can you paste the actual query plan here, please?


Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti

 describe dummy;
OK
sample              string
year                string
month               string

# Partition Information
# col_name            data_type           comment
year                string
month               string

val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")
df: org.apache.spark.sql.DataFrame = [_c0: bigint]

scala> df.explain
== Physical Plan ==
TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])
+- TungstenExchange SinglePartition, None
   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])
      +- Scan ParquetRelation: rajub.dummy[] InputPaths: maprfs:/user/rajub/dummy/sample/year=2016/month=10, maprfs:/user/rajub/dummy/sample/year=2016/month=11, maprfs:/user/rajub/dummy/sample/year=2016/month=9, maprfs:/user/rajub/dummy/sample/year=2017/month=10, maprfs:/user/rajub/dummy/sample/year=2017/month=11, maprfs:/user/rajub/dummy/sample/year=2017/month=9
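
For comparison, a sketch of the same query on the Hive-serde path discussed earlier in the thread (only the flags differ from the plan above):

    // sketch only: with convertMetastoreParquet=false and metastorePartitionPruning=true,
    // the earlier messages show a HiveTableScan whose plan carries the partition predicate
    // instead of listing every year=/month= input path
    sqlContext.setConf("spark.sql.hive.convertMetastoreParquet", "false")
    sqlContext.setConf("spark.sql.hive.metastorePartitionPruning", "true")
    sqlContext.sql("select count(1) from rajub.dummy where year='2017'").explain()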


------
Thanks,
Raju Bairishetti,

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman
What version of Spark are you running?
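
For example, from the shell (a sketch):

    // sketch only: print the running Spark version
    println(sc.version)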

On Jan 17, 2017, at 8:42 PM, Raju Bairishetti <[hidden email]> wrote:

 describe dummy;

OK

sample              string                                  

year                string                                  

month               string                                   

# Partition Information  

# col_name            data_type           comment    

year                string                                  


month               string 


val df = sqlContext.sql("select count(1) from rajub.dummy where year='2017'")

df: org.apache.spark.sql.DataFrame = [_c0: bigint]

scala> df.explain

== Physical Plan ==

TungstenAggregate(key=[], functions=[(count(1),mode=Final,isDistinct=false)], output=[_c0#3070L])

+- TungstenExchange SinglePartition, None

   +- TungstenAggregate(key=[], functions=[(count(1),mode=Partial,isDistinct=false)], output=[count#3076L])

      +- Scan ParquetRelation: rajub.dummy[] InputPaths: maprfs:/user/rajub/dummy/sample/year=2016/month=10, maprfs:/user/rajub/dummy/sample/year=2016/month=11, maprfs:/user/rajub/dummy/sample/year=2016/month=9, maprfs:/user/rajub/dummy/sample/year=2017/month=10, maprfs:/user/rajub/dummy/sample/year=2017/month=11, maprfs:/user/rajub/dummy/sample/year=2017/month=9


On Wed, Jan 18, 2017 at 12:25 PM, Michael Allman <[hidden email]> wrote:
Can you paste the actual query plan here, please?

On Jan 17, 2017, at 7:38 PM, Raju Bairishetti <[hidden email]> wrote:


On Wed, Jan 18, 2017 at 11:13 AM, Michael Allman <[hidden email]> wrote:
What is the physical query plan after you set spark.sql.hive.convertMetastoreParquet to true?
Physical plan continas all the partition locations 

Michael

On Jan 17, 2017, at 6:51 PM, Raju Bairishetti <[hidden email]> wrote:

Thanks Michael for the respopnse.


On Wed, Jan 18, 2017 at 2:45 AM, Michael Allman <[hidden email]> wrote:
Hi Raju,

I'm sorry this isn't working for you. I helped author this functionality and will try my best to help.

First, I'm curious why you set spark.sql.hive.convertMetastoreParquet to false? 
I had set it as suggested in SPARK-6910 and the corresponding pull requests. It did not work for me without setting the spark.sql.hive.convertMetastoreParquet property.

Can you link specifically to the jira issue or spark pr you referred to? The first thing I would try is setting spark.sql.hive.convertMetastoreParquet to true. Setting that to false might also explain why you're getting parquet decode errors. If you're writing your table data with Spark's parquet file writer and reading with Hive's parquet file reader, there may be an incompatibility accounting for the decode errors you're seeing. 

 https://issues.apache.org/jira/browse/SPARK-6910 . My main motivation is to avoid fetching all the partitions. We reverted the spark.sql.hive.convertMetastoreParquet setting to true because of the decoding errors. After reverting it, Spark is fetching all partitions from the table again.

Can you reply with your table's Hive metastore schema, including partition schema?
     col1 string
     col2 string
     year int
     month int
     day int
     hour int

# Partition Information
# col_name            data_type           comment
year     int
month    int
day      int
hour     int
venture  string

 
Where are the table's files located?
In Hadoop, under a user directory.
If you do a "show partitions <dbname>.<tablename>" in the spark-sql shell, does it show the partitions you expect to see? If not, run "msck repair table <dbname>.<tablename>".
Yes. It is listing the partitions
Cheers,

Michael


------
Thanks,
Raju Bairishetti,
Reply | Threaded
Open this post in threaded view
|

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti
Tested on both 1.5.2 and 1.6.1.

On Wed, Jan 18, 2017 at 12:52 PM, Michael Allman <[hidden email]> wrote:
What version of Spark are you running?

------
Thanks,
Raju Bairishetti,
Reply | Threaded
Open this post in threaded view
|

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman-2
I think I understand. Partition pruning for the case where spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 2.1.0. I think that in previous versions it only worked when spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that configuration gives you data decoding errors. If it's possible for you to write all of your data with Hive, then you should be able to read it without decoding errors and with partition pruning turned on. Another possibility is running your Spark app with a very large maximum heap configuration, like 8g or even 16g. However, loading all of that partition metadata can be quite slow for very large tables. I'm sorry I can't think of a better solution for you.
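As a rough gauge of the partition-metadata cost mentioned above, counting the table's partitions up front shows the scale involved; a small sketch against the example table from this thread, assuming the sqlContext used earlier:

// One row per partition; without pruning, metadata for every one of these
// is fetched from the metastore while planning each query.
val partitionCount = sqlContext.sql("show partitions rajub.dummy").count()
println(s"rajub.dummy has $partitionCount partitions")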

Michael



On Jan 17, 2017, at 8:59 PM, Raju Bairishetti <[hidden email]> wrote:

Tested on both 1.5.2 and 1.6.1.

------
Thanks,
Raju Bairishetti,
Reply | Threaded
Open this post in threaded view
|

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Raju Bairishetti
Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?

  We are already giving the Spark driver very high memory to avoid OOM (heap space / GC overhead limit) errors in the Spark application. But when we run two or three jobs together, they bring down the Hive metastore. We had to forcefully drop older partitions to avoid frequent Hive metastore outages.


On Wed, Jan 18, 2017 at 2:09 PM, Michael Allman <[hidden email]> wrote:
I think I understand. Partition pruning for the case where spark.sql.hive.convertMetastoreParquet is true was not added to Spark until 2.1.0. I think that in previous versions it only worked when spark.sql.hive.convertMetastoreParquet is false. Unfortunately, that configuration gives you data decoding errors. If it's possible for you to write all of your data with Hive, then you should be able to read it without decoding errors and with partition pruning turned on. Another possibility is running your Spark app with a very large maximum heap configuration, like 8g or even 16g. However, loading all of that partition metadata can be quite slow for very large tables. I'm sorry I can't think of a better solution for you.

Michael

------
Thanks,
Raju Bairishetti,
Reply | Threaded
Open this post in threaded view
|

Re: Spark sql query plan contains all the partitions from hive table even though filtering of partitions is provided

Michael Allman-2
Based on what you've described, I think you should be able to use Spark's parquet reader plus partition pruning in 2.1.
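For reference, a rough sketch of what that looks like on Spark 2.1 with the SparkSession API, leaving spark.sql.hive.convertMetastoreParquet at its default of true and reusing the example table from this thread; treat it as an untested outline:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("pruning-check-2.1")
  .enableHiveSupport()
  // request metastore-side partition filtering; convertMetastoreParquet stays at its default (true)
  .config("spark.sql.hive.metastorePartitionPruning", "true")
  .getOrCreate()

val df = spark.sql("select count(1) from rajub.dummy where year = '2017'")
df.explain()  // the scan should list only the year=2017 partition directories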

On Jan 17, 2017, at 10:44 PM, Raju Bairishetti <[hidden email]> wrote:

Thanks for the detailed explanation. Is it completely fixed in spark-2.1.0?

  We are already giving the Spark driver very high memory to avoid OOM (heap space / GC overhead limit) errors in the Spark application. But when we run two or three jobs together, they bring down the Hive metastore. We had to forcefully drop older partitions to avoid frequent Hive metastore outages.


------
Thanks,
Raju Bairishetti,