Help with basic stream action

Basil Hariri

Hi all,


Sorry if this isn’t the right place to ask basic questions, but I’m at the end of my rope here – please let me know if there is a better place to get help.


I’m trying to continuously read from a Kafka topic and send the number of rows Spark has received to a metric tracking service. Because the input is an unbounded stream, I need to send the row count periodically and sum the counts within the metric tracking service. I thought counting per DataFrame or over non-overlapping periods of time would make sense, but I haven’t had luck with either.


Whatever approach I take, I inevitably need to call count(), which triggers Spark to execute the DAG and terminate the application (presumably because the count() action has been completed). What I really need is for my Spark application to receive data indefinitely, count the rows periodically, and send the count(s) to the metric tracker.


My current program looks something like this:


val df = spark
    .readStream
    .format("kafka")
    .<other options>
    .select($"partition", $"timestamp", $"offset", $"value" cast "string")


val metricTracker = new metricTracker()

// Track the metric; the second parameter needs to be of type Long
metricTracker.track("numberOfRows", df.count())


// Output data to console
val query = df
    .writeStream
    .format("console")
    .trigger(Trigger.Continuous("5 seconds"))
    .start()



If I remove the metricTracker lines, the application receives data indefinitely and prints it to the console. When I add the call to df.count(), it executes and terminates the program very quickly. Any ideas on how I can send the number of rows Spark is receiving/processing from a stream with no end?
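
One possible direction for the "count per DataFrame" idea is to count inside foreachBatch, where each micro-batch arrives as an ordinary DataFrame and calling count() is a normal action. The sketch below is a guess at the shape, not a tested fix for the program above, and it rests on several assumptions: StubMetricTracker and startCounting are hypothetical names, the built-in "rate" source stands in for Kafka so the example runs without a broker, and the trigger is changed to Trigger.ProcessingTime because foreachBatch relies on micro-batch execution and is not available with Trigger.Continuous.

```scala
import java.util.concurrent.atomic.AtomicLong

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Hypothetical stand-in for the real metric client; only track(name, Long) is assumed.
class StubMetricTracker {
  val total = new AtomicLong(0L)

  def track(name: String, value: Long): Unit = {
    total.addAndGet(value)
    println(s"$name: +$value (running total ${total.get()})")
  }
}

object BatchRowCounter {

  // Starts a streaming query that reports the row count of every micro-batch.
  def startCounting(spark: SparkSession, tracker: StubMetricTracker): StreamingQuery = {
    // Assumption: the "rate" source replaces the Kafka source for illustration.
    val df = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "10")
      .load()

    // An explicitly typed function value sidesteps the overload ambiguity
    // that a bare lambda passed to foreachBatch can hit on Scala 2.12.
    val countBatch: (DataFrame, Long) => Unit = (batch, batchId) => {
      // batch is a static DataFrame here, so count() is an ordinary action.
      tracker.track("numberOfRows", batch.count())
    }

    df.writeStream
      .trigger(Trigger.ProcessingTime("5 seconds"))
      .foreachBatch(countBatch)
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("BatchRowCounter")
      .master("local[2]")
      .getOrCreate()

    val query = startCounting(spark, new StubMetricTracker())
    // Blocks so the application keeps receiving data until the query is stopped.
    query.awaitTermination()
  }
}
```

Because every batch reports only its own count, the metric tracking service can sum the values as described above. If the batches also need to be written somewhere (for example to the console), that write would go inside the same foreachBatch function.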