using accumulators in (MicroBatch) InputPartitionReader

using accumulators in (MicroBatch) InputPartitionReader

kordex
I tried to create a data source, however our use case is bit hard as
we do only know the available offsets within the tasks, not on the
driver. I therefore planned to use accumulators in the
InputPartitionReader but they seem not to work.

Example accumulation is done here
https://github.com/kortemik/spark-source/blob/master/src/main/java/com/teragrep/pth06/ArchiveMicroBatchInputPartitionReader.java#L118
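
For reference, the pattern boils down to roughly this simplified
sketch (not the actual repo code; the accumulator name is made up):

    import org.apache.spark.sql.SparkSession
    import org.apache.spark.util.LongAccumulator

    // Driver side: register a named accumulator and let it be captured
    // by whatever gets serialized into the partition reader.
    val spark = SparkSession.builder().getOrCreate()
    val produced: LongAccumulator =
      spark.sparkContext.longAccumulator("records-produced")

    // Executor side, inside the reader loop: count each emitted record.
    // while (reader.next()) { process(reader.get()); produced.add(1L) }

My understanding is that Spark merges accumulator updates into the
driver-side value only when a task completes successfully, so the
driver cannot observe in-flight updates while a micro-batch is still
running, which may be what I am seeing.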

I can see from the task logs that the System.out.println() calls are
executed, so the flow itself cannot be broken, but judging from the
logs at https://github.com/kortemik/spark-source/tree/master the
accumulators only seem to work while on the driver.

Is it intentional that the accumulators do not work within the data source?

One might ask why all this is needed, so let me give a brief
explanation. We use gzipped files as the storage blobs, and it is
unknown prior to execution how many records they contain. Of course
this can be mitigated by decompressing the files on the driver and
then sending the offsets through to the executors, but that is double
effort. The aim was to decompress them only once, doing a
forward-lookup into the data and using an accumulator to inform the
driver that there is more data available for the next batch, or that
the file is done and the driver needs to pull the next one to keep
the executors busy.
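
To make the double-effort point concrete, learning the record count of
one blob already takes a full decompression pass, roughly like this
sketch (assuming newline-delimited records; countRecords is a made-up
helper):

    import java.io.{BufferedReader, FileInputStream, InputStreamReader}
    import java.util.zip.GZIPInputStream

    // Full decompression pass just to learn the record count of one blob.
    def countRecords(path: String): Long = {
      val reader = new BufferedReader(new InputStreamReader(
        new GZIPInputStream(new FileInputStream(path)), "UTF-8"))
      try Iterator.continually(reader.readLine()).takeWhile(_ != null).size.toLong
      finally reader.close()
    }

Doing that pass on the driver and then decompressing again on the
executors is exactly the duplication we want to avoid.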

Any advice is welcome.

Kind regards,
-Mikko Kortelainen



Re: using accumulators in (MicroBatch) InputPartitionReader

Jungtaek Lim-2
I'm not sure about the accumulator approach; one possible approach which might work (DISCLAIMER: a random thought) would be employing an RPC endpoint on the driver side which receives such information from executors and acts as a coordinator.

Beware that Spark's RPC implementation is package-private, so you may need to play with some hacks (placing your class under Spark's package name) and deal with breakage across Spark versions, as Spark won't guarantee backward compatibility. If you could implement something similar with only some lightweight dependencies that cause no conflicts, then I guess that would work as well.
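
Roughly something like the following sketch (untested, all names made
up; the file must live under the org.apache.spark package tree so it
can see the package-private RPC classes):

    package org.apache.spark.example

    import org.apache.spark.SparkEnv
    import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}
    import org.apache.spark.util.RpcUtils

    // Message an executor sends when it learns how far a file goes.
    case class FileProgress(path: String, recordsRead: Long, exhausted: Boolean)
    case object Ack

    // Driver side: a coordinator endpoint collecting per-file progress
    // so the micro-batch planner can decide what to schedule next.
    class ProgressCoordinator(override val rpcEnv: RpcEnv) extends RpcEndpoint {
      override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case p: FileProgress =>
          // record p somewhere the offset planning can read it
          context.reply(Ack)
      }
    }

    object ProgressCoordinator {
      val Name = "archive-progress-coordinator"
      // Call once on the driver, e.g. when the source is created.
      def register(): Unit = {
        val env = SparkEnv.get
        env.rpcEnv.setupEndpoint(Name, new ProgressCoordinator(env.rpcEnv))
      }
    }

    // Executor side: resolve a reference to the driver endpoint and report.
    object ProgressReporter {
      def report(p: FileProgress): Unit = {
        val env = SparkEnv.get
        val ref = RpcUtils.makeDriverRef(ProgressCoordinator.Name, env.conf, env.rpcEnv)
        ref.askSync[Ack.type](p)
      }
    }

The reader would then call something like ProgressReporter.report(...)
when it hits the end of a file or sees that more data remains for the
next batch.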
