I want to apply an external function on streaming data


I want to apply an external function on streaming data

hagersaleh
I have three external functions that I want to apply to streaming data. I use a sliding window to get the streaming data, but I cannot apply the external functions to it.
I wrote the code in Python.



--
Sent from: http://apache-spark-developers-list.1001551.n3.nabble.com/

---------------------------------------------------------------------
To unsubscribe e-mail: [hidden email]


Re: I want to apply an external function on streaming data

hagersaleh
Can anyone help me, please?





Re: I want to apply an external function on streaming data

hagersaleh
I have an external function:

from pandas import Series
from sklearn.preprocessing import MinMaxScaler

scaler1 = MinMaxScaler(feature_range=(-1, 1))

def difference(dataset, interval=1):
    diff = list()
    for i in range(interval, len(dataset)):
        value = dataset[i] - dataset[i - interval]
        diff.append(value)
    return Series(diff)
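For reference, `difference` computes first differences, so any input with fewer than `interval + 1` elements yields an empty result. A minimal stdlib sketch of the same logic (pandas `Series` replaced by a plain list, and the name `first_differences` is mine):

```python
# Stand-in for the difference() function above, using a plain list
# instead of a pandas Series so the sketch is self-contained.
def first_differences(dataset, interval=1):
    # element i minus element i - interval, for all valid i
    return [dataset[i] - dataset[i - interval]
            for i in range(interval, len(dataset))]

print(first_differences([120.92, 121.84, 122.88, 123.07, 124.83]))
print(first_differences([120.92]))  # too short: returns []
```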


When I apply this function to the streaming data, no result is displayed:

import time

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DifferenceOnStream")  # the SparkContext ("scc" before)
ssc = StreamingContext(sc, 1)

activeUsers = [[120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [123.07419758947418],
               [124.83203764216505],
               [123.278584495919],
               [123.04382133819664],
               [120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [123.07419758947418],
               [124.83203764216505],
               [123.278584495919],
               [123.04382133819664],
               [120.92187299645627],
               [121.84247351449525],
               [122.87717906432528],
               [123.07419758947418],
               [124.83203764216505],
               [123.278584495919],
               [123.04382133819664]]

rddQueue = []
for datum in activeUsers:
    rddQueue += [ssc.sparkContext.parallelize(datum)]
inputStream = ssc.queueStream(rddQueue)
k = inputStream.window(5, 1).map(lambda x: [x])
s = k.map(difference)
s.pprint()

ssc.start()
time.sleep(1)
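A likely reason no differences appear: `map(lambda x: [x])` wraps each stream element in a one-element list, so `difference` is called on single values and `range(interval, len(dataset))` is empty; the driver also sleeps for only one second before exiting, so few batches get printed. Applying `difference` across a whole window requires gathering the window's contents first (for example via `transform` on the windowed DStream). The intended per-window computation, simulated in plain Python (pyspark omitted so the sketch is self-contained; window length 5 and slide 1 taken from the code above):

```python
# Simulate a DStream window(5, 1) over a finite stream and apply
# the difference logic to each full window (list instead of Series).
def difference(dataset, interval=1):
    return [dataset[i] - dataset[i - interval]
            for i in range(interval, len(dataset))]

stream = [120.92, 121.84, 122.88, 123.07, 124.83, 123.28, 123.04]
window_size, slide = 5, 1

# all complete windows, sliding one element at a time
windows = [stream[s:s + window_size]
           for s in range(0, len(stream) - window_size + 1, slide)]

for w in windows:
    print(difference(w))  # 4 differences per 5-element window
```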




