Custom datasource: when to acquire and release a lock?

Custom datasource: when to acquire and release a lock?

Abhishek Somani
Hi experts,

I am trying to create a custom Spark Datasource (v1) to read from a transactional data endpoint, and I need to acquire a lock with the endpoint before fetching data and release the lock after reading. Note that the lock acquisition and release need to happen in the driver JVM.

I have created a custom RDD for this purpose, and tried acquiring the lock in MyRDD.getPartitions() and releasing it at the end of the job by registering a QueryExecutionListener.

Now, as I have learnt, this is not the right approach, as the RDD can get reused on further actions WITHOUT calling getPartitions() again (the partitions of an RDD get cached). For example, if someone calls Dataset.collect() twice: the first time, MyRDD.getPartitions() will be invoked, I will acquire the lock, and it will be released at the end. However, the second time collect() is called, getPartitions() will NOT be called again, as the RDD will be reused and its partitions will already have been cached.
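
Roughly, the approach looks like this (a minimal sketch; EndpointClient and its acquireLock()/releaseLock() methods are just stand-ins for the endpoint's actual lock API):

import org.apache.spark.{Partition, SparkContext, TaskContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.execution.QueryExecution
import org.apache.spark.sql.util.QueryExecutionListener

// Hypothetical client for the transactional endpoint; acquireLock()/releaseLock()
// are placeholders for whatever lock API the endpoint actually exposes.
object EndpointClient {
  def acquireLock(): Unit = ()
  def releaseLock(): Unit = ()
}

class MyRDD(sc: SparkContext) extends RDD[Row](sc, Nil) {

  // Spark caches the result of this method in RDD.partitions, so a second action
  // on the same Dataset reuses the cached partitions and never re-enters it.
  override protected def getPartitions: Array[Partition] = {
    EndpointClient.acquireLock()                          // runs on the driver
    Array(new Partition { override def index: Int = 0 })
  }

  override def compute(split: Partition, context: TaskContext): Iterator[Row] =
    Iterator.empty                                        // executors read while the lock is held
}

// Releases the lock when a query finishes; it fires for every action, including
// ones that reused the cached partitions and therefore never re-acquired the lock.
class ReleaseLockListener extends QueryExecutionListener {
  override def onSuccess(funcName: String, qe: QueryExecution, durationNs: Long): Unit =
    EndpointClient.releaseLock()
  override def onFailure(funcName: String, qe: QueryExecution, exception: Exception): Unit =
    EndpointClient.releaseLock()
}

The listener is registered on the driver with spark.listenerManager.register(new ReleaseLockListener), so on the second collect() only the release side runs, against a lock that was never re-acquired.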

Can someone advise me on the right places to acquire and release a lock with my data endpoint in this scenario?

Thanks a lot,
Abhishek Somani

Re: Custom datasource: when to acquire and release a lock?

Abhishek Somani
Hi experts,

I'd be very grateful if someone could help.

Thanks,
Abhishek

Re: Custom datasource: when to acquire and release a lock?

Jörn Franke
What does your data source structure look like?
Can’t you release it at the end of the buildScan() method?

What technology is used in the transactional data endpoint?


Re: Custom datasource: when to acquire and release a lock?

Abhishek Somani
Hey Jörn,

Thanks a lot for replying.

My data source extends BaseRelation and PrunedFilteredScan, and buildScan() returns my custom RDD. I want to take the lock before any executors start reading the data and release it after all executors are done, so I tried to acquire the lock in MyRDD.getPartitions().

I cannot release it at the end of the buildScan() method because I need to hold the lock throughout the duration of the read (which happens in the RDD's compute() on the executors). So I can only release the lock after all the executors have finished reading the data.
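
Roughly, the relation looks like this (a rough sketch; the schema is a placeholder and MyRDD is the lock-acquiring RDD sketched in the first message above):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, Filter, PrunedFilteredScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

class MyRelation(override val sqlContext: SQLContext)
  extends BaseRelation with PrunedFilteredScan {

  // Placeholder schema, for illustration only.
  override def schema: StructType = StructType(Seq(StructField("value", StringType)))

  // buildScan() runs on the driver and only constructs the RDD; the actual read
  // happens later in MyRDD.compute() on the executors, so a lock that has to cover
  // the whole read cannot be released when this method returns.
  override def buildScan(requiredColumns: Array[String], filters: Array[Filter]): RDD[Row] =
    new MyRDD(sqlContext.sparkContext)  // the lock-acquiring RDD from the earlier sketch
}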

The transactional data endpoint provides me an API to acquireLock() and one to releaseLock() (it tracks the lock in MySQL behind the scenes).

Thanks again!
Abhishek
