Data source V2 in spark 2.4.0


Data source V2 in spark 2.4.0

assaf.mendelson
Hi all,
I understood from previous threads that the Data source V2 API will see some
changes in spark 2.4.0, however, I can't seem to find what these changes
are.

Is there some documentation which summarizes the changes?

The only mention I seem to find is this pull request:
https://github.com/apache/spark/pull/22009. Is this all of it?

Thanks,
    Assaf.




Re: Data source V2 in spark 2.4.0

Ryan Blue

Hi Assaf,
The major changes to the V2 API in the pull request you linked to aren't going into 2.4; they weren't finished in time, so they will land in the next release.

Here are the major updates that will be in 2.4:

  • SPARK-23323: The output commit coordinator is used by default to ensure only one attempt of each task commits.
  • SPARK-23325 and SPARK-24971: Readers should always produce InternalRow instead of Row or UnsafeRow; see SPARK-23325 for details. (A minimal reader sketch follows this list.)
  • SPARK-24990: ReadSupportWithSchema was removed, and the user-supplied schema option was added to ReadSupport.
  • SPARK-24073: Read splits are now called InputPartition and a few methods were also renamed for clarity.
  • SPARK-25127: SupportsPushDownCatalystFilters was removed because it leaked Expression in the public API. V2 always uses the Filter API now.
  • SPARK-24478: Push-down is now done when converting to a physical plan.
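
To make the renames above concrete, here is a minimal read-only source written against the 2.4 interfaces listed here. The class names, schema, and hard-coded rows are invented for illustration; treat it as a sketch rather than a reference implementation.

import java.util.{List => JList}

import scala.collection.JavaConverters._

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataSourceReader, InputPartition, InputPartitionReader}
import org.apache.spark.sql.types.{LongType, StringType, StructField, StructType}
import org.apache.spark.unsafe.types.UTF8String

// Hypothetical source that produces ten (id, name) rows across two partitions.
class DemoSource extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader = new DemoReader
}

class DemoReader extends DataSourceReader {
  override def readSchema(): StructType =
    StructType(Seq(StructField("id", LongType), StructField("name", StringType)))

  // SPARK-24073: read splits are InputPartition now (previously DataReaderFactory).
  override def planInputPartitions(): JList[InputPartition[InternalRow]] =
    Seq[InputPartition[InternalRow]](new DemoPartition(0 until 5), new DemoPartition(5 until 10)).asJava
}

class DemoPartition(ids: Range) extends InputPartition[InternalRow] {
  override def createPartitionReader(): InputPartitionReader[InternalRow] = new DemoPartitionReader(ids)
}

class DemoPartitionReader(ids: Range) extends InputPartitionReader[InternalRow] {
  private val iter = ids.iterator
  private var current: InternalRow = _

  override def next(): Boolean = {
    if (iter.hasNext) {
      val id = iter.next()
      // SPARK-23325 / SPARK-24971: produce InternalRow directly; strings are UTF8String internally.
      current = InternalRow(id.toLong, UTF8String.fromString(s"name-$id"))
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = current

  override def close(): Unit = ()
}

A source like this should load with spark.read.format(classOf[DemoSource].getName).load().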

I think there are also quite a few updates for the streaming side, but I’m not as familiar with those so I’ll let someone else jump in with a summary.

rb



--
Ryan Blue
Software Engineer
Netflix

Re: Data source V2 in spark 2.4.0

cloud0fan
Ryan, thanks for putting up a list!

Generally there are just a few tweaks to the data source V2 API in 2.4, and it shouldn't be too hard to upgrade to Spark 2.4 if you already have a data source V2 implementation.

However, we do want to make some big API changes for data source V2 in the next release, e.g.:

  • SPARK-25390: data source V2 API refactoring
  • SPARK-25531: new write APIs for data source V2
  • SPARK-24252: add catalog support


Re: Data source V2 in spark 2.4.0

assaf.mendelson
Thanks for the info.

I have been converting an internal data source to V2 and am now preparing it
for 2.4.0.

I have a couple of suggestions from my experience so far.

First, I believe we are missing documentation on this. I am currently writing
an internal tutorial based on what I am learning, and I would be happy to share
it once it is a little more polished (not sure where it should go, though).



The change from using Row to using InternalRow is a little confusing.
For a generic Row we can do Row.fromSeq(values), where the values are regular Java
types (matching the schema). This even includes more complex types like
Array[String], and everything just works.

For InternalRow, this doesn't work for non-trivial types. I figured out how
to convert strings and timestamps (hopefully I even did it correctly), but I
couldn't figure out Array[String].

Beyond the fact that I would love to learn how to do the conversion
correctly for various types (such as arrays), I would suggest adding
some method to create an internal row from base types. In the 2.3.0
version, the row returned from get() would be encoded via the encoder that was
provided. I managed to get it to work by doing:

val encoder = RowEncoder.apply(schema).resolveAndBind() in the constructor
and then encoder.toRow(Row.fromSeq(values))

but this simply feels a little weird to me.
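
For what it's worth, this is roughly what the manual conversion looks like for a hypothetical (String, Timestamp, Array[String]) row; the helper below is only a sketch, and there may well be a cleaner way:

import java.sql.Timestamp

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, GenericArrayData}
import org.apache.spark.unsafe.types.UTF8String

object InternalRowConversions {
  // Hypothetical row of (name: String, updated: Timestamp, tags: Array[String]).
  def toInternalRow(name: String, updated: Timestamp, tags: Array[String]): InternalRow = {
    // StringType is stored as UTF8String, TimestampType as microseconds since the epoch (Long),
    // and ArrayType(StringType) as ArrayData whose elements are UTF8Strings.
    val tagData: Array[Any] = tags.map(t => (UTF8String.fromString(t): Any))
    InternalRow(
      UTF8String.fromString(name),
      DateTimeUtils.fromJavaTimestamp(updated),
      new GenericArrayData(tagData))
  }
}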


Another issue that I encountered is handling bad data. In our legacy source
we have cases where a specific row is bad; what we would do in non-Spark
code is simply skip it.

The problem is that in Spark, if next() returns true we must have some
row for the get() function. This means we always need to read ahead to
figure out whether we actually have something or not.

Might we instead be allowed to return null from get(), in which case the row
would just be skipped?


Lastly, I would be happy to have a way to report metrics from the read (how
many records we read, how many bad records we saw), perhaps by allowing
accumulators to be used in the data source?

Sorry for the long-winded message; I will probably have more as I continue
to explore this.

Thanks,
   Assaf.






Re: Data source V2 in spark 2.4.0

Ryan Blue
Assaf, thanks for the feedback.

The InternalRow issue is one we know about. If it helps, I wrote up some docs for InternalRow as part of SPARK-23657. It may be a good idea to make it easier for people to produce InternalRow, but we want to be careful not to mislead people down paths that have bad performance. InternalRow is what Spark will use directly for filters and we don't want to do too much conversion. We shouldn't make it deceptively easy to work with Row instead of InternalRow because Row is going to be slower.

For bad rows, I would suggest using a filtered iterator to solve your problem. How to handle invalid rows isn't really something Spark should be responsible for, and a filtered iterator would give you the hasNext/next methods you're looking for to implement Spark's API.
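
Something along these lines should work (just a sketch; the raw iterator and the parseRecord function are stand-ins for whatever your source actually does):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.reader.InputPartitionReader

// Hypothetical reader: `raw` is the underlying record iterator and `parseRecord`
// returns None for a bad record, so bad records never reach Spark.
class SkippingPartitionReader(
    raw: Iterator[String],
    parseRecord: String => Option[InternalRow])
  extends InputPartitionReader[InternalRow] {

  // flatMap over Option drops bad records before next()/get() ever see them.
  private val rows = raw.flatMap(parseRecord(_))
  private var current: InternalRow = _

  override def next(): Boolean = {
    if (rows.hasNext) {
      current = rows.next()
      true
    } else {
      false
    }
  }

  override def get(): InternalRow = current

  override def close(): Unit = ()
}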

Metrics are something that we still need to add. I think that Spark should handle record count and FS bytes read metrics like it does for other sources (I've been meaning to contribute an implementation for DSv2). Bad records may be a good candidate for requesting accumulators in the v2 API.
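
Until then, one possible workaround (sketched below with made-up names) is to create a LongAccumulator on the driver when the reader is constructed, pass it to the partitions, and read its value on the driver after the query finishes; accumulators are serializable, so the executor-side readers can add to it.

import org.apache.spark.sql.SparkSession
import org.apache.spark.util.LongAccumulator

object BadRecordMetrics {
  // Driver side: create the accumulator when the DataSourceReader is constructed
  // and hand it to each InputPartition (accumulators are serializable).
  def create(): LongAccumulator =
    SparkSession.builder().getOrCreate().sparkContext.longAccumulator("badRecords")

  // Executor side: call this from the partition reader whenever a record fails to parse.
  def recordBad(acc: LongAccumulator): Unit = acc.add(1)

  // Driver side: read the total after the query finishes.
  def report(acc: LongAccumulator): Unit = println(s"bad records: ${acc.value}")
}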

rb


--
Ryan Blue
Software Engineer
Netflix