Help with basic stream action


Basil Hariri

Hi all,

 

Sorry if this isn’t the right place to ask basic questions, but I’m at the end of my rope here; if there’s a better venue for this, please point me to it.

 

I’m trying to continuously read from a Kafka topic and send the number of rows Spark has received to a metric-tracking service. I’m expecting an unbounded stream of input data, so I need to send the row count periodically and sum the counts within the metric-tracking service. I thought counting per DataFrame or over non-overlapping periods of time would make sense, but I haven’t had luck with either.
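
For the windowed approach, what I tried looked roughly like this (the window and watermark durations are placeholders, and df is the Kafka stream defined further down):

import org.apache.spark.sql.functions.window

// Count rows in non-overlapping (tumbling) one-minute event-time windows;
// the watermark bounds how long Spark keeps state around for late data
val windowedCounts = df
    .withWatermark("timestamp", "10 minutes")
    .groupBy(window($"timestamp", "1 minute"))
    .count()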

 

Whatever approach I take, I inevitably need to call count(), which causes Spark to execute the DAG and then terminate the application (presumably because the count() action has completed). What I really need is for my Spark application to receive data indefinitely, count the rows periodically, and send the counts to the metric tracker.
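
While searching I also came across StreamingQueryListener, which (if I’m reading the docs right) reports the rows ingested per trigger without calling count() at all. A rough, untested sketch of what I have in mind, reusing my metricTracker from below:

import org.apache.spark.sql.streaming.StreamingQueryListener
import org.apache.spark.sql.streaming.StreamingQueryListener._

spark.streams.addListener(new StreamingQueryListener {
    override def onQueryStarted(event: QueryStartedEvent): Unit = {}
    override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
    override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // numInputRows is the row count for the trigger that just completed
        metricTracker.track("numberOfRows", event.progress.numInputRows)
    }
})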

 

My current program looks something like this:

 

val df = spark
    .readStream
    .format("kafka")
    .<other options>
    .load()
    .select($"partition", $"timestamp", $"offset", $"value" cast "string")

 

val metricTracker = new metricTracker()

 

// Track the metric; the second parameter needs to be of type Long
metricTracker.track("numberOfRows", df.count())

 

// Output data to console
val query = df
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(Trigger.Continuous("5 seconds"))
    .start()

 

If I remove the metricTracker lines, the query receives data indefinitely and prints it to the console. When I add the call to df.count() shown above, the program executes and terminates very quickly. Any ideas on how I can send the number of rows Spark is receiving/processing from a never-ending stream?
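
The other direction I’ve been wondering about is foreachBatch (Spark 2.4+), which I gather would mean trading the continuous trigger for a micro-batch one. Roughly (again untested):

import org.apache.spark.sql.DataFrame

val batchedQuery = df
    .writeStream
    .trigger(Trigger.ProcessingTime("5 seconds"))
    .foreachBatch { (batchDf: DataFrame, batchId: Long) =>
        // each micro-batch arrives as a bounded DataFrame, so count() is an ordinary action here
        metricTracker.track("numberOfRows", batchDf.count())
        batchDf.show()  // stand-in for the console sink
    }
    .start()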

 

Thanks,

Basil