-
Notifications
You must be signed in to change notification settings - Fork 0
/
connect.py
73 lines (57 loc) · 2.29 KB
/
connect.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import websockets
from talipp.ohlcv import OHLCV
from talipp.indicators import ATR, EMA, MACD, RSI
from talipp.indicator_util import composite_to_lists
from collections import deque
import json
from datetime import datetime
async def binance_connect(uri: str):
    """Stream rolling indicator snapshots computed from a kline WebSocket feed.

    Connects to ``uri``, decodes every incoming message as JSON and reads the
    candle from its ``'k'`` entry (fields ``o``, ``h``, ``l``, ``c``, ``v`` and
    the millisecond timestamp ``t``).  Each candle is fed into ATR, EMA, MACD
    and RSI indicators that are kept to a sliding window of ``periods`` values.

    Args:
        uri: WebSocket endpoint to connect to.

    Yields:
        dict: One snapshot per candle once the EMA has produced a value, with
        keys ``'actual'`` (close price), ``'atr'``, ``'ema'``, ``'macd'``
        (all rounded to 2 decimals), ``'rsi'`` (rounded to an integer) and
        ``'time'`` (the candle timestamp formatted as ``"%M:%S"``).

    Raises:
        Whatever ``websockets.connect`` or ``json.loads`` raise; connection
        errors propagate unchanged to the caller.
    """
    periods = 5
    atr = ATR(period=periods)
    ema = EMA(period=periods)
    macd = MACD(fast_period=3, slow_period=5, signal_period=5)
    rsi = RSI(period=3)

    # ``async with`` closes the connection on every exit path (normal end,
    # exception, or the consumer closing this generator).  An extra
    # ``finally: websocket.close()`` would hit an unbound ``websocket`` when
    # the connection attempt itself fails and mask the real error.
    async with websockets.connect(uri=uri) as websocket:
        async for message in websocket:
            kline_data = json.loads(message).get('k')
            if kline_data is None:
                # Not a kline event (no 'k' payload): nothing to compute.
                continue

            ohlcv = OHLCV(
                open=float(kline_data['o']),
                high=float(kline_data['h']),
                low=float(kline_data['l']),
                close=float(kline_data['c']),
                volume=float(kline_data['v']),
                # 't' is in milliseconds; fromtimestamp expects seconds.
                time=datetime.fromtimestamp(kline_data['t'] / 1000),
            )

            # Keep every indicator at no more than ``periods`` values: once
            # the window is full, drop the oldest entry before adding.
            if len(ema) >= periods:
                atr.purge_oldest(1)
                ema.purge_oldest(1)
                macd.purge_oldest(1)
                rsi.purge_oldest(1)

            atr.add(ohlcv)
            ema.add(ohlcv.close)
            macd.add(ohlcv.close)
            rsi.add(ohlcv.close)

            # The EMA yields None until it has seen enough input; hold back
            # snapshots until then.
            if ema[-1] is None:
                continue

            macd_line = composite_to_lists(macd).get('macd')[-1]
            yield {
                'actual': round(ohlcv.close, 2),
                'atr': round(atr[-1], 2),
                'ema': round(ema[-1], 2),
                'macd': round(macd_line, 2),
                'rsi': round(rsi[-1]),
                'time': ohlcv.time.time().strftime("%M:%S"),
            }