2017-08-28

VWAP calculation with a BigObject Sliding Window


"BigObject-Performance with 100x Speedup"
VWAP (Volume Weighted Average Price) is a common function in real-time trading systems. For each stock, it computes
∑(price × volume) / ∑volume
over the price and volume data arriving from a streaming input. In BigObject, we can insert this data into a sliding table and then compute the VWAP of every stock periodically as a value-added data source.
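To make the idea concrete without a database, here is a minimal in-memory sketch in Python 3 of a time-bounded window with a per-stock VWAP. The class `SlidingVwap` and its methods are hypothetical names, and the expiry rule (a row is dropped once it is `window_seconds` old) is an assumption for illustration, not a statement of how BigObject implements TIMEBOUND.

```python
from collections import defaultdict, deque


class SlidingVwap:
    """In-memory stand-in for a time-bounded table (hypothetical helper)."""

    def __init__(self, window_seconds):
        self.window_seconds = window_seconds
        # Rows are (timestamp, stockcode, price, volume), oldest first.
        # Assumes rows are inserted in non-decreasing timestamp order.
        self.rows = deque()

    def insert(self, timestamp, stockcode, price, volume):
        self.rows.append((timestamp, stockcode, price, volume))

    def vwap(self, now):
        # Drop rows that have aged out of the window.
        while self.rows and self.rows[0][0] <= now - self.window_seconds:
            self.rows.popleft()
        pv = defaultdict(float)  # sum(price * volume) per stock
        v = defaultdict(float)   # sum(volume) per stock
        for _, code, price, volume in self.rows:
            pv[code] += price * volume
            v[code] += volume
        # Skip stocks whose total volume is zero to avoid dividing by zero.
        return {code: pv[code] / v[code] for code in pv if v[code] > 0}


window = SlidingVwap(window_seconds=3)
window.insert(0, "1001.TW", 10.0, 100.0)
window.insert(1, "1001.TW", 20.0, 300.0)
print(window.vwap(now=2))  # both rows in the window: (10*100 + 20*300) / 400
print(window.vwap(now=3))  # the row from t=0 has expired
```

A sliding table moves this bookkeeping into the database, so the client only has to insert rows and run the aggregate query.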
BigObject SQL commands:
CREATE SLIDING TABLE st1 ('stockcode' STRING(16), 'price' float, 'volume' float, TIMEBOUND(3) )
SELECT TOP 5 stockcode, pv, v, pv/v AS vwap FROM (SELECT stockcode, SUM(price*volume) AS pv, SUM(volume) AS v FROM st1 GROUP BY stockcode) ORDER BY stockcode
Output of the sample program (columns: stockcode, pv, v, vwap; the lone integers are the batch counter j printed by the script):
1001.TW 1.0 1.0 1.0
1002.TW 4.0 2.0 2.0
1003.TW 9.0 3.0 3.0
1004.TW 16.0 4.0 4.0
1005.TW 25.0 5.0 5.0
1
1001.TW 1035.0 23.0 45.0
1002.TW 1108.0 25.0 44.32
1003.TW 1185.0 27.0 43.888889
1004.TW 1266.0 29.0 43.655172
1005.TW 1351.0 31.0 43.580645
2
1001.TW 2755.0 66.0 41.742424
1002.TW 2912.0 69.0 42.202899
1003.TW 3075.0 72.0 42.708333
1004.TW 3244.0 75.0 43.253333
1005.TW 3419.0 78.0 43.833333
....
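The figures above can be checked by hand, because the script generates deterministic ticks (price = cnt % 53, volume = cnt % 89). The following Python 3 sketch recomputes the totals without a database. It assumes every batch inserted so far is still inside the window, which the listed totals suggest for the first three batches; `sample_totals` is a hypothetical helper name.

```python
def sample_totals(batches, stocks=1000, top=5):
    """Recompute (stockcode, pv, v, vwap) for the first `top` stock codes
    after `batches` inserts, assuming no row has left the window yet."""
    pv = [0] * stocks
    v = [0] * stocks
    cnt = 0
    for _ in range(batches):
        for i in range(stocks):
            cnt += 1
            price, volume = cnt % 53, cnt % 89  # same generator as the script
            pv[i] += price * volume
            v[i] += volume
    result = []
    for i in range(top):
        # Guard against a zero total volume (cnt % 89 can be 0).
        vwap = round(pv[i] / v[i], 6) if v[i] else None
        result.append((f"{1001 + i}.TW", pv[i], v[i], vwap))
    return result


for batches in (1, 2, 3):
    for row in sample_totals(batches):
        print(*row)
```

For example, the second tick for 1001.TW has cnt = 1001, so price = 47 and volume = 22; adding 47 × 22 to the first batch's 1 gives pv = 1035 and v = 23, as in the listing.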
Source Code:
# vwap.py
#***********
import datetime
import time
import mysql.connector
import random
cnx = mysql.connector.connect(user='scott', password='tiger', host='192.168.1.163')
cursor = cnx.cursor()
try:
   cursor.execute("DROP TABLE st1")
except mysql.connector.Error:
   # The table does not exist on a first run; ignore the error
   pass
cursor.execute( "CREATE SLIDING TABLE st1 ('stockcode' STRING(16), 'price' float, 'volume' float, TIMEBOUND(3) )" )
# Wall-clock start, used for the elapsed-time calculation at the end
starttime=time.time()
cnt=0
jmax=10
imax=1000
ids=[]
add_sql=''
print('st1:')
for j in range(jmax):
   # Build one multi-row INSERT per batch: imax rows, one per stock code
   rows = []
   for i in range(imax):
      cnt = cnt + 1
      # Synthetic tick: price = cnt % 53, volume = cnt % 89
      rows.append("('" + str(1001 + i % 1000) + ".TW'," + str(cnt % 53) + "," + str(cnt % 89) + ")")
   # Join with commas so the statement has no trailing comma
   add_sql = "INSERT INTO st1 (stockcode, price, volume) VALUES " + ",".join(rows)
   cursor.execute(add_sql)
   print(j)
   time.sleep(1)
   # Sum price*volume and volume per stock, then derive VWAP for the first 5 codes
   sql = "select top 5 stockcode,pv,v,pv/v as vwap from (select stockcode,sum(price*volume) as pv,sum(volume) as v from st1 group by stockcode) order by stockcode"
   cursor.execute(sql)
   for (stockcode, pv, v, vwap) in cursor:
      print(stockcode, pv, v, vwap)
endtime=time.time()
diff=endtime-starttime
#print ("bigobject %d x %d inserts: %s seconds, rate=%f/s" % (jmax,imax,diff,jmax*imax/diff))
# Make sure data is committed to the database
cnx.commit()
cursor.close()
cnx.close()
