Re: dropDuplicates and watermark in structured streaming

classic Classic list List threaded Threaded
1 message Options
Reply | Threaded
Open this post in threaded view

Re: dropDuplicates and watermark in structured streaming

Tathagata Das
why do you have two watermarks? once you apply the watermark to a column (i.e., "time"), it can be used in all later operations as long as the column is preserved. So the above code should be equivalent to 

df.withWarmark("time","window size").dropDulplicates("id").groupBy(window("time","window size","window size")).agg(count("id"))

The right way to think about the watermark threshold is "how late and out of order my data can be". The answer may be different from the window size completely. You may want to calculate 10 minutes windows but your data may come in 5 hour late. So you should define watermark with 5 hour, not 10 minutes.

Btw, on a side note, just so you know, you can use "approx_count_distinct" if you are okay with some approximation. 

On Thu, Feb 27, 2020 at 9:11 PM lec ssmi <[hidden email]> wrote:
  Such as :
        df.withWarmark("time","window size").dropDulplicates("id").withWatermark("time","real watermark").groupBy(window("time","window size","window size")).agg(count("id"))....
   can It  make count(distinct count) success? 

Tathagata Das <[hidden email]> 于2020年2月28日周五 上午10:25写道:
1. Yes. All times in event time, not processing time. So you may get 10AM event time data at 11AM processing time, but it will still be compared again all data within 9-10AM event times.

2. Show us your code.

On Thu, Feb 27, 2020 at 2:30 AM lec ssmi <[hidden email]> wrote:
    I'm new to structured streaming. Because the built-in API cannot perform the Count Distinct operation of Window, I want to use dropDuplicates first, and then perform the window count.
   But in the process of using, there are two problems:
           1. Because it is streaming computing, in the process of deduplication, the state needs to be cleared in time, which requires the cooperation of watermark. Assuming my event time field is consistently      
              increasing, and I set the watermark to 1 hour, does it mean that the data at 10 o'clock will only be compared in these data from 9 o'clock to 10 o'clock, and the data before 9 o'clock will be cleared ?
           2. Because it is window deduplication, I set the watermark before deduplication to the window size.But after deduplication, I need to call withWatermark () again to set the watermark to the real  
               watermark. Will setting the watermark again take effect?

     Thanks a lot !