spark-dev mailing list archives

From Abhishek Somani <>
Subject Re: Custom datasource: when acquire and release a lock?
Date Mon, 27 May 2019 14:13:49 GMT
Hey Jörn,

Thanks a lot for replying.

My data source extends BaseRelation and PrunedFilteredScan, and buildScan()
returns my custom RDD. I want to take a lock before any executors start
reading the data and release it after all executors are done, so I tried to
acquire the lock in MyRDD.getPartitions().

I cannot release it at the end of the buildScan() method because I need to
hold the lock for the whole duration of the read (which happens in the
compute() of the RDD on the executors). So I can only release the lock after
all the executors have read the data.

The transactional data endpoint provides me with an API to acquireLock() and
one to releaseLock() (the lock state is stored in MySQL behind the scenes).
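
To make the reuse problem from the original question concrete, here is a minimal sketch in plain Java with no Spark dependency. It assumes only what Spark's RDD is known to do: the result of getPartitions() is cached, so it runs once per RDD instance. EndpointLock, CachedPartitionsRdd and runAction() are hypothetical stand-ins for the endpoint API, MyRDD and an action such as collect(); they are not real Spark or endpoint classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LockLifecycleSketch {

    /** Hypothetical stand-in for the endpoint's acquireLock()/releaseLock() API. */
    static class EndpointLock {
        final List<String> events = new ArrayList<>();
        void acquireLock() { events.add("acquire"); }
        void releaseLock() { events.add("release"); }
    }

    /** Mimics an RDD whose partition list is computed once and then cached. */
    static class CachedPartitionsRdd {
        private final EndpointLock lock;
        private List<Integer> partitions; // null until first computed

        CachedPartitionsRdd(EndpointLock lock) { this.lock = lock; }

        // Analogous to MyRDD.getPartitions(): the lock is taken as a side effect.
        private List<Integer> getPartitions() {
            lock.acquireLock();
            return Arrays.asList(0, 1, 2);
        }

        // Analogous to RDD.partitions: memoizes the result of getPartitions().
        List<Integer> partitions() {
            if (partitions == null) {
                partitions = getPartitions();
            }
            return partitions;
        }

        // Analogous to an action; the release stands in for the job-end listener.
        int runAction() {
            int count = partitions().size();
            lock.events.add("read");
            lock.releaseLock();
            return count;
        }
    }

    /** Runs two actions on the same RDD-like object and returns the event log. */
    static List<String> simulateTwoActions() {
        EndpointLock lock = new EndpointLock();
        CachedPartitionsRdd rdd = new CachedPartitionsRdd(lock);
        rdd.runAction();
        rdd.runAction();
        return lock.events;
    }

    public static void main(String[] args) {
        System.out.println(simulateTwoActions());
    }
}
```

In this sketch the second action logs a read and a release with no matching acquire, which is the unprotected read I am trying to avoid.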

Thanks again!

On Mon, May 27, 2019 at 10:38 AM Jörn Franke <> wrote:

> What does your data source structure look like?
> Can’t you release it at the end of the build scan method?
> What technology is used in the transactional data endpoint?
> > On 24.05.2019 at 15:36, Abhishek Somani <> wrote:
> >
> > Hi experts,
> >
> > I am trying to create a custom Spark Datasource (v1) to read from a
> transactional data endpoint, and I need to acquire a lock with the endpoint
> before fetching data and release the lock after reading. Note that the lock
> acquisition and release need to happen in the Driver JVM.
> >
> > I have created a custom RDD for this purpose, and tried acquiring the
> lock in MyRDD.getPartitions(), and releasing the lock at the end of the job
> by registering a QueryExecutionListener.
> >
> > As I have since learnt, this is not the right approach, because the RDD
> can be reused by further actions WITHOUT calling getPartitions() again (the
> partitions of an RDD get cached). For example, if someone calls
> Dataset.collect() twice, the first time MyRDD.getPartitions() will get
> invoked, and I will acquire a lock and release it at the end. However, the
> second time collect() is called, getPartitions() will NOT be called again,
> because the RDD is reused and its partitions have already been cached in the
> RDD.
> >
> > Can someone advise me on the right places to acquire and release a lock
> with my data endpoint in this scenario?
> >
> > Thanks a lot,
> > Abhishek Somani
