From 6ee311332c7a63b28343a0c9d17cb0e44734cd22 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:05:53 +0000 Subject: [PATCH 1/9] Update function description --- internal/cmd/market/evaluator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/market/evaluator.go b/internal/cmd/market/evaluator.go index cc508f2..8efd2ce 100644 --- a/internal/cmd/market/evaluator.go +++ b/internal/cmd/market/evaluator.go @@ -101,7 +101,7 @@ func (e *evaluator) add(patterns []string, s *strategy.Signal) error { return nil } -// remove removes the given signal from the evaluator. After the removal, the singal won't be +// drop removes the given signal from the evaluator. After the removal, the singal won't be // evaluated any longer. func (e *evaluator) drop(name string) error { e.Lock() From afea57d01dba61cfca8b547cf18ba504ca18a34a Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:06:27 +0000 Subject: [PATCH 2/9] Update market watch and drop runner --- internal/cmd/market/market.go | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/internal/cmd/market/market.go b/internal/cmd/market/market.go index 1780b3d..526a2d2 100644 --- a/internal/cmd/market/market.go +++ b/internal/cmd/market/market.go @@ -237,15 +237,29 @@ func (m *MarketStruct) Watch(ticker, market string) error { if !ok { return errors.New("unsupported market") } - return m.watcher.watch(ticker+m.configs.Market.Watcher.BaseMarket, m.parseRunnerConfigs(mk), nil) + return m.watcher.watch(ticker, m.parseRunnerConfigs(mk), nil) +} + +func (m *MarketStruct) Drop(ticker, market string) error { + mk, ok := runner.ValidateMarket(market) + if !ok { + return errors.New("unsupported market") + } + return m.watcher.drop(ticker, m.parseRunnerConfigs(mk)) } func (m *MarketStruct) Watchlist() []string { return m.watcher.watchlist() } -func (m *MarketStruct) IsWatchingOn(ticker string) bool { - return m.watcher.isWatchingOn(ticker) +func (m *MarketStruct) IsWatchingOn(ticker string, market string) bool { + mk, ok := runner.ValidateMarket(market) + if !ok { + return ok + } + rc := runner.NewRunnerDefaultConfigs() + rc.Market = mk + return m.watcher.isWatchingOn(runner.NewRunner(ticker, rc).GetUniqueName()) } func (m *MarketStruct) LastCandles(ticker string) tax.CandlesJSON { From 6613a1ffc264cd83aa0916475e1c74ccc4880792 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:07:57 +0000 Subject: [PATCH 3/9] Add drop runner funtion to watcher --- internal/cmd/market/watcher.go | 36 ++++++++++++++++++++++++++++- internal/cmd/market/watcher_test.go | 7 ++++++ 2 files changed, 42 insertions(+), 1 deletion(-) diff --git a/internal/cmd/market/watcher.go b/internal/cmd/market/watcher.go index 848328b..eba09f5 100644 --- a/internal/cmd/market/watcher.go +++ b/internal/cmd/market/watcher.go @@ -170,6 +170,25 @@ func (w *watcher) await(mem wmember) { }() } +// drop removes the given ticker from the watchlist. It closes all the streaming channels. +func (w *watcher) drop(ticker string, rc *runner.RunnerConfigs) error { + if rc == nil { + return errors.New("missing runner config") + } + w.Lock() + defer w.Unlock() + mem, ok := w.runners.Load(runner.NewRunner(ticker, rc).GetUniqueName()) + if !ok { + return errors.New("runner not found") + } + r := mem.(wmember).runner + for !w.registerStreamingChannel(mem.(wmember)) { + w.logger.Error.Println(w.newLog(r.GetName(), "failed to deregister streaming data")) + } + w.runners.Delete(r.GetUniqueName()) + return nil +} + // lastCandles returns all last candles from all time frames of a member in the watchlist func (w *watcher) lastCandles(ticker string) []*ta.Candle { candles := make([]*ta.Candle, 0) @@ -219,7 +238,7 @@ func (w *watcher) connect() { } // registerStreamingChannel registers the runners with the streamer in order to -// recevie and consume candles broadcasted by data providor. Every time the Watch +// recevie and consume candles broadcasted by data providor. Every time the drop // method is called and the ticker is vallid, it will invoke this method. func (w *watcher) registerStreamingChannel(mem wmember) bool { doneStreamingRegister := false @@ -233,6 +252,21 @@ func (w *watcher) registerStreamingChannel(mem wmember) bool { return doneStreamingRegister } +// deregisterStreamingChannel deregisters the runners with the streamer in order to +// cancle streaming channels broadcasted by data providor. Every time the drop +// method is called and the ticker is vallid, it will invoke this method. +//func (w *watcher) deregisterStreamingChannel(mem wmember) bool { +// doneStreamingDeregister := false +// var maxTries int +// for !doneStreamingDeregister && maxTries <= 3 { +// resC := make(chan *payload) +// w.communicator.watcher2Streamer <- w.communicator.newMessage(mem, resC) +// doneStreamingDeregister = (<-resC).what.(bool) +// maxTries++ +// } +// return doneStreamingDeregister +//} + // This processes the request from the streamer, currently the streamer only requests // for the `mem` channels in order to reinitialize the streaming data if necessary. func (w *watcher) processStreamerRequest(msg *message) { diff --git a/internal/cmd/market/watcher_test.go b/internal/cmd/market/watcher_test.go index a3325ca..3c65ae0 100644 --- a/internal/cmd/market/watcher_test.go +++ b/internal/cmd/market/watcher_test.go @@ -31,6 +31,7 @@ func Test_Watcher(t *testing.T) { } }() + // test watch runner err = watcher.watch(ticker, runner.NewRunnerDefaultConfigs(), nil) assert.EqualValues(t, nil, err) assert.EqualValues(t, true, watcher.isConnected()) @@ -50,4 +51,10 @@ func Test_Watcher(t *testing.T) { assert.EqualValues(t, 499, len(line.Candles.Candles)) } } + + // test drop runner + err = watcher.drop(ticker, runner.NewRunnerDefaultConfigs()) + assert.EqualValues(t, nil, err) + assert.EqualValues(t, false, watcher.isWatchingOn(ticker)) + assert.EqualValues(t, []string{}, watcher.watchlist()) } From 78747549284a6664d9bb22a68acfb7dbd58e9c94 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:08:27 +0000 Subject: [PATCH 4/9] Update streamer to drop broadcasting channels --- internal/cmd/market/streamer.go | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/internal/cmd/market/streamer.go b/internal/cmd/market/streamer.go index f928337..3fb2e79 100644 --- a/internal/cmd/market/streamer.go +++ b/internal/cmd/market/streamer.go @@ -114,7 +114,10 @@ func (s *streamer) processingWatcherRequest(msg *message) { //defer s.Unlock() m := msg.request.what.(wmember) if s.isStreamingOn(m.runner.GetUniqueName(), WATCHER) { - s.logger.Info.Println(s.newLog(m.runner.GetUniqueName(), "already streaming this ticker")) + // TODO: this only works if streamer receives request from one market participant. + s.unsubscribe(m.runner.GetUniqueName()) + close(m.bChann) + close(m.tChann) } else { // TODO: need to check if it is streaming for other participants bChann := []chan *ta.Candle{m.bChann} @@ -213,7 +216,7 @@ func (s *streamer) subscribe( tChann []chan *tax.Trade) (chan struct{}, chan struct{}) { s.Lock() defer s.Unlock() - // cash types + // cash handlers tradeHandler := func(event *bn.WsAggTradeEvent) { for _, c := range tChann { c <- tax.ConvertBinanceStreamingAggTrade(event) @@ -227,7 +230,7 @@ func (s *streamer) subscribe( c <- tax.ConvertBinanceStreamingKline(event, nil) } } - // futures types + // futures handlers futuTradeHandler := func(event *bnf.WsAggTradeEvent) { for _, c := range tChann { c <- tax.ConvertBinanceFrturesStreamingAggTrade(event) @@ -272,9 +275,9 @@ func (s *streamer) streamingBinanceKline(name string, stop chan struct{}, klineHandler func(e *bn.WsKlineEvent)) chan struct{} { errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } go func(stopC chan struct{}) { - var err error + err := errors.New("not an error") var done chan struct{} - for { + for err != nil { done, stop, err = bn.WsKlineServe(name, "1m", klineHandler, errorHandler) if err != nil { s.logger.Error.Println(s.newLog(name, err.Error())) @@ -290,9 +293,9 @@ func (s *streamer) streamingBinanceFuturesKline(name string, stop chan struct{}, klineHandler func(e *bnf.WsKlineEvent)) chan struct{} { errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } go func(stopC chan struct{}) { - var err error + err := errors.New("not an error") var done chan struct{} - for { + for err != nil { done, stop, err = bnf.WsKlineServe(name, "1m", klineHandler, errorHandler) if err != nil { s.logger.Error.Println(s.newLog(name, err.Error())) @@ -308,9 +311,9 @@ func (s *streamer) streamingBinanceTrade(name string, stop chan struct{}, tradeHandler func(e *bn.WsAggTradeEvent)) chan struct{} { errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } go func(stopC chan struct{}) { - var err error + err := errors.New("not an error") var done chan struct{} - for { + for err != nil { done, stop, err = bn.WsAggTradeServe(name, tradeHandler, errorHandler) if err != nil { s.logger.Error.Println(s.newLog(name, err.Error())) @@ -326,9 +329,9 @@ func (s *streamer) streamingBinanceFuturesTrade(name string, stop chan struct{}, tradeHandler func(e *bnf.WsAggTradeEvent)) chan struct{} { errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } go func(stopC chan struct{}) { - var err error + err := errors.New("not an error") var done chan struct{} - for { + for err != nil { done, stop, err = bnf.WsAggTradeServe(name, tradeHandler, errorHandler) if err != nil { s.logger.Error.Println(s.newLog(name, err.Error())) From 3bbd2909e88f2e022862741f4c29d01be3c0f523 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:08:46 +0000 Subject: [PATCH 5/9] Minor change --- internal/cmd/market/provider_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/market/provider_test.go b/internal/cmd/market/provider_test.go index 115016b..2cc9add 100644 --- a/internal/cmd/market/provider_test.go +++ b/internal/cmd/market/provider_test.go @@ -45,7 +45,7 @@ func Test_Provider(t *testing.T) { assert.EqualValues(t, nil, err) assert.EqualValues(t, true, len(listings) == 1) - _, err := provider.binFutu.NewListPriceChangeStatsService().Do(context.Background()) + _, err = provider.binFutu.NewListPriceChangeStatsService().Do(context.Background()) assert.EqualValues(t, nil, err) //for _, s := range stats { // fmt.Println(fmt.Sprintf("%v", s.Symbol)) From 9fd7eb535a5ae96d1d3e57bbf5b676c900ba160a Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sun, 23 Jan 2022 05:09:40 +0000 Subject: [PATCH 6/9] Update main app endpoints --- cmd/app/server.go | 10 ++++++---- cmd/app/watcher.go | 38 ++++++++++++++++++++++++++++++++++---- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/cmd/app/server.go b/cmd/app/server.go index fe5f69f..4e42283 100644 --- a/cmd/app/server.go +++ b/cmd/app/server.go @@ -108,17 +108,19 @@ func Mux(middleware Func) *mux.Router { middleware(http.HandlerFunc(watchlist))).Methods("GET") router.Handle("/watcher/last/{ticker}", middleware(http.HandlerFunc(last))).Methods("GET") - router.Handle("/watcher/watch/{ticker}", - middleware(http.HandlerFunc(watch))).Methods("POST") router.Handle("/watcher/is_synced/{ticker}/{frame}", middleware(http.HandlerFunc(synced))).Methods("GET") + router.Handle("/watcher/watch/{ticker}", + middleware(http.HandlerFunc(watch))).Methods("POST") + router.Handle("/watcher/drop/{ticker}", + middleware(http.HandlerFunc(dropRunner))).Methods("POST") // evaluator endpoints router.Handle("/evaluator/list", middleware(http.HandlerFunc(listSignals))).Methods("GET") - router.Handle("/evaluator/add_signal/{patterns}", + router.Handle("/evaluator/add/{patterns}", middleware(http.HandlerFunc(addSignal))).Methods("POST") - router.Handle("/evaluator/drop_signals/{names}", + router.Handle("/evaluator/drop/{names}", middleware(http.HandlerFunc(dropSignals))).Methods("POST") // notifier enpoints diff --git a/cmd/app/watcher.go b/cmd/app/watcher.go index 43cfff8..b2872a5 100644 --- a/cmd/app/watcher.go +++ b/cmd/app/watcher.go @@ -33,6 +33,7 @@ func watch(w http.ResponseWriter, req *http.Request) { tickers, ok := parseVars(mux.Vars(req), "ticker") if !ok { w.WriteHeader(http.StatusBadRequest) + return } mk, ok := parseOptions(req.URL.Query(), "market") if !ok { @@ -48,18 +49,47 @@ func watch(w http.ResponseWriter, req *http.Request) { w.WriteHeader(http.StatusOK) } +func dropRunner(w http.ResponseWriter, req *http.Request) { + tickers, ok := parseVars(mux.Vars(req), "ticker") + if !ok { + w.WriteHeader(http.StatusBadRequest) + return + } + mk, ok := parseOptions(req.URL.Query(), "market") + if !ok { + mk = []string{"CASH"} + } + for _, t := range tickers { + if err := market.Drop(t, mk[0]); err != nil { + logger.Error.Println(err) + InternalError(w) + return + } + } + w.WriteHeader(http.StatusOK) +} + func last(w http.ResponseWriter, req *http.Request) { - strs, ok := parseVars(mux.Vars(req), "ticker") + tickers, ok := parseVars(mux.Vars(req), "ticker") if !ok { w.WriteHeader(http.StatusBadRequest) return } - if !market.IsWatchingOn(strs[0]) { + mk, ok := parseOptions(req.URL.Query(), "market") + if !ok { + mk = []string{"CASH"} + } + //strs, ok := parseVars(mux.Vars(req), "ticker") + //if !ok { + // w.WriteHeader(http.StatusBadRequest) + // return + //} + if !market.IsWatchingOn(tickers[0], mk[0]) { w.WriteHeader(http.StatusNotFound) return } - clast := market.LastCandles(strs[0]) - ilast := market.LastIndicators(strs[0]) + clast := market.LastCandles(tickers[0]) + ilast := market.LastIndicators(tickers[0]) type candles struct { Candles tax.CandlesJSON `json:"candles"` Indicators tax.IndicatorsJSON `json:"indicators"` From b6ea56a5bc236b9159ff1e61558d701e5d8381c8 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Mon, 24 Jan 2022 00:17:09 +0000 Subject: [PATCH 7/9] Notifier to accpect user from message initialization --- internal/cmd/market/notifier.go | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/internal/cmd/market/notifier.go b/internal/cmd/market/notifier.go index 7a12242..57ab53d 100644 --- a/internal/cmd/market/notifier.go +++ b/internal/cmd/market/notifier.go @@ -76,9 +76,24 @@ func (n *notifier) connect() { go n.processEvaluatorRequest(msg) } }() + go n.await() n.connected = true } +// await awaits for message from user to add chatID. +func (n *notifier) await() { + updates := n.bot.GetUpdatesChan(tele.NewUpdate(0)) + for update := range updates { + if update.Message == nil { + continue + } + go n.addChatIDs([]int64{update.Message.Chat.ID}) + msg := tele.NewMessage(update.Message.Chat.ID, "You're all set.") + msg.ReplyToMessageID = update.Message.MessageID + n.bot.Send(msg) + } +} + // isConnected returns true if the notifier is connected to the system, false otherwise. func (n *notifier) isConnected() bool { return n.connected } From dbba0a0e32fa82f0cf32f20cc079578c070f270c Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Mon, 24 Jan 2022 00:30:28 +0000 Subject: [PATCH 8/9] Update notifier with response chatID --- internal/cmd/market/notifier.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/market/notifier.go b/internal/cmd/market/notifier.go index 57ab53d..6533984 100644 --- a/internal/cmd/market/notifier.go +++ b/internal/cmd/market/notifier.go @@ -88,7 +88,7 @@ func (n *notifier) await() { continue } go n.addChatIDs([]int64{update.Message.Chat.ID}) - msg := tele.NewMessage(update.Message.Chat.ID, "You're all set.") + msg := tele.NewMessage(update.Message.Chat.ID, fmt.Sprintf("You're all set. Your chatID is %d.", update.Message.Chat.ID)) msg.ReplyToMessageID = update.Message.MessageID n.bot.Send(msg) } From 26ff46c27f461e54442c93f37fb1caba5438c992 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Mon, 24 Jan 2022 06:11:43 +0000 Subject: [PATCH 9/9] Minor changes --- internal/cmd/market/communicator.go | 1 - internal/cmd/market/market.go | 3 +-- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/internal/cmd/market/communicator.go b/internal/cmd/market/communicator.go index b98de47..2a6cb92 100644 --- a/internal/cmd/market/communicator.go +++ b/internal/cmd/market/communicator.go @@ -64,5 +64,4 @@ func (c *communicator) newPayloadWithID(id string, data interface{}) *payload { when: time.Now(), what: data, } - } diff --git a/internal/cmd/market/market.go b/internal/cmd/market/market.go index 526a2d2..6ced66b 100644 --- a/internal/cmd/market/market.go +++ b/internal/cmd/market/market.go @@ -102,9 +102,8 @@ func NewMarket(configFilePath *string) (*MarketStruct, error) { for { time.Sleep(time.Minute) // the duration must be the time period that the watcher is watching on. - duration := time.Minute * 5 ticker := "BTCUSDT" - if synced := Market.IsSynced(ticker, duration); !synced { + if synced := Market.IsSynced(ticker, time.Minute*5); !synced { Market.notifier.notify(fmt.Sprintf("%s is out of sync for %s", ticker, duration.String())) } }