Custom datasource as a wrapper for existing ones?


jwozniak
Hello,

At CERN we are developing a Big Data system called NXCALS that uses Spark as
its extraction API.
We have implemented a custom datasource that wraps two existing ones
(Parquet and HBase) in order to hide the implementation details (the location
of the Parquet files, the HBase tables, etc.) and to provide an abstraction
layer for our users.
We have entered a stage where we run performance tests on our data, and we
have noticed that this approach does not deliver the performance we observe
with pure Spark. In other words, reading a Parquet file with some simple
predicates runs about 15 times slower when the same code is executed from
within a custom datasource (one that just uses Spark to read Parquet).
After some investigation we learnt that Spark does not apply the same
optimisations in both cases.
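
To make this concrete, here is a minimal sketch of the kind of wrapper we
mean (the class names and the path lookup are invented for this email, not
our actual code):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.StructType

class NxcalsSource extends RelationProvider {
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    // Hide the storage layout from the user: dataset name -> location
    // (placeholder lookup, for illustration only).
    val path = s"/nxcals/data/${parameters("dataset")}"
    new NxcalsRelation(sqlContext, path)
  }
}

class NxcalsRelation(val sqlContext: SQLContext, path: String)
    extends BaseRelation with TableScan {

  // Delegate schema and data to the built-in Parquet source.
  private lazy val delegate = sqlContext.sparkSession.read.parquet(path)

  override def schema: StructType = delegate.schema

  // TableScan gives Spark no way to hand us the query's predicates, so the
  // Parquet scan underneath reads everything and filtering happens
  // afterwards -- which would match the kind of slowdown we observe.
  override def buildScan(): RDD[Row] = delegate.rdd
}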
We could see that Spark 2.3.0 introduced a new V2 API that abstracts away
from SparkSession and focuses on a low-level Row API.
Could you give us some suggestions on how to correctly implement our
datasource using the V2 API?
Is this a correct way of doing it at all?

What we want to achieve is to join existing datasources with some level of
additional abstraction on top.
At the same time we want to benefit from all the Catalyst and Parquet
optimisations that exist for the originals.
We also don't want to reimplement access to Parquet files or HBase at a low
level (like Row) but simply profit from the Dataset API.
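
That said, under the stable V1 API a wrapper can apparently at least get
back column pruning and filter pushdown by implementing PrunedFilteredScan
instead of TableScan. A rough, untested sketch (same invented names as
above; only a few Filter cases shown):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Column, Row, SQLContext}
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.sources.{BaseRelation, EqualTo, Filter, GreaterThan, LessThan, PrunedFilteredScan}
import org.apache.spark.sql.types.StructType

class NxcalsPushdownRelation(val sqlContext: SQLContext, path: String)
    extends BaseRelation with PrunedFilteredScan {

  private lazy val delegate = sqlContext.sparkSession.read.parquet(path)

  override def schema: StructType = delegate.schema

  override def buildScan(
      requiredColumns: Array[String],
      filters: Array[Filter]): RDD[Row] = {
    // Translate the V1 filters back into Column expressions; a real
    // implementation would cover every Filter subtype.
    val conditions: Seq[Column] = filters.toSeq.collect {
      case EqualTo(attr, value)     => col(attr) === value
      case GreaterThan(attr, value) => col(attr) > value
      case LessThan(attr, value)    => col(attr) < value
    }
    // Replay filters and pruning on the delegate DataFrame: its own plan
    // goes through Catalyst, so the inner Parquet scan can push them down.
    val filtered = conditions.foldLeft(delegate)((df, c) => df.filter(c))
    filtered.select(requiredColumns.map(col): _*).rdd
  }
}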
We could have achieved the same by providing an external library on top of
Spark, but the datasource approach looked like a more elegant solution. Only
the performance is still far from what we need.

Any help or direction in this matter would be greatly appreciated, as we
have only just started to build our Spark expertise.

Best regards,
Jakub Wozniak
Software Engineer
CERN





Re: Custom datasource as a wrapper for existing ones?

Jörn Franke
For the formats shipped with Spark (e.g. Parquet), Spark at some point used an internal API that is not the data source API. You can look at how this is implemented for Parquet and co. in the Spark source code.

Maybe this is the issue you are facing?

Have you tried putting your encapsulation in a normal application instead of a data source, to see if it shows the same performance issues?
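
Such an encapsulation could be as simple as the following sketch (the path
lookup is invented). The caller gets back an ordinary DataFrame, so Catalyst
sees one uninterrupted plan and the usual Parquet optimisations apply:

import org.apache.spark.sql.{DataFrame, SparkSession}

object NxcalsApi {
  // Dataset name -> hidden storage location (placeholder lookup).
  private def resolvePath(dataset: String): String = s"/nxcals/data/$dataset"

  def load(spark: SparkSession, dataset: String): DataFrame =
    spark.read.parquet(resolvePath(dataset))
}

// Usage: NxcalsApi.load(spark, "some-dataset").filter("device = 'X'").count()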



Re: Custom datasource as a wrapper for existing ones?

Jörn Franke
In reply to this post by jwozniak
A note on the internal API: it used to change with each release, which was quite annoying because other data sources (Avro, HadoopOffice, etc.) had to keep up with the changes. In the end it is an internal API, so it is not guaranteed to be stable. If you want something stable you have to use the official data source APIs, with some disadvantages.



Re: Custom datasource as a wrapper for existing ones?

jwozniak
Hello,

Thanks a lot for your answers.

We normally look for stability, so using internal APIs that are subject to change without warning is somewhat questionable.
As for putting this functionality on top of Spark instead of into a datasource: this works, but it poses a problem for Python.
In Python we would like to reuse the code written in Java. An external Java lib has to be proxied to Python, and the Spark proxies are involved as well.
This means passing objects (like SparkSession) back and forth from one JVM to the other. Not surprisingly, this did not work well for us in the past (although we did not push it much, hoping for the datasource).
All in all, if we don't find another solution, we might go for an external library that would most likely have to be implemented a second time in Python…
Or there might be a way to force our lib to execute in the same JVM that Spark uses. To be seen… Again, the most elegant way would be the datasource.

Cheers,
Jakub




Re: Custom datasource as a wrapper for existing ones?

cloud0fan
Hi Jakub,

Yea, I think a data source would be the most elegant way to solve your problem. Unfortunately, in Spark 2.3 the only stable data source API is data source v1, which can't be used to implement a high-performance data source. Data source v2 is still a preview in Spark 2.3 and may change in the next release.

For now I'd suggest you take a look at `FileFormat`, which is the API behind Spark's built-in file-based data sources like Parquet. It's an internal API but it has not changed for a long time. In the future, data source v2 will be the best solution.
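
The read side looks roughly like this (abridged from memory of the Spark 2.3
sources, org.apache.spark.sql.execution.datasources.FileFormat -- please
check there for the exact signatures). The key point is that Spark passes
the required schema and the pushed filters into buildReader, which is how
the built-in Parquet source gets its scan performance:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.FileStatus
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.PartitionedFile
import org.apache.spark.sql.sources.Filter
import org.apache.spark.sql.types.StructType

// Abridged sketch of the real trait, renamed to avoid confusion.
trait FileFormatSketch {
  def inferSchema(
      sparkSession: SparkSession,
      options: Map[String, String],
      files: Seq[FileStatus]): Option[StructType]

  def buildReader(
      sparkSession: SparkSession,
      dataSchema: StructType,
      partitionSchema: StructType,
      requiredSchema: StructType,   // pruned columns
      filters: Seq[Filter],         // pushed predicates
      options: Map[String, String],
      hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow]
}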

Thanks,
Wenchen




Re: Custom datasource as a wrapper for existing ones?

jwozniak
Hi Wenchen,

Thanks for your reply! We will have a look at `FileFormat`.

Actually, looking at the V2 APIs I still don't see how you can take existing datasources (like Parquet + HBase) and wrap them up in another one.
Imagine you would like to load some files from Parquet, load some tables from HBase, and present them together (like we do).
First of all, you no longer have access to any high-level Spark APIs in the datasource, so it is not easy to use an existing datasource inside another one.
Secondly, even if you did, you would still have to translate a Dataset back into partitions/filters/expressions/Rows etc. to meet the V2 API requirements.
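
For reference, this is roughly the 2.3.0 read-side surface we would have to
target (interface names taken from the 2.3.0 preview API, so they may
already have changed; the ??? bodies mark exactly the parts we would have to
rebuild from Rows up):

import java.util.{List => JList}
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2, ReadSupport}
import org.apache.spark.sql.sources.v2.reader.{DataReaderFactory, DataSourceReader}
import org.apache.spark.sql.types.StructType

class NxcalsV2Source extends DataSourceV2 with ReadSupport {
  override def createReader(options: DataSourceOptions): DataSourceReader =
    new NxcalsV2Reader(options)
}

class NxcalsV2Reader(options: DataSourceOptions) extends DataSourceReader {
  // No SparkSession is available here, so the schema has to be derived
  // without going through the high-level APIs.
  override def readSchema(): StructType = ???

  // Each factory must produce Rows itself -- there is no hook for handing
  // Spark an already-built Dataset, which is the translation problem
  // described above.
  override def createDataReaderFactories(): JList[DataReaderFactory[Row]] = ???
}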

Please correct me if I’m wrong. I might not see the full picture… 

Cheers,
Jakub

 







Re: Custom datasource as a wrapper for existing ones?

Jörn Franke
In reply to this post by cloud0fan
It changed from 2.0 to 2.1 to 2.2...
Not much, but it still changed. I somewhat agree that it is still manageable.
