The behavior of countByValueAndWindow and foreachRDD in DStream: could you please help me understand it?

萝卜丝炒饭
Hi all,

I have the following code:
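import java.text.SimpleDateFormat
import java.util.Date
import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object TestDStream {
  // MM = month, mm = minute, ss = second ("yyyy-mm-dd HH:MM:SS" prints the minute where the month belongs)
  def now(): String = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new Date())

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)
    // Logger.getLogger("org.apache.spark.streaming.dstream").setLevel(Level.DEBUG)
    val conf = new SparkConf().setAppName("testDstream").setMaster("local[4]")
    // val sc = SparkContext.getOrCreate(conf)
    val ssc = new StreamingContext(conf, Seconds(1)) // 1-second batch interval

    ssc.checkpoint("E:\\spark\\tmp\\cp") // countByValueAndWindow needs a checkpoint directory
    val lines = ssc.socketTextStream("127.0.0.1", 9999)

    // Print every raw batch (one RDD per second)
    lines.foreachRDD(r => {
      println("RDD" + r.id + "begin" + " " + now())
      r.foreach(ele => println(":::" + ele)) // runs on executors; visible here because master is local
      println("RDD" + r.id + "end")
    })

    // Count each distinct value over a 4-second window, sliding every second
    lines.countByValueAndWindow(Seconds(4), Seconds(1)).foreachRDD(s => { // here is key code
      println("countByValueAndWindow RDD ID IS : " + s.id + "begin")
      println("time is " + now())
      s.foreach(e => println("data is " + e._1 + " :" + e._2))
      println("countByValueAndWindow RDD ID IS : " + s.id + "end")
    })

    ssc.start() // Start the computation
    ssc.awaitTermination() // Wait for the computation to terminate
  }
}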
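The timestamps in the logs below were printed with the pattern "yyyy-mm-dd HH:MM:SS". In java.text.SimpleDateFormat, MM is the month, mm the minute, ss the second and SS the milliseconds, so that pattern prints the minute where the month belongs, which is why the dates read 2017-41-27 and 2017-59-27. A minimal Scala sketch comparing the two patterns on one fixed instant (the instant, the UTC time zone and the object name are arbitrary choices for illustration):

```scala
import java.text.SimpleDateFormat
import java.util.{Calendar, Locale, TimeZone}

object DatePatternDemo {
  // Format a fixed instant, 2017-06-27 22:41:16.000 UTC, with the given pattern
  def formatFixed(pattern: String): String = {
    val utc = TimeZone.getTimeZone("UTC")
    val cal = Calendar.getInstance(utc)
    cal.clear() // reset every field, including milliseconds, to its default
    cal.set(2017, Calendar.JUNE, 27, 22, 41, 16)
    val fmt = new SimpleDateFormat(pattern, Locale.ROOT)
    fmt.setTimeZone(utc)
    fmt.format(cal.getTime)
  }

  def main(args: Array[String]): Unit = {
    println(formatFixed("yyyy-mm-dd HH:MM:SS")) // 2017-41-27 22:06:00: minute, then month, then milliseconds
    println(formatFixed("yyyy-MM-dd HH:mm:ss")) // 2017-06-27 22:41:16
  }
}
```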

I ran the code and used "nc" to send messages manually, typing about one letter per second.
I know the wall-clock times printed in the log are not exactly the batch times, but I think they are very close.
The output, with my comments, is:
-----------------------------------------------------------
RDD1begin   2017-41-27  22:06:16
RDD1end
countByValueAndWindow RDD ID IS : 7  begin
time is 2017-41-27  22:06:16
countByValueAndWindow RDD ID IS : 7  end
RDD8begin   2017-41-27  22:06:17
RDD8end
countByValueAndWindow RDD ID IS : 13  begin
time is 2017-41-27  22:06:17
countByValueAndWindow RDD ID IS : 13  end
RDD14begin   2017-41-27  22:06:18
:::1
RDD14end
countByValueAndWindow RDD ID IS : 19  begin  
time is 2017-41-27  22:06:18  <== data from 22:06:15 -- 22:06:18 is in RDD 14.
data is 1 :1
countByValueAndWindow RDD ID IS : 19  end
RDD20begin   2017-41-27  22:06:19
:::2
RDD20end
countByValueAndWindow RDD ID IS : 25  begin 
time is 2017-41-27  22:06:19  <== data from 22:06:16 -- 22:06:19 is in RDDs 14, 20.
data is 1 :1
data is 2 :1
countByValueAndWindow RDD ID IS : 25  end
RDD26begin   2017-41-27  22:06:20
:::3
RDD26end
countByValueAndWindow RDD ID IS : 31  begin
time is 2017-41-27  22:06:20  <== data from 22:06:17 -- 22:06:20 is in RDDs 14, 20, 26.
data is 2 :1
data is 1 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 31  end
RDD32begin   2017-41-27  22:06:21
:::4
RDD32end
countByValueAndWindow RDD ID IS : 37  begin
time is 2017-41-27  22:06:21  <== data from 22:06:18 -- 22:06:21 is in RDDs 14, 20, 26, 32.
data is 2 :1
data is 1 :1
data is 4 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 37  end
RDD38begin   2017-41-27  22:06:22
:::5
:::6
RDD38end
countByValueAndWindow RDD ID IS : 43  begin
time is 2017-41-27  22:06:22  <== data from 22:06:19 -- 22:06:22 is in RDDs 20, 26, 32, 38. RDD 14 is now out of the window.
data is 4 :1
data is 5 :1
data is 6 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 43  end
RDD44begin   2017-41-27  22:06:23
:::7
RDD44end
countByValueAndWindow RDD ID IS : 49  begin
time is 2017-41-27  22:06:23  <== data from 22:06:20 -- 22:06:23 is in RDDs 26, 32, 38, 44. RDD 20 is now out of the window.
data is 5 :1
data is 4 :1
data is 6 :1
data is 7 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 49  end
-----------------------------------------------------------
I think foreachRDD outputs the latest RDD computed by countByValueAndWindow, and the log above confirms my idea.
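To check this reading, here is a plain Scala sketch (no Spark) that recomputes a 4-second window from scratch at every second, using the values and seconds copied from the first log. It assumes the window ending at second t covers the four batches t-3 .. t, which agrees with the ranges in the log comments. The object, type and function names are hypothetical.

```scala
object WindowCountModel {
  // Hypothetical representation: batch second -> values received in that batch
  type Batches = Map[Int, Seq[String]]

  // Values typed into nc during the first run, keyed by the second shown in the log (22:06:xx)
  val firstRun: Batches = Map(
    18 -> Seq("1"), 19 -> Seq("2"), 20 -> Seq("3"),
    21 -> Seq("4"), 22 -> Seq("5", "6"), 23 -> Seq("7"))

  // A window of `window` seconds ending at second t covers batches t-window+1 .. t
  def windowBatches(t: Int, window: Int): Range = (t - window + 1) to t

  // Count each value over the batches in the window, recomputed from scratch
  def countInWindow(batches: Batches, t: Int, window: Int): Map[String, Long] =
    windowBatches(t, window)
      .flatMap(s => batches.getOrElse(s, Seq.empty[String]))
      .groupBy(identity)
      .map { case (value, hits) => value -> hits.size.toLong }

  def main(args: Array[String]): Unit =
    // One result per second, as with a 1-second slide
    for (t <- 18 to 23) println(s"$t -> ${countInWindow(firstRun, t, 4)}")
}
```

At seconds 22 and 23 it gives 2, 3, 4, 5, 6 and then 3, 4, 5, 6, 7 (each once), the same values the log shows at 22:06:22 and 22:06:23.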
Now I change the key line (the one marked "here is key code") to:
lines.countByValueAndWindow(Seconds(4), Seconds(6)).foreachRDD(s => { // here is key code
so the slide duration is now 6 seconds. The log, with my comments, is below:
-----------------------------------------------------------
RDD1begin   2017-59-27  10:59:12
RDD1end
RDD2begin   2017-59-27  10:59:13
:::1
:::2
RDD2end
RDD3begin   2017-59-27  10:59:14
:::3
RDD3end
RDD4begin   2017-59-27  10:59:15
:::4
RDD4end
RDD5begin   2017-59-27  10:59:16
:::5
RDD5end
RDD6begin   2017-59-27  10:59:17
RDD6end
countByValueAndWindow RDD ID IS : 22  begin
time is 2017-59-27  10:59:17 <== I think this is OK, even though RDD 2 (10:59:13) is included.
data is 4 :1
data is 5 :1
data is 1 :1
data is 2 :1
data is 3 :1
countByValueAndWindow RDD ID IS : 22  end
RDD23begin   2017-59-27  10:59:18
:::6
RDD23end
RDD24begin   2017-59-27  10:59:19
:::8
:::7
RDD24end
RDD25begin   2017-59-27  10:59:20
:::9
RDD25end
RDD26begin   2017-59-27  10:59:21
:::0
RDD26end
RDD27begin   2017-59-27  10:59:22
:::-
RDD27end
RDD28begin   2017-59-27  10:59:23
:::p
RDD28end
countByValueAndWindow RDD ID IS : 43  begin
time is 2017-59-27  10:59:23 <== the data between 10:59:20 -- 10:59:23 should come from RDDs 25, 26, 27, 28, but the data is wrong.
data is 6 :1
data is 2 :1
data is 9 :1
data is - :1
data is 1 :1
data is 8 :1
data is p :1
data is 0 :1
data is 7 :1
countByValueAndWindow RDD ID IS : 43  end
RDD44begin   2017-59-27  10:59:24
:::o
RDD44end
RDD46begin   2017-59-27  10:59:25
:::i
RDD46end
RDD47begin   2017-59-27  10:59:26
:::u
RDD47end
RDD48begin   2017-59-27  10:59:27
:::y
RDD48end
RDD49begin   2017-59-27  10:59:28
:::t
RDD49end
RDD50begin   2017-59-27  10:59:29
:::r
RDD50end
countByValueAndWindow RDD ID IS : 65  begin
time is 2017-59-27  10:59:29 <== this is wrong too.
data is 6 :1
data is 2 :1
data is r :1
data is 8 :1
data is t :1
data is i :1
data is y :1
data is u :1
data is 1 :1
data is 7 :1
data is o :1
countByValueAndWindow RDD ID IS : 65  end
-----------------------------------------------------------
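Why might the second log look like this? The following plain Scala model (no Spark) reproduces both wrong outputs exactly. It rests on an assumption, not verified here: that countByValueAndWindow, like reduceByKeyAndWindow with an inverse function, does not recount the window but updates the previous result. At each output time t it takes the result from t - slide, subtracts the batches it assumes have left the window (t-slide-window+1 .. t-window), adds the batches it assumes have entered (t-slide+1 .. t), keeps only the new counts for a key missing from the previous result, and drops zero counts. Those two ranges match real window edges only when window >= slide. With a 4-second window and a 6-second slide they overlap on batches that belong to no window (10:59:18 and 10:59:19 for the 10:59:23 output), so 6, 7 and 8 leak in; and 1 and 2, typed at 10:59:13 and picked up by the first output's "entering" range 10:59:12 .. 10:59:17, are never subtracted afterwards. The data map is copied from the log; all names are hypothetical.

```scala
object IncrementalWindowModel {
  type Counts = Map[String, Long]

  // Values typed into nc during the second run, keyed by the second shown in the log (10:59:xx)
  val secondRun: Map[Int, Seq[String]] = Map(
    13 -> Seq("1", "2"), 14 -> Seq("3"), 15 -> Seq("4"), 16 -> Seq("5"),
    18 -> Seq("6"), 19 -> Seq("8", "7"), 20 -> Seq("9"), 21 -> Seq("0"),
    22 -> Seq("-"), 23 -> Seq("p"), 24 -> Seq("o"), 25 -> Seq("i"),
    26 -> Seq("u"), 27 -> Seq("y"), 28 -> Seq("t"), 29 -> Seq("r"))

  // Count values over the batches lo .. hi (inclusive); empty when lo > hi
  def countRange(data: Map[Int, Seq[String]], lo: Int, hi: Int): Counts =
    (lo to hi)
      .flatMap(s => data.getOrElse(s, Seq.empty[String]))
      .groupBy(identity)
      .map { case (value, hits) => value -> hits.size.toLong }

  // From-scratch result: the window ending at t covers t-window+1 .. t
  def recompute(data: Map[Int, Seq[String]], t: Int, window: Int): Counts =
    countRange(data, t - window + 1, t)

  // ASSUMED incremental update: start from the result at t - slide, subtract the
  // batches assumed to leave (t-slide-window+1 .. t-window) and add the batches
  // assumed to enter (t-slide+1 .. t). Only correct when window >= slide.
  def step(data: Map[Int, Seq[String]], prev: Counts, t: Int, window: Int, slide: Int): Counts = {
    val leaving  = countRange(data, t - slide - window + 1, t - window)
    val entering = countRange(data, t - slide + 1, t)
    (prev.keySet ++ entering.keySet).toSeq.map { k =>
      val updated = prev.get(k) match {
        case None    => entering.getOrElse(k, 0L) // key not in previous result: leaving counts ignored
        case Some(p) => p - leaving.getOrElse(k, 0L) + entering.getOrElse(k, 0L)
      }
      k -> updated
    }.toMap.filter { case (_, c) => c != 0L } // zero counts are dropped
  }

  def main(args: Array[String]): Unit = {
    // Output times 10:59:17, :23 and :29 taken from the log; window 4 s, slide 6 s
    val at17 = step(secondRun, Map.empty, 17, 4, 6)
    val at23 = step(secondRun, at17, 23, 4, 6)
    val at29 = step(secondRun, at23, 29, 4, 6)
    println(s"incremental @23: ${at23.keySet}")
    println(s"from scratch @23: ${recompute(secondRun, 23, 4).keySet}")
    println(s"incremental @29: ${at29.keySet}")
  }
}
```

If the model is right, the from-scratch recompute gives what the comments expect at 10:59:23 (only 9, 0, - and p). That would suggest keeping the window duration at least as long as the slide duration, or counting over a plain window without an inverse function, for example lines.window(Seconds(4), Seconds(6)).countByValue() (not tested here).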

Could you please tell me why the second log does not match my understanding?
This issue has been bothering me for several days.
Thanks
Fei Shao