-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathdata.py
executable file
·77 lines (54 loc) · 2.12 KB
/
data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
# import packages
from collections import defaultdict
from typing import Sequence
class OneMinuteData:
    """Collect tick records for a one-minute period and summarise them.

    Each incoming tick is stored as a dict with the keys ``symbol``,
    ``price`` and ``volume`` in ``sequence_data``, in arrival order.
    ``get_result`` folds those records into one open/high/low/close/volume
    summary per symbol.
    """

    def __init__(self):
        # Raw tick records in the order they were received.
        self.sequence_data = []

    @property
    def length(self):
        """Number of tick records collected so far."""
        return len(self.sequence_data)

    @property
    def sequence(self):
        """The underlying list of tick records (not a copy)."""
        return self.sequence_data

    def append_stream(self, stream):
        """Parse a stream message and append every tick it contains.

        Args:
            stream (dict): received stream from client; ``stream["content"]``
                must be an iterable of ticker dicts as accepted by
                ``append_ticker``.

        Raises:
            KeyError: if ``"content"`` or any per-ticker field is missing.
                Tickers appended before the failing one are kept.
        """
        for item in stream["content"]:
            self.append_ticker(item)

    def append_ticker(self, ticker):
        """Append one ticker to the sequence data list.

        Args:
            ticker (dict): received one ticker value from client; must
                provide the keys ``key``, ``LAST_PRICE`` and ``LAST_SIZE``.

        Raises:
            KeyError: if any of the three required keys is missing.
        """
        self.sequence_data.append(
            {
                "symbol": ticker['key'],
                "price": ticker['LAST_PRICE'],
                "volume": ticker['LAST_SIZE'],
            })

    def get_result(self):
        """Summarise the collected ticks per symbol.

        Returns:
            list[dict]: one dict per distinct symbol, ordered by the first
            appearance of the symbol in the collected data. Each dict has
            the keys ``symbol``, ``date_time`` (always an empty string
            here), ``open`` (first price), ``high``, ``low``, ``close``
            (last price) and ``volume`` (sum of sizes). An empty list is
            returned when nothing has been collected.
        """
        # Group records by symbol in a single pass instead of rescanning the
        # whole list once per symbol; dict insertion order also makes the
        # output order deterministic (first-seen symbol first).
        by_symbol = defaultdict(list)
        for record in self.sequence_data:
            by_symbol[record['symbol']].append(record)

        result = []
        for symbol, records in by_symbol.items():
            prices = [record['price'] for record in records]
            result.append(
                {
                    'symbol': symbol,
                    'date_time': '',
                    'open': prices[0],
                    'high': max(prices),
                    'low': min(prices),
                    'close': prices[-1],
                    'volume': sum(record['volume'] for record in records),
                })
        return result