[DISCUSS][SPIP][SPARK-29031] Materialized columns

[DISCUSS][SPIP][SPARK-29031] Materialized columns

Jason Guo
Hi,

I'd like to propose a feature named materialized columns. This feature will speed up queries on complex-type columns.


Background
In the data warehouse domain, there is a common requirement to add new fields to existing tables. In practice, data engineers usually use a complex type, such as Map (or JSON), and put all subfields into it.
However, this can impact query performance dramatically because
  1. It wastes IO. The whole column (the entire map) must be read, and Spark extracts the required keys from it, even though the query needs only one or a few keys.
  2. Vectorized reads cannot be exploited. Currently, vectorized reads are enabled only when all required columns are of atomic types; when a query reads a subfield of a complex-type column, vectorized reads are disabled.
  3. Filter pushdown cannot be utilized. Filters are pushed down only when all required fields are of atomic types.
  4. CPU is wasted on duplicated computation. When JSON is used to store all keys, the JSON string is parsed each time a subfield is queried; JSON parsing is CPU-intensive, especially when the string is long.
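To make point 4 concrete, here is a minimal Python sketch (illustrative only, not Spark code; the row layout and function names are hypothetical) showing why re-parsing JSON on every subfield access is wasteful compared with extracting the value once at write time:

```python
import json

row = {"params": '{"city": "Beijing", "os": "ios", "device": "phone"}'}

# Without materialization: every query that touches a subfield
# re-parses the whole JSON string to extract a single key.
def query_city_unmaterialized(r):
    return json.loads(r["params"])["city"]   # full parse on every access

# With a materialized column: the value is extracted once when the
# row is written, and later queries read a plain string column.
row["city"] = json.loads(row["params"])["city"]  # done once, at write time

def query_city_materialized(r):
    return r["city"]                          # no parsing at read time

assert query_city_unmaterialized(row) == query_city_materialized(row) == "Beijing"
```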

Goal
  • Add new SQL grammar for materialized columns
  • Implicitly rewrite SQL queries on complex-type columns when a materialized column exists for the accessed subfield
  • If the data type of the materialized column is atomic, enable vectorized reads and filter pushdown even though the original column is of a complex type, improving performance

Usage
#1 Add materialized columns to an existing table
Step 1: Create a normal table
CREATE TABLE x (
    name STRING,
    age INT,
    params STRING,
    event MAP<STRING, STRING>
) USING parquet;

Step 2: Add materialized columns to an existing table
ALTER TABLE x ADD COLUMNS (
    new_age INT MATERIALIZED age + 1,
    city STRING MATERIALIZED get_json_object(params, '$.city'),
    label STRING MATERIALIZED event['label']
);

#2 Create a new table with materialized columns
CREATE TABLE x (
    name STRING,
    age INT,
    params STRING,
    event MAP<STRING, STRING>,
    new_age INT MATERIALIZED age + 1,
    city STRING MATERIALIZED get_json_object(params, '$.city'),
    label STRING MATERIALIZED event['label']
) USING parquet;
 

When a query on the complex-type columns is issued, as below:
SELECT name, age+1, get_json_object(params, '$.city'), event['label']
FROM x
WHERE event['label']='newuser';

it is implicitly rewritten to the equivalent query
SELECT name, new_age, city, label
FROM x
WHERE label = 'newuser';
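The implicit rewrite can be thought of as substituting a materialized column's defining expression with its column name wherever it appears in the query. A rough Python sketch (hypothetical; a real implementation would match on the analyzed expression tree inside the optimizer, not on raw SQL text):

```python
# Hypothetical sketch: map the textual form of each materialized
# column's defining expression to the column's name.
MATERIALIZED = {
    "age+1": "new_age",
    "get_json_object(params, '$.city')": "city",
    "event['label']": "label",
}

def rewrite(sql: str) -> str:
    # String replacement is only for illustration; the actual rule
    # would operate on resolved expressions to avoid false matches.
    for expr, col in MATERIALIZED.items():
        sql = sql.replace(expr, col)
    return sql

original = ("SELECT name, age+1, get_json_object(params, '$.city'), event['label'] "
            "FROM x WHERE event['label']='newuser'")
print(rewrite(original))
# → SELECT name, new_age, city, label FROM x WHERE label='newuser'
```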

Query performance improves dramatically because
  1. The rewritten query reads the new string column city instead of the whole params map, so much less data is read.
  2. Vectorized reads can be used in the new query but not in the old one, since they require all read columns to be of atomic types.
  3. The filter can be pushed down. Only filters on atomic columns can be pushed down; the original filter event['label'] = 'newuser' is on a complex column, so it cannot be.
  4. The new query no longer needs to parse JSON. JSON parsing is a CPU-intensive operation that impacts performance dramatically.

--


Thanks & Best Regards,
Jason Guo
Re: [DISCUSS][SPIP][SPARK-29031] Materialized columns

cloud0fan
> 1. It wastes IO. The whole column (the entire map) must be read, and Spark extracts the required keys from it, even though the query needs only one or a few keys.

This sounds like a use case similar to nested column pruning. We should push the map key extraction down to the data source.

> 2. Vectorized reads cannot be exploited. Currently, vectorized reads are enabled only when all required columns are of atomic types; when a query reads a subfield of a complex-type column, vectorized reads are disabled.

I think this is just a current limitation. Technically we can read complex types with the vectorized reader.

> 3. Filter pushdown cannot be utilized. Filters are pushed down only when all required fields are of atomic types.

This is no longer true with the new nested column pruning feature. It currently works only for Parquet, but we plan to support it in data sources generally.

> 4. CPU is wasted on duplicated computation. When JSON is used to store all keys, the JSON string is parsed each time a subfield is queried; JSON parsing is CPU-intensive, especially when the string is long.

Similar to 1, this problem goes away if we can push map key/array element extraction down to the data source.
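The alternative proposed here, pushing subfield extraction into the scan, can be sketched in Python as a reader that is told which map keys a query needs and yields only those values, instead of deserializing whole maps (illustrative only; the real change would live in the data source API, and these function names are hypothetical):

```python
# Two toy rows with a map-typed "event" column.
rows = [
    {"event": {"label": "newuser", "os": "ios", "device": "phone"}},
    {"event": {"label": "olduser", "os": "android"}},
]

def scan_full(rows):
    # Today: the whole map column is deserialized for every row,
    # even if the query needs a single key.
    return [dict(r["event"]) for r in rows]

def scan_with_pushdown(rows, column, keys):
    # With extraction pushdown: the source yields just the needed keys.
    return [{k: r[column].get(k) for k in keys} for r in rows]

assert scan_with_pushdown(rows, "event", ["label"]) == [
    {"label": "newuser"},
    {"label": "olduser"},
]
```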


I agree with the problems you point out here, but I don't think materialized columns are the right solution. I think we should improve the data source API so that data sources can fix these problems themselves.

Thanks,
Wenchen



On Tue, Sep 10, 2019 at 5:47 PM Jason Guo <[hidden email]> wrote: