Structured Streaming with Watermark


Structured Streaming with Watermark

sandeep_katta

I am trying to test the watermark concept in Structured Streaming using the
program below:

    import java.sql.Timestamp
    import org.apache.spark.sql.functions.{col, expr}
    import org.apache.spark.sql.streaming.Trigger
    import spark.implicits._ // for the 'value symbol syntax (auto-imported in spark-shell)

    val lines_stream = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
      option("subscribe", "s1").
      load().
      select('value.cast("String") as "key",
        'value.cast("String").cast("long").cast("timestamp") as "timeStampValue").
      select("key", "timeStampValue").
      withWatermark("timeStampValue", "10 seconds")
   
    val query = lines_stream.
      writeStream.
      option("truncate", "false").
      outputMode("append").
      format("console").
      trigger(Trigger.ProcessingTime(3000)).
      start()
    query.awaitTermination()


// Corresponding output

scala>     query.awaitTermination()
-------------------------------------------                                    
Batch: 0
-------------------------------------------
+---+--------------+
|key|timeStampValue|
+---+--------------+
+---+--------------+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844822|2018-10-18 14:40:22|
+----------+-------------------+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844842|2018-10-18 14:40:42|
+----------+-------------------+

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844862|2018-10-18 14:41:02|
+----------+-------------------+

-------------------------------------------
Batch: 4
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844882|2018-10-18 14:41:22|
+----------+-------------------+

-------------------------------------------
Batch: 5
-------------------------------------------
+----------+-------------------+
|key       |timeStampValue     |
+----------+-------------------+
|1539844852|2018-10-18 14:40:52| // As per the watermark this event should be discarded, but it wasn't
+----------+-------------------+

Note: below are the values I sent from the Kafka producer:
1539844822
1539844842
1539844862
1539844882
1539844852
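
For reference, a minimal sketch of producing these values with the Kafka client API; the serializer settings are assumptions, and any producer that writes these strings to topic s1 works just as well:

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "vm1:21005,vm2:21005")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send the epoch-second values in the same order as listed above
    Seq("1539844822", "1539844842", "1539844862", "1539844882", "1539844852").
      foreach(v => producer.send(new ProducerRecord[String, String]("s1", v)))
    producer.close()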

Is this the correct way to test watermark scenarios?

Regards
Sandeep Katta







Re: Structured Streaming with Watermark

Burak Yavuz-2
Hi Sandeep,

Watermarks are used in aggregation queries to ensure correctness and to clean up state. They don't let you drop records in map-only scenarios, which is what you have in your example. If you test with `groupBy().count()`, you will see that the count doesn't increase with the last event.
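
For illustration, here is a minimal sketch of such a test; the window column and the update output mode are assumptions, not taken from the original query:

    import org.apache.spark.sql.functions.window

    val counts = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "vm1:21005,vm2:21005").
      option("subscribe", "s1").
      load().
      select('value.cast("String").cast("long").cast("timestamp") as "timeStampValue").
      withWatermark("timeStampValue", "10 seconds").
      groupBy(window('timeStampValue, "10 seconds")). // aggregation lets the watermark drop late rows
      count()

    counts.writeStream.
      outputMode("update").
      format("console").
      option("truncate", "false").
      start()

With this, an event whose timestamp falls more than 10 seconds behind the max event time seen so far should no longer update its window's count.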



Re: Structured Streaming with Watermark

sandeep_katta
Now I've added an aggregation query as suggested, but it still didn't filter the late event:

import org.apache.spark.sql.functions._ // split, to_timestamp, from_unixtime (auto-imported in spark-shell)

val lines_stream = spark.readStream.
      format("kafka").
      option("kafka.bootstrap.servers", "vm3:21005,vm2:21005").
      option("subscribe", "s1").
      load().
      withColumn("tokens", split('value, ",")).
      withColumn("seconds", 'tokens(1) cast "long").
      withColumn("event_time", to_timestamp(from_unixtime('seconds))). // <-- event time has to be a timestamp
      withColumn("id", 'tokens(0)).
      select("id", "event_time").
      withWatermark("event_time", "10 seconds").
      groupBy("id").
      count()

// Output
-------------------------------------------                                    
Batch: 0
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
+---+-----+

-------------------------------------------                                    
Batch: 1
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |1    |
+---+-----+

-------------------------------------------                                    
Batch: 2
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |2    |
+---+-----+

-------------------------------------------                                    
Batch: 3
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |4    |
+---+-----+

-------------------------------------------                                    
Batch: 4
-------------------------------------------
+---+-----+
|id |count|
+---+-----+
|1  |5    | // It should still be 4
+---+-----+

// Values sent from the Kafka producer

1,1539844822
1,1539844842
1,1539844862
1,1539844882
1,1539844852






Re: Structured Streaming with Watermark

Jungtaek Lim
Which version of Spark are you using?

You can attach a streaming query listener and print out the QueryProgressEvent to track the watermark. The watermark value is updated per batch, and the next batch uses that value.
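
A minimal sketch of such a listener, printing the watermark that StreamingQueryProgress reports for each batch:

    import org.apache.spark.sql.streaming.StreamingQueryListener
    import org.apache.spark.sql.streaming.StreamingQueryListener._

    spark.streams.addListener(new StreamingQueryListener {
      override def onQueryStarted(event: QueryStartedEvent): Unit = {}
      override def onQueryTerminated(event: QueryTerminatedEvent): Unit = {}
      override def onQueryProgress(event: QueryProgressEvent): Unit = {
        // eventTime holds the min/max/avg event time and the current watermark for the batch
        println(s"batch=${event.progress.batchId} watermark=${event.progress.eventTime.get("watermark")}")
      }
    })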

If the watermark exceeds the last timestamp but the value is still added, please let me know the version of Spark as well as the physical plan (if you don't mind), and I can take a look.
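
The physical plan can be printed from the running query handle, e.g.:

    // Prints the physical plan of the streaming query to the console
    query.explain()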

Thanks,
Jungtaek Lim (HeartSaVioR)
