Skip to content

Commit

Permalink
Merge pull request #16 from itsphat/wip/market-watcher-drop-runner
Browse files Browse the repository at this point in the history
Drop runner on the watcher
  • Loading branch information
heyphat authored Jan 25, 2022
2 parents a551f34 + 26ff46c commit 73da8f6
Show file tree
Hide file tree
Showing 10 changed files with 131 additions and 28 deletions.
10 changes: 6 additions & 4 deletions cmd/app/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,17 +108,19 @@ func Mux(middleware Func) *mux.Router {
middleware(http.HandlerFunc(watchlist))).Methods("GET")
router.Handle("/watcher/last/{ticker}",
middleware(http.HandlerFunc(last))).Methods("GET")
router.Handle("/watcher/watch/{ticker}",
middleware(http.HandlerFunc(watch))).Methods("POST")
router.Handle("/watcher/is_synced/{ticker}/{frame}",
middleware(http.HandlerFunc(synced))).Methods("GET")
router.Handle("/watcher/watch/{ticker}",
middleware(http.HandlerFunc(watch))).Methods("POST")
router.Handle("/watcher/drop/{ticker}",
middleware(http.HandlerFunc(dropRunner))).Methods("POST")

// evaluator endpoints
router.Handle("/evaluator/list",
middleware(http.HandlerFunc(listSignals))).Methods("GET")
router.Handle("/evaluator/add_signal/{patterns}",
router.Handle("/evaluator/add/{patterns}",
middleware(http.HandlerFunc(addSignal))).Methods("POST")
router.Handle("/evaluator/drop_signals/{names}",
router.Handle("/evaluator/drop/{names}",
middleware(http.HandlerFunc(dropSignals))).Methods("POST")

// notifier enpoints
Expand Down
38 changes: 34 additions & 4 deletions cmd/app/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ func watch(w http.ResponseWriter, req *http.Request) {
tickers, ok := parseVars(mux.Vars(req), "ticker")
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
mk, ok := parseOptions(req.URL.Query(), "market")
if !ok {
Expand All @@ -48,18 +49,47 @@ func watch(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}

func dropRunner(w http.ResponseWriter, req *http.Request) {
tickers, ok := parseVars(mux.Vars(req), "ticker")
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
mk, ok := parseOptions(req.URL.Query(), "market")
if !ok {
mk = []string{"CASH"}
}
for _, t := range tickers {
if err := market.Drop(t, mk[0]); err != nil {
logger.Error.Println(err)
InternalError(w)
return
}
}
w.WriteHeader(http.StatusOK)
}

func last(w http.ResponseWriter, req *http.Request) {
strs, ok := parseVars(mux.Vars(req), "ticker")
tickers, ok := parseVars(mux.Vars(req), "ticker")
if !ok {
w.WriteHeader(http.StatusBadRequest)
return
}
if !market.IsWatchingOn(strs[0]) {
mk, ok := parseOptions(req.URL.Query(), "market")
if !ok {
mk = []string{"CASH"}
}
//strs, ok := parseVars(mux.Vars(req), "ticker")
//if !ok {
// w.WriteHeader(http.StatusBadRequest)
// return
//}
if !market.IsWatchingOn(tickers[0], mk[0]) {
w.WriteHeader(http.StatusNotFound)
return
}
clast := market.LastCandles(strs[0])
ilast := market.LastIndicators(strs[0])
clast := market.LastCandles(tickers[0])
ilast := market.LastIndicators(tickers[0])
type candles struct {
Candles tax.CandlesJSON `json:"candles"`
Indicators tax.IndicatorsJSON `json:"indicators"`
Expand Down
1 change: 0 additions & 1 deletion internal/cmd/market/communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,5 +64,4 @@ func (c *communicator) newPayloadWithID(id string, data interface{}) *payload {
when: time.Now(),
what: data,
}

}
2 changes: 1 addition & 1 deletion internal/cmd/market/evaluator.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (e *evaluator) add(patterns []string, s *strategy.Signal) error {
return nil
}

// remove removes the given signal from the evaluator. After the removal, the singal won't be
// drop removes the given signal from the evaluator. After the removal, the singal won't be
// evaluated any longer.
func (e *evaluator) drop(name string) error {
e.Lock()
Expand Down
23 changes: 18 additions & 5 deletions internal/cmd/market/market.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,8 @@ func NewMarket(configFilePath *string) (*MarketStruct, error) {
for {
time.Sleep(time.Minute)
// the duration must be the time period that the watcher is watching on.
duration := time.Minute * 5
ticker := "BTCUSDT"
if synced := Market.IsSynced(ticker, duration); !synced {
if synced := Market.IsSynced(ticker, time.Minute*5); !synced {
Market.notifier.notify(fmt.Sprintf("%s is out of sync for %s", ticker, duration.String()))
}
}
Expand Down Expand Up @@ -237,15 +236,29 @@ func (m *MarketStruct) Watch(ticker, market string) error {
if !ok {
return errors.New("unsupported market")
}
return m.watcher.watch(ticker+m.configs.Market.Watcher.BaseMarket, m.parseRunnerConfigs(mk), nil)
return m.watcher.watch(ticker, m.parseRunnerConfigs(mk), nil)
}

func (m *MarketStruct) Drop(ticker, market string) error {
mk, ok := runner.ValidateMarket(market)
if !ok {
return errors.New("unsupported market")
}
return m.watcher.drop(ticker, m.parseRunnerConfigs(mk))
}

func (m *MarketStruct) Watchlist() []string {
return m.watcher.watchlist()
}

func (m *MarketStruct) IsWatchingOn(ticker string) bool {
return m.watcher.isWatchingOn(ticker)
func (m *MarketStruct) IsWatchingOn(ticker string, market string) bool {
mk, ok := runner.ValidateMarket(market)
if !ok {
return ok
}
rc := runner.NewRunnerDefaultConfigs()
rc.Market = mk
return m.watcher.isWatchingOn(runner.NewRunner(ticker, rc).GetUniqueName())
}

func (m *MarketStruct) LastCandles(ticker string) tax.CandlesJSON {
Expand Down
15 changes: 15 additions & 0 deletions internal/cmd/market/notifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,24 @@ func (n *notifier) connect() {
go n.processEvaluatorRequest(msg)
}
}()
go n.await()
n.connected = true
}

// await awaits for message from user to add chatID.
func (n *notifier) await() {
updates := n.bot.GetUpdatesChan(tele.NewUpdate(0))
for update := range updates {
if update.Message == nil {
continue
}
go n.addChatIDs([]int64{update.Message.Chat.ID})
msg := tele.NewMessage(update.Message.Chat.ID, fmt.Sprintf("You're all set. Your chatID is %d.", update.Message.Chat.ID))
msg.ReplyToMessageID = update.Message.MessageID
n.bot.Send(msg)
}
}

// isConnected returns true if the notifier is connected to the system, false otherwise.
func (n *notifier) isConnected() bool { return n.connected }

Expand Down
2 changes: 1 addition & 1 deletion internal/cmd/market/provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func Test_Provider(t *testing.T) {
assert.EqualValues(t, nil, err)
assert.EqualValues(t, true, len(listings) == 1)

_, err := provider.binFutu.NewListPriceChangeStatsService().Do(context.Background())
_, err = provider.binFutu.NewListPriceChangeStatsService().Do(context.Background())
assert.EqualValues(t, nil, err)
//for _, s := range stats {
// fmt.Println(fmt.Sprintf("%v", s.Symbol))
Expand Down
25 changes: 14 additions & 11 deletions internal/cmd/market/streamer.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,10 @@ func (s *streamer) processingWatcherRequest(msg *message) {
//defer s.Unlock()
m := msg.request.what.(wmember)
if s.isStreamingOn(m.runner.GetUniqueName(), WATCHER) {
s.logger.Info.Println(s.newLog(m.runner.GetUniqueName(), "already streaming this ticker"))
// TODO: this only works if streamer receives request from one market participant.
s.unsubscribe(m.runner.GetUniqueName())
close(m.bChann)
close(m.tChann)
} else {
// TODO: need to check if it is streaming for other participants
bChann := []chan *ta.Candle{m.bChann}
Expand Down Expand Up @@ -213,7 +216,7 @@ func (s *streamer) subscribe(
tChann []chan *tax.Trade) (chan struct{}, chan struct{}) {
s.Lock()
defer s.Unlock()
// cash types
// cash handlers
tradeHandler := func(event *bn.WsAggTradeEvent) {
for _, c := range tChann {
c <- tax.ConvertBinanceStreamingAggTrade(event)
Expand All @@ -227,7 +230,7 @@ func (s *streamer) subscribe(
c <- tax.ConvertBinanceStreamingKline(event, nil)
}
}
// futures types
// futures handlers
futuTradeHandler := func(event *bnf.WsAggTradeEvent) {
for _, c := range tChann {
c <- tax.ConvertBinanceFrturesStreamingAggTrade(event)
Expand Down Expand Up @@ -272,9 +275,9 @@ func (s *streamer) streamingBinanceKline(name string, stop chan struct{},
klineHandler func(e *bn.WsKlineEvent)) chan struct{} {
errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) }
go func(stopC chan struct{}) {
var err error
err := errors.New("not an error")
var done chan struct{}
for {
for err != nil {
done, stop, err = bn.WsKlineServe(name, "1m", klineHandler, errorHandler)
if err != nil {
s.logger.Error.Println(s.newLog(name, err.Error()))
Expand All @@ -290,9 +293,9 @@ func (s *streamer) streamingBinanceFuturesKline(name string, stop chan struct{},
klineHandler func(e *bnf.WsKlineEvent)) chan struct{} {
errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) }
go func(stopC chan struct{}) {
var err error
err := errors.New("not an error")
var done chan struct{}
for {
for err != nil {
done, stop, err = bnf.WsKlineServe(name, "1m", klineHandler, errorHandler)
if err != nil {
s.logger.Error.Println(s.newLog(name, err.Error()))
Expand All @@ -308,9 +311,9 @@ func (s *streamer) streamingBinanceTrade(name string, stop chan struct{},
tradeHandler func(e *bn.WsAggTradeEvent)) chan struct{} {
errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) }
go func(stopC chan struct{}) {
var err error
err := errors.New("not an error")
var done chan struct{}
for {
for err != nil {
done, stop, err = bn.WsAggTradeServe(name, tradeHandler, errorHandler)
if err != nil {
s.logger.Error.Println(s.newLog(name, err.Error()))
Expand All @@ -326,9 +329,9 @@ func (s *streamer) streamingBinanceFuturesTrade(name string, stop chan struct{},
tradeHandler func(e *bnf.WsAggTradeEvent)) chan struct{} {
errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) }
go func(stopC chan struct{}) {
var err error
err := errors.New("not an error")
var done chan struct{}
for {
for err != nil {
done, stop, err = bnf.WsAggTradeServe(name, tradeHandler, errorHandler)
if err != nil {
s.logger.Error.Println(s.newLog(name, err.Error()))
Expand Down
36 changes: 35 additions & 1 deletion internal/cmd/market/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,25 @@ func (w *watcher) await(mem wmember) {
}()
}

// drop removes the given ticker from the watchlist. It closes all the streaming channels.
func (w *watcher) drop(ticker string, rc *runner.RunnerConfigs) error {
if rc == nil {
return errors.New("missing runner config")
}
w.Lock()
defer w.Unlock()
mem, ok := w.runners.Load(runner.NewRunner(ticker, rc).GetUniqueName())
if !ok {
return errors.New("runner not found")
}
r := mem.(wmember).runner
for !w.registerStreamingChannel(mem.(wmember)) {
w.logger.Error.Println(w.newLog(r.GetName(), "failed to deregister streaming data"))
}
w.runners.Delete(r.GetUniqueName())
return nil
}

// lastCandles returns all last candles from all time frames of a member in the watchlist
func (w *watcher) lastCandles(ticker string) []*ta.Candle {
candles := make([]*ta.Candle, 0)
Expand Down Expand Up @@ -219,7 +238,7 @@ func (w *watcher) connect() {
}

// registerStreamingChannel registers the runners with the streamer in order to
// recevie and consume candles broadcasted by data providor. Every time the Watch
// recevie and consume candles broadcasted by data providor. Every time the drop
// method is called and the ticker is vallid, it will invoke this method.
func (w *watcher) registerStreamingChannel(mem wmember) bool {
doneStreamingRegister := false
Expand All @@ -233,6 +252,21 @@ func (w *watcher) registerStreamingChannel(mem wmember) bool {
return doneStreamingRegister
}

// deregisterStreamingChannel deregisters the runners with the streamer in order to
// cancle streaming channels broadcasted by data providor. Every time the drop
// method is called and the ticker is vallid, it will invoke this method.
//func (w *watcher) deregisterStreamingChannel(mem wmember) bool {
// doneStreamingDeregister := false
// var maxTries int
// for !doneStreamingDeregister && maxTries <= 3 {
// resC := make(chan *payload)
// w.communicator.watcher2Streamer <- w.communicator.newMessage(mem, resC)
// doneStreamingDeregister = (<-resC).what.(bool)
// maxTries++
// }
// return doneStreamingDeregister
//}

// This processes the request from the streamer, currently the streamer only requests
// for the `mem` channels in order to reinitialize the streaming data if necessary.
func (w *watcher) processStreamerRequest(msg *message) {
Expand Down
7 changes: 7 additions & 0 deletions internal/cmd/market/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func Test_Watcher(t *testing.T) {
}
}()

// test watch runner
err = watcher.watch(ticker, runner.NewRunnerDefaultConfigs(), nil)
assert.EqualValues(t, nil, err)
assert.EqualValues(t, true, watcher.isConnected())
Expand All @@ -50,4 +51,10 @@ func Test_Watcher(t *testing.T) {
assert.EqualValues(t, 499, len(line.Candles.Candles))
}
}

// test drop runner
err = watcher.drop(ticker, runner.NewRunnerDefaultConfigs())
assert.EqualValues(t, nil, err)
assert.EqualValues(t, false, watcher.isWatchingOn(ticker))
assert.EqualValues(t, []string{}, watcher.watchlist())
}

0 comments on commit 73da8f6

Please sign in to comment.