[DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

Ryan Blue

While moving the new data source API to InternalRow, I noticed a few odd things:

  • Spark scans always produce UnsafeRow, but that data is passed around as InternalRow with explicit casts.
  • Operators expect InternalRow and nearly all codegen works with InternalRow (I’ve tested this with quite a few queries).
  • Spark uses unchecked casts from InternalRow to UnsafeRow in places, assuming that data will be unsafe, even though that isn’t what the type system guarantees.

To me, it looks like the idea was to code SQL operators against the abstract InternalRow so the implementation could be swapped out, but we ended up with a general assumption that rows will always be unsafe. This is the worst of both options: we can’t actually rely on everything working with InternalRow, yet code must still use it, right up until that becomes inconvenient and an unchecked cast gets inserted.
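
As a boiled-down (hypothetical) example of the pattern, where the signature promises InternalRow but the body quietly assumes the unsafe implementation:

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeRow

    // Hypothetical operator helper: the type system only promises InternalRow...
    def process(rows: Iterator[InternalRow]): Iterator[UnsafeRow] = {
      // ...but the unchecked cast assumes every row is actually unsafe. This
      // compiles, and only fails at runtime with a ClassCastException if a
      // source ever produces a non-unsafe InternalRow.
      rows.map(_.asInstanceOf[UnsafeRow])
    }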

The main question I want to answer is this: what data format should SQL use internally? What was the intent when building Catalyst?

The v2 data source API depends on the answer, but I also found that the UnsafeRow requirement introduces a significant performance penalty in Parquet (and probably other formats). A quick check on one of our tables showed a 6% performance hit caused by unnecessary copies from InternalRow to UnsafeRow. So if we can guarantee that all operators support InternalRow, then there is an easy performance win that also simplifies the v2 data source API.

rb

--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

rxin
What the internal operators do is strictly internal. To take one step back: is the goal to design an API so that consumers of the API can directly produce what Spark expects internally, to cut down the perf cost?


Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

Ryan Blue

Is the goal to design an API so that consumers of the API can directly produce what Spark expects internally, to cut down the perf cost?

No. That has already been done. The problem on the API side is that it makes little sense to force implementers to create UnsafeRow when it almost certainly causes them to simply use UnsafeProjection and copy it. If we’re just making a copy and we can defer that copy to get better performance, why would we make implementations handle it? Instead, I think we should accept InternalRow from v2 data sources and copy to unsafe when it makes sense to do so: after filters are run and only if there isn’t another projection that will do it already.
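
For illustration, this is roughly the boilerplate an implementation ends up writing today (the schema and values here are made up for the sketch):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.{UnsafeProjection, UnsafeRow}
    import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
    import org.apache.spark.unsafe.types.UTF8String

    // A made-up read schema and a row as a reader would naturally produce it.
    val schema = StructType(Seq(
      StructField("id", LongType),
      StructField("name", StringType)))
    val row: InternalRow = InternalRow(1L, UTF8String.fromString("a"))

    // Project to unsafe and copy, because the projection reuses its buffer.
    val toUnsafe = UnsafeProjection.create(schema)
    val unsafe: UnsafeRow = toUnsafe(row).copy()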

But I don’t want to focus on the v2 API for this. What I’m asking in this thread is what the intent is for the SQL engine. Is it an accident that nearly everything works with InternalRow? If we were to make a choice here, should we mandate that rows passed into the SQL engine must be UnsafeRow?

Personally, I think it makes sense to say that everything should accept InternalRow, but produce UnsafeRow, with the understanding that UnsafeRow will usually perform better.

rb


--
Ryan Blue
Software Engineer
Netflix

Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

rxin
IIRC we switched all of the internals to UnsafeRow for simplicity: it is easier to serialize UnsafeRows, compute hash codes, etc. At some point we had a bug caused by unioning two plans that produced different types of rows, so we forced the conversion at the input.
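
As a rough sketch of the simplicity argument (the single-column schema and value are made up): once a row is unsafe, shipping it or hashing it is plain byte manipulation, with no per-field, per-type dispatch.

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.{LongType, StructField, StructType}

    // Build an UnsafeRow from a made-up single-column row.
    val schema = StructType(Seq(StructField("id", LongType)))
    val unsafe = UnsafeProjection.create(schema)(InternalRow(42L)).copy()

    // The row is backed by bytes: ready for a shuffle stream or a hash code.
    val bytes: Array[Byte] = unsafe.getBytes
    val hash: Int = unsafe.hashCode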

Can't your "wish" be satisfied by having the public API produce the internals of UnsafeRow (without actually exposing UnsafeRow)?


Re: [DISCUSS] Spark SQL internal data: InternalRow or UnsafeRow?

Ryan Blue

Because the InternalRow representation is already in the v2 public API, we’re working on cleaning up classes and documenting the internal representation. The JIRA issue is SPARK-23657 and my PR is #21242.

The confusion I want to clear up is what “correct” means here. Should we immediately translate to UnsafeRow before passing data to other operators, or should we (eventually) fix the operators that don’t support InternalRow? If the answer is that it is okay to pass InternalRow, then we can get a few performance gains by avoiding copies in data sources (including v1 sources). Parquet copies every partitioned row twice to ensure it is UnsafeRow, even though the scan is the most expensive place to make that copy if there are filters or another projection on top of it.

For the v2 API, I think the right thing to do is accept InternalRow and handle conversion to unsafe in Spark. That way, we can defer conversion until after filters have run and only add a conversion if there isn’t already a projection that will do it.
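
A minimal sketch of what that could look like (the reader iterator, schema argument, and predicate here are hypothetical, not the actual v2 plumbing):

    import org.apache.spark.sql.catalyst.InternalRow
    import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
    import org.apache.spark.sql.types.StructType

    // Hypothetical helper: filter the cheap generic rows first, then convert
    // only the survivors to unsafe (the copy is needed because the projection
    // reuses its buffer between rows).
    def filterThenConvert(
        reader: Iterator[InternalRow],
        schema: StructType,
        predicate: InternalRow => Boolean): Iterator[InternalRow] = {
      val toUnsafe = UnsafeProjection.create(schema)
      reader.filter(predicate).map(row => toUnsafe(row).copy())
    }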

Here’s the thread on GitHub about this, for background: https://github.com/apache/spark/pull/21118#issuecomment-386647848

rb


--
Ryan Blue
Software Engineer
Netflix