issue about the windows slice of stream

萝卜丝炒饭
Hi all,
I found an issue with the way DStream windows are sliced.
My code is:

val ssc = new StreamingContext(conf, Seconds(1))

val content = ssc.socketTextStream(ip, port) // ip: String, port: Int (placeholders)
content.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(rdd => println(xxxx))
The key point is that the slide interval (8 s) is greater than the window length (2 s).
I checked the output, and the result printed by println(xxxx) was wrong.
I found that the stream was sliced incorrectly.
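To make the window layout concrete, here is a small Spark-free sketch that lists which 1-second batches fall inside each window when a 2-second window slides every 8 seconds. It follows the `currentWindow` formula in `ReducedWindowedDStream.compute()`, which covers `[t - windowDuration + batchInterval, t]` for a window ending at `t`. The object `WindowLayout`, its methods and the relative start time of 0 ms are hypothetical, chosen only for illustration.

```scala
// Hypothetical helper: models which batches a window covers; no Spark dependency.
object WindowLayout {
  // Batch times (ms) covered by the window ending at endMs,
  // i.e. the batches in (endMs - windowMs, endMs], stepping by batchMs.
  def batchesIn(endMs: Long, windowMs: Long, batchMs: Long): Seq[Long] =
    (endMs - windowMs + batchMs) to endMs by batchMs

  // End times of the first n windows, one every slideMs, starting at firstEndMs.
  def windowEnds(firstEndMs: Long, slideMs: Long, n: Int): Seq[Long] =
    (0 until n).map(i => firstEndMs + i * slideMs)
}
```

With a zero time of 0 ms, the first two windows end at 8000 and 16000 ms and cover the batches at 7000-8000 ms and 15000-16000 ms, so the six batches from 9000 to 14000 ms belong to no window. (In the log further down, zeroTime 1498383078000 ms plus the 8000 ms slide gives the first window end, 1498383086000 ms.)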
May I open a JIRA for this?

Thanks
Fei Shao

Re: issue about the windows slice of stream

萝卜丝炒饭
Hi all,

Let me add more information about this.
The log showed:
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Time 1498383086000 ms is valid
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Window time = 2000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Slide time = 8000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Zero time = 1498383078000 ms
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)
17/06/25 17:31:26 INFO ShuffledDStream: Time 1498383078000 ms is invalid as zeroTime is 1498383078000 ms , slideDuration is 1000 ms and difference is 0 ms
17/06/25 17:31:26 DEBUG ShuffledDStream: Time 1498383079000 ms is valid
17/06/25 17:31:26 DEBUG MappedDStream: Time 1498383079000 ms is valid
The slice times are wrong.
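The `Time ... is invalid as zeroTime is ...` line suggests that a slice only yields RDDs for batch times strictly after `zeroTime` that are a whole number of batch intervals after it. The sketch below is a simplified model based on that log message, not Spark's actual `DStream.slice` code; the object `SliceModel` and its methods are hypothetical.

```scala
// Hypothetical, simplified model of which batch times in a slice yield an RDD.
object SliceModel {
  // Usable only if strictly after zeroTime and a whole number of batches after it.
  def isTimeValid(t: Long, zeroMs: Long, batchMs: Long): Boolean =
    t > zeroMs && (t - zeroMs) % batchMs == 0

  // Batch times in [fromMs, toMs] (inclusive) that pass isTimeValid.
  def validTimes(fromMs: Long, toMs: Long, zeroMs: Long, batchMs: Long): Seq[Long] =
    (fromMs to toMs by batchMs).filter(t => isTimeValid(t, zeroMs, batchMs))
}
```

With the values from the log (slice 1498383077000 to 1498383084000 ms, zeroTime 1498383078000 ms, 1000 ms batches), 1498383077000 and 1498383078000 ms drop out, leaving six batches (1498383079000 to 1498383084000 ms) out of the eight the slice range spans. This is consistent with the log reporting 1498383078000 ms as invalid and 1498383079000 ms as valid.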

For my test code:
lines.countByValueAndWindow(Seconds(2), Seconds(8)).foreachRDD(s => {   <=== here windowDuration is 2 seconds and slideDuration is 8 seconds.
===========key log begin============
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Current window = [1498383085000 ms, 1498383086000 ms]
17/06/25 17:31:26 DEBUG ReducedWindowedDStream: Previous window = [1498383077000 ms, 1498383078000 ms]
17/06/25 17:31:26 INFO ShuffledDStream: Slicing from 1498383077000 ms to 1498383084000 ms (aligned to 1498383077000 ms and 1498383084000 ms)   <=== here the old RDDs are sliced from 1498383077000 to 1498383084000 ms, which spans 8 batches (8 seconds). It should actually be 2 seconds.
===========key log end============
===========code in ReducedWindowedDStream.scala begin============
override def compute(validTime: Time): Option[RDD[(K, V)]] = {
  val reduceF = reduceFunc
  val invReduceF = invReduceFunc

  val currentTime = validTime
  val currentWindow = new Interval(currentTime - windowDuration + parent.slideDuration,
    currentTime)
  val previousWindow = currentWindow - slideDuration

  logDebug("Window time = " + windowDuration)
  logDebug("Slide time = " + slideDuration)
  logDebug("Zero time = " + zeroTime)
  logDebug("Current window = " + currentWindow)
  logDebug("Previous window = " + previousWindow)

  //  _____________________________
  // |  previous window   _________|___________________
  // |___________________|       current window        |  --------------> Time
  //                     |_____________________________|
  //
  // |________ _________|          |_________ _________|
  //          |                              |
  //          V                              V
  //       old RDDs                       new RDDs
  //

  // Get the RDDs of the reduced values in "old time steps"
  val oldRDDs =
    reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime - parent.slideDuration)
  // <== I think this line should be:
  //     reducedStream.slice(previousWindow.beginTime, currentWindow.beginTime + windowDuration - parent.slideDuration)
  logDebug("# old RDDs = " + oldRDDs.size)

  // Get the RDDs of the reduced values in "new time steps"
  val newRDDs =
    reducedStream.slice(previousWindow.endTime + parent.slideDuration, currentWindow.endTime)
  // <== I think this line should be:
  //     reducedStream.slice(previousWindow.endTime + windowDuration - parent.slideDuration, currentWindow.endTime)
  logDebug("# new RDDs = " + newRDDs.size)
===========code in ReducedWindowedDStream.scala end============
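The boundary values in the key log lines follow directly from the expressions in this excerpt. The sketch below is a hypothetical, Spark-free transcription using plain `Long` milliseconds: the `Win` case class stands in for Spark's `Interval` (subtracting a duration shifts both ends), and `WindowBounds` evaluates both the existing `slice` bounds and the replacements suggested in the annotations, for comparison.

```scala
// Hypothetical transcription of the window arithmetic in compute(); all values in ms.
final case class Win(begin: Long, end: Long) {
  def -(d: Long): Win = Win(begin - d, end - d) // shift both ends back by d
}

object WindowBounds {
  // Current and previous windows, as computed at the top of compute().
  def windows(validTime: Long, windowMs: Long, slideMs: Long, parentSlideMs: Long): (Win, Win) = {
    val current = Win(validTime - windowMs + parentSlideMs, validTime)
    (current, current - slideMs)
  }

  // Slice bounds used by the existing code.
  def oldSlice(prev: Win, cur: Win, parentSlideMs: Long): (Long, Long) =
    (prev.begin, cur.begin - parentSlideMs)
  def newSlice(prev: Win, cur: Win, parentSlideMs: Long): (Long, Long) =
    (prev.end + parentSlideMs, cur.end)

  // Slice bounds suggested in the annotations.
  def proposedOldSlice(prev: Win, cur: Win, windowMs: Long, parentSlideMs: Long): (Long, Long) =
    (prev.begin, cur.begin + windowMs - parentSlideMs)
  def proposedNewSlice(prev: Win, cur: Win, windowMs: Long, parentSlideMs: Long): (Long, Long) =
    (prev.end + windowMs - parentSlideMs, cur.end)
}
```

Called with `validTime = 1498383086000`, a 2000 ms window, an 8000 ms slide and a 1000 ms batch interval, `windows` returns the `Current window` and `Previous window` values from the log, and `oldSlice` returns the logged range 1498383077000 to 1498383084000 ms. For this 2-second window, the proposed `newSlice` start (1498383079000 ms) equals the existing one, while the proposed `oldSlice` end moves to 1498383086000 ms.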

Thanks
Fei Shao
---Original---
From: "萝卜丝炒饭"<[hidden email]>
Date: 2017/6/24 14:51:52
To: "user"<[hidden email]>;"dev"<[hidden email]>;
Subject: issue about the windows slice of stream

Re: issue about the windows slice of stream

萝卜丝炒饭
In reply to this post by 萝卜丝炒饭
Hi Owen,

Would you mind helping me check this issue, please?
Is it a potential bug or not?

Thanks
Fei Shao


---Original---
From: "萝卜丝炒饭"<[hidden email]>
Date: 2017/6/25 21:44:41
To: "user"<[hidden email]>;"dev"<[hidden email]>;
Subject: Re: issue about the windows slice of stream
