[Spark Streaming] [DISCUSS] Clear metadata method and Generate Batches using same Event Loop

Karthikeyan Ravi

In our system we observed batch generation being delayed in Spark Streaming, which produced one very large batch followed by a few zero-record batches. I read the code and added logs to confirm the root cause; details are below.

1. GenerateBatch (a recurring timer) and ClearMetadata (an event posted after a batch completes) are added to the same event loop - https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/scheduler/JobGenerator.scala#L62
2. In our case we run with concurrentJobs greater than 1, so clearMetadata takes longer because it tries to clear all the batches before (current batch - rememberDuration). This is itself a problem: for teams using concurrentJobs > 1, should we clear all previous batches? Some of them might still be executing because of dependency delays or other issues.
3. Because of #2, the GenerateBatch event is delayed: the eventLoop is backed by a BlockingQueue and handles events in order, so clearMetadata has to complete first. When the delayed generateBatch finally executes, it pulls all the records that have accumulated since the last pull, and the generateBatch events that follow immediately pull zero records.
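
To illustrate points 1-3, here is a minimal sketch of a single-threaded loop running on a simulated clock. It is not Spark's JobGenerator code: the event classes, the `simulate` function, and the constant arrival rate of one record per millisecond are all assumptions made for illustration.

```scala
// Hypothetical model of one event loop shared by batch generation and
// metadata cleanup. Names and numbers are illustrative, not Spark's API.
object SharedEventLoopSketch {
  sealed trait Event
  // Timer tick for the batch that should be generated at `batchTimeMs`.
  final case class GenerateBatch(batchTimeMs: Long) extends Event
  // Cleanup work that occupies the loop thread for `costMs`.
  final case class ClearMetadata(costMs: Long) extends Event

  // What one GenerateBatch event ended up doing.
  final case class Generated(batchTimeMs: Long, handledAtMs: Long, records: Long)

  // Handles events strictly in queue order on one simulated thread.
  // Assumption: records arrive at a constant rate, and each GenerateBatch
  // pulls everything that arrived since the previous pull.
  def simulate(events: Seq[Event], recordsPerMs: Long = 1L): Seq[Generated] = {
    var clockMs = 0L    // simulated time on the loop thread
    var lastPullMs = 0L // simulated time of the previous pull
    val out = Seq.newBuilder[Generated]
    events.foreach {
      case GenerateBatch(t) =>
        // The event cannot be handled before its timer fires, nor before
        // the loop thread is free.
        clockMs = math.max(clockMs, t)
        out += Generated(t, clockMs, (clockMs - lastPullMs) * recordsPerMs)
        lastPullMs = clockMs
      case ClearMetadata(cost) =>
        clockMs += cost
    }
    out.result()
  }
}
```

With the queue `GenerateBatch(1000), ClearMetadata(2500), GenerateBatch(2000), GenerateBatch(3000), GenerateBatch(4000)`, the model handles the batches at 1000, 3500, 3500 and 4000 ms and pulls 1000, 2500, 0 and 500 records: one oversized batch followed by an empty one, which is the pattern described above.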

Proposals:

1. Could we use separate event loops for generate batch and clear metadata? Is this a feasible option?
2. Should we clear the metadata only for the batch that completes, rather than assuming that all batches before (batch time - rememberDuration) have completed? https://github.com/apache/spark/blob/master/streaming/src/main/scala/org/apache/spark/streaming/dstream/DStream.scala#L452

For #2, the change would be from generatedRDDs.filter(_._1 <= (time - rememberDuration)) to generatedRDDs.filter(_._1 == (time)).
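
A small sketch of what each predicate selects may help when validating this. It assumes plain `Long` millisecond values in place of Spark's time and duration types and a `Map[Long, String]` in place of generatedRDDs; the object and method names are hypothetical.

```scala
// Hypothetical stand-in for the filter in DStream.clearMetadata.
// Keys are batch times in ms; values are placeholder labels, not real RDDs.
object ClearMetadataFilterSketch {
  // Current predicate: every batch at or before (time - rememberDuration).
  def currentFilter(generated: Map[Long, String], time: Long, rememberDuration: Long): Set[Long] =
    generated.filter(_._1 <= (time - rememberDuration)).keySet

  // Proposed predicate: only the batch whose time equals `time`.
  def proposedFilter(generated: Map[Long, String], time: Long): Set[Long] =
    generated.filter(_._1 == time).keySet
}
```

For batches at 1000, 2000, 3000 and 4000 ms, with time = 4000 and rememberDuration = 2000, the current predicate selects 1000 and 2000, while the proposed one selects only 4000. In this sketch the proposed predicate no longer consults rememberDuration, which is worth checking against operations that rely on older batches being retained.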

I would like to work on fixing these issues if someone could validate the problems and the proposals.

Karthikeyan R.