Very slow complex type column reads from parquet

Very slow complex type column reads from parquet

jwozniak
Hello,

We have stumbled upon quite degraded performance when reading complex-type (struct, array) columns stored in Parquet.
The Parquet file is around 600 MB (Snappy-compressed) with ~400k rows and a field of a complex type { f1: array of ints, f2: array of ints }, where the f1 array is 50k elements long.
There are also other fields such as entity_id: long and timestamp: long.

A simple query that selects rows using the predicates entity_id = X and timestamp >= T1 and timestamp <= T2, followed by ds.show(), takes 17 minutes to execute.
If we remove the complex-type columns from the query, it executes in sub-second time.
 
Now, looking at the implementation of the Parquet data source, the Vectorized* classes are used only if all the read types are primitive; otherwise the code falls back to the default parquet-mr implementation.
In VectorizedParquetRecordReader there is a TODO to handle complex types, which "should be efficient & easy with codegen".

For our Spark usage at CERN the current execution times are pretty much prohibitive, as a lot of our data is stored as arrays / complex types…
The 600 MB file represents one day of measurements, and our data scientists would sometimes like to process months or even years of those.

Could you please let me know whether anybody is currently working on this, or whether it is on a roadmap for the future?
Or could you give me some suggestions on how to avoid / resolve this problem? I’m using Spark 2.2.1.

Best regards,
Jakub Wozniak




---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]

Re: Very slow complex type column reads from parquet

Ryan Blue
Jakub,

You're right that Spark currently doesn't use the vectorized read path for nested data, but I'm not sure that's the problem here. With 50k elements in the f1 array, it could easily be that the significant speed-up comes from not reading or materializing that column at all. The non-vectorized path is slower, but if the query is that much slower, the data itself is the more likely culprit.

I'd be happy to see vectorization for nested Parquet data move forward, but you might want to get an idea of how much it will help before investing in it. Can you use Impala to test whether vectorization would help here?

rb



On Mon, Jun 11, 2018 at 6:16 AM, Jakub Wozniak <[hidden email]> wrote:
--
Ryan Blue
Software Engineer
Netflix
Re: Very slow complex type column reads from parquet

jwozniak
Dear Ryan,

Thanks a lot for your answer.
After sending the e-mail we investigated the data itself a bit more.
It turned out that for certain days the data was very skewed, and one of the row groups had many more records than all the others.
This was related to the fact that we had sorted the data by our object ids, and by chance the objects that came first were smaller (or compressed better).
So the Parquet file had 6 row groups, where the first one had 300k rows and the others only 30k rows each.
The search for a given object fell into the first row group and took a very long time.
The data itself was highly compressible as it contained a lot of zeros; to give some numbers, the 600 MB Parquet file expanded to 56 GB in JSON.

What we did was to sort the data not by object id but by record timestamp, which resulted in a much more even data distribution among the row groups.
This in fact helped a lot with the query time (using the timestamp & object id predicates).

I have to say I haven't fully understood this phenomenon yet, as I’m not a Parquet format & reader expert (at least not yet).
Maybe it is just a simple function of how many records Spark has to scan and the level of parallelism (searching for a given object id when the data is sorted by time needs to scan all, or at least more, of the row groups for larger time ranges).
One question here: does the Parquet reader read & decode the projection columns even for records that the predicate columns should filter out?

Unfortunately we have to keep those big columns in the query, as people want to do analysis on them.
 
We will continue to investigate… 

Cheers,
Jakub



On 12 Jun 2018, at 22:51, Ryan Blue <[hidden email]> wrote:


Re: Very slow complex type column reads from parquet

jwozniak
Hello,

I’m sorry to bother you again, but it is quite important for us to understand the problem better.

One more finding is that the performance of queries against a timestamp-sorted file depends a lot on the predicate timestamp.
If you are lucky enough to match records near the start of a row group, the query can be fast (seconds). If you search for something near the end of a row group, it takes minutes.
I guess this is because the reader has to scan all the preceding records in the row group until it finds the matching ones at the end...

Now I have a couple of questions regarding the way the Parquet file is read. 

1) Does it always decode the projection columns (from the select) even when the predicate columns do not match (it looks that way to me, but I might be wrong)?
2) Sorting the file results in "indexed" row groups, making it easier to locate which row group to scan, but isn’t it at the same time limiting parallelism? If the data were randomly placed across the row groups, it would be scanned with as many tasks as there are row groups, right (or at least more than one)? Is there any common rule we can formulate, or is it very data- and/or query-dependent?
3) Would making the row groups smaller (say, by half) help? Currently I can see that the row groups are about the size of the HDFS block (256 MB), but sometimes smaller or even bigger.
We set nothing for the row group size, so I guess the default HDFS block size is used, right?
Do you have any recommendation / experience with that?
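For reference, a sketch of how the row group size could be lowered when writing from Spark (untested here, and assuming an existing DataFrame df; parquet.block.size is the parquet-mr row group size in bytes, which Spark forwards to the Parquet writer via the write options):

```python
# Hypothetical PySpark fragment: sort by timestamp and shrink the
# Parquet row group size for the written files.
(df.sort("timestamp")
   .write
   .option("parquet.block.size", 64 * 1024 * 1024)  # 64 MB row groups
   .parquet("/path/to/output"))
```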
  
Thanks a lot for your help,
Jakub



On 14 Jun 2018, at 12:07, Jakub Wozniak <[hidden email]> wrote:


Re: Very slow complex type column reads from parquet

Ryan Blue
Jakub,

I'm moving the Spark list to bcc and adding the Parquet list, since you're probably more interested in Parquet tuning.

It makes sense that you're getting better performance when the matching rows are more evenly distributed, especially if those rows include a huge column that you need to project: you're simply able to use more processing power at once. Finding a good layout or sort to distribute the data is a good start, and you're right that decreasing the row group size would help: more (matching) row groups means more parallelism. Just be careful, because decreasing the row group size can inflate the overall size of the data.

Parquet will also eliminate row groups in each task when possible. At the start of a task it uses the row group's column stats (min, max, num nulls), and before the row group is read it will also attempt to read any column dictionaries and use those to check whether any values match.

If a row group can't be eliminated, all of the rows in that group will be materialized. One thing that's probably really hurting you here is that each row from Parquet is copied in memory twice for this read path. I pointed this out recently and that's why I'm suggesting we clean up the expectations of Spark SQL operators (see this comment and the dev list thread).

There are patches out in the Parquet community to do page-level filtering as well, which will help your use case quite a bit.

rb

On Fri, Jun 15, 2018 at 1:44 AM, Jakub Wozniak <[hidden email]> wrote:
--
Ryan Blue
Software Engineer
Netflix