From 4ef403fab7d8bbdefa465206bc1accf3a86a9874 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sat, 22 Jan 2022 11:57:25 +0000 Subject: [PATCH 1/5] Add runner asset class & market type --- internal/pkg/runner/runner.go | 22 ++++++++++++++++++++++ internal/pkg/runner/types.go | 32 ++++++++++++++++++++++++++++++++ 2 files changed, 54 insertions(+) create mode 100644 internal/pkg/runner/types.go diff --git a/internal/pkg/runner/runner.go b/internal/pkg/runner/runner.go index d3a09bd..633ff0f 100644 --- a/internal/pkg/runner/runner.go +++ b/internal/pkg/runner/runner.go @@ -3,6 +3,7 @@ package runner import ( "fmt" "sort" + "strconv" "sync" "time" @@ -33,6 +34,8 @@ func ChangeMaxSize(size int) { } type RunnerConfigs struct { + Asset AssetClass + Market MarketType LFrames []time.Duration IConfigs tax.IndicatorConfigs } @@ -51,6 +54,8 @@ func NewRunnerDefaultConfigs() *RunnerConfigs { 24 * time.Hour, } return &RunnerConfigs{ + Asset: Crypto, + Market: Cash, LFrames: lineFrames, IConfigs: tax.NewDefaultIndicatorConfigs(), } @@ -100,6 +105,23 @@ func (r *Runner) GetConfigs() *RunnerConfigs { return r.configs } // GetName returns the runner's name. func (r *Runner) GetName() string { return r.name } +// GetUniqueName returns the unique name for the runner. +func (r *Runner) GetUniqueName() string { + switch r.configs.Market { + case Cash: + return r.name + case Futures: + return r.name + "PERP" + case Margin: + return r.name + "MARG" + default: + return r.name + strconv.Itoa(int(time.Now().Unix())) + } +} + +// GetMarketType returns the runner market. +func (r *Runner) GetMarketType() MarketType { return r.configs.Market } + // GetCap return the current marketcap based on the current price of the runner. func (r *Runner) GetCap() big.Decimal { if r == nil || r.fundamental == nil { diff --git a/internal/pkg/runner/types.go b/internal/pkg/runner/types.go new file mode 100644 index 0000000..6317950 --- /dev/null +++ b/internal/pkg/runner/types.go @@ -0,0 +1,32 @@ +package runner + +import "strings" + +type AssetClass string + +const ( + Crypto AssetClass = "CRYPTO" + Stock AssetClass = "STOCK" + Forex AssetClass = "Forex" +) + +type MarketType string + +const ( + Cash MarketType = "CASH" + Margin MarketType = "MARGIN" + Futures MarketType = "FUTURES" +) + +func ValidateMarket(market string) (MarketType, bool) { + if strings.ToUpper(market) == "CASH" { + return Cash, true + } + if strings.ToUpper(market) == "FUTURES" { + return Futures, true + } + //if strings.ToUpper(market) == "MARGIN" { + // return Margin, true + //} + return Cash, false +} From d0c062def7b943f9582d19ae98ca22e794966b78 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sat, 22 Jan 2022 11:58:36 +0000 Subject: [PATCH 2/5] Add futures convertors --- internal/pkg/techanex/convertor.go | 42 ++++++++++++++++++++++++++++++ 1 file changed, 42 insertions(+) diff --git a/internal/pkg/techanex/convertor.go b/internal/pkg/techanex/convertor.go index 7732d4b..99f69af 100644 --- a/internal/pkg/techanex/convertor.go +++ b/internal/pkg/techanex/convertor.go @@ -4,6 +4,7 @@ import ( "time" bn "github.com/adshao/go-binance/v2" + bnf "github.com/adshao/go-binance/v2/futures" ta "github.com/itsphat/techan" "github.com/sdcoffey/big" ) @@ -35,6 +36,22 @@ func ConvertBinanceKline(kline *bn.Kline, duration *time.Duration) *ta.Candle { return candle } +func ConvertBinanceFuturesKline(kline *bnf.Kline, duration *time.Duration) *ta.Candle { + d := time.Minute + if duration != nil { + d = *duration + } + period := ta.NewTimePeriod(time.Unix(kline.OpenTime/1000, 0), d) + candle := ta.NewCandle(period) + candle.OpenPrice = big.NewFromString(kline.Open) + candle.ClosePrice = big.NewFromString(kline.Close) + candle.MaxPrice = big.NewFromString(kline.High) + candle.MinPrice = big.NewFromString(kline.Low) + candle.Volume = big.NewFromString(kline.Volume) + candle.TradeCount = uint(kline.TradeNum) + return candle +} + func ConvertBinanceStreamingKline(kline *bn.WsKlineEvent, duration *time.Duration) *ta.Candle { d := time.Minute if duration != nil { @@ -51,6 +68,22 @@ func ConvertBinanceStreamingKline(kline *bn.WsKlineEvent, duration *time.Duratio return candle } +func ConvertBinanceFuturesStreamingKline(kline *bnf.WsKlineEvent, duration *time.Duration) *ta.Candle { + d := time.Minute + if duration != nil { + d = *duration + } + period := ta.NewTimePeriod(time.Unix(kline.Kline.StartTime/1000, 0), d) + candle := ta.NewCandle(period) + candle.OpenPrice = big.NewFromString(kline.Kline.Open) + candle.ClosePrice = big.NewFromString(kline.Kline.Close) + candle.MaxPrice = big.NewFromString(kline.Kline.High) + candle.MinPrice = big.NewFromString(kline.Kline.Low) + candle.Volume = big.NewFromString(kline.Kline.Volume) + candle.TradeCount = uint(kline.Kline.TradeNum) + return candle +} + func ConvertBinanceStreamingTrade(t *bn.WsTradeEvent) *Trade { trade := NewTrade() trade.Price = big.NewFromString(t.Price) @@ -69,6 +102,15 @@ func ConvertBinanceStreamingAggTrade(t *bn.WsAggTradeEvent) *Trade { return trade } +func ConvertBinanceFrturesStreamingAggTrade(t *bnf.WsAggTradeEvent) *Trade { + trade := NewTrade() + trade.Price = big.NewFromString(t.Price) + trade.Quantity = big.NewFromString(t.Quantity) + trade.TradeTime = t.TradeTime + //trade.IsBuyerMaker = t.IsBuyerMaker + return trade +} + func NewCandleFromCandle(candle *ta.Candle, duration *time.Duration) *ta.Candle { period := syncPeriod(candle.Period, duration) newCandle := ta.NewCandle(period) From 08ef8a04e0dfbecfb7faf1f94f406cd659a3c776 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sat, 22 Jan 2022 11:59:52 +0000 Subject: [PATCH 3/5] Update market package to accept futures --- internal/cmd/market/market.go | 40 ++++++- internal/cmd/market/provider.go | 39 ++++++- internal/cmd/market/provider_test.go | 12 ++ internal/cmd/market/streamer.go | 158 +++++++++++++++++++-------- internal/cmd/market/tester.go | 2 +- internal/cmd/market/watcher.go | 29 +++-- internal/cmd/market/watcher_test.go | 9 +- 7 files changed, 223 insertions(+), 66 deletions(-) diff --git a/internal/cmd/market/market.go b/internal/cmd/market/market.go index e653a67..1780b3d 100644 --- a/internal/cmd/market/market.go +++ b/internal/cmd/market/market.go @@ -2,6 +2,7 @@ package market import ( "context" + "errors" "fmt" "io/ioutil" "strings" @@ -112,8 +113,11 @@ func NewMarket(configFilePath *string) (*MarketStruct, error) { return Market, nil } -func (m *MarketStruct) parseRunnerConfigs() *runner.RunnerConfigs { +func (m *MarketStruct) parseRunnerConfigs(market runner.MarketType) *runner.RunnerConfigs { out := runner.NewRunnerDefaultConfigs() + if market == runner.Cash || market == runner.Futures { + out.Market = runner.MarketType(market) + } frames := []time.Duration{} if len(m.configs.Market.Watcher.Runner.Frames) > 0 { for _, f := range m.configs.Market.Watcher.Runner.Frames { @@ -142,6 +146,10 @@ func (m *MarketStruct) initWatchlist() error { if err != nil { return err } + futuStats, err := m.watcher.provider.binFutu.NewListPriceChangeStatsService().Do(context.Background()) + if err != nil { + return err + } limit := 1 if m.configs.IsProduction() { limit = 5000 @@ -165,8 +173,26 @@ func (m *MarketStruct) initWatchlist() error { if val, ok := listings[s.Symbol]; ok { fd = &val } - if err := m.watcher.watch(s.Symbol, m.parseRunnerConfigs(), fd); err != nil { - m.watcher.logger.Error.Println(m.watcher.newLog(s.Symbol, err.Error())) + if err := m.watcher.watch(s.Symbol, m.parseRunnerConfigs(runner.Cash), fd); err != nil { + m.watcher.logger.Error.Println(m.watcher.newLog(s.Symbol+"-"+string(runner.Cash), err.Error())) + } + } + } + for _, s := range futuStats { + if len(strings.Split(s.Symbol, "_")) > 1 { + continue + } + isMatched, err := re.MatchString(s.Symbol) + if err != nil { + return err + } + if isMatched { + var fd *runner.Fundamental + if val, ok := listings[s.Symbol]; ok { + fd = &val + } + if err := m.watcher.watch(s.Symbol, m.parseRunnerConfigs(runner.Futures), fd); err != nil { + m.watcher.logger.Error.Println(m.watcher.newLog(s.Symbol+"-"+string(runner.Futures), err.Error())) } } } @@ -206,8 +232,12 @@ func (m *MarketStruct) connect() { } // watcher endpoints -func (m *MarketStruct) Watch(ticker string) error { - return m.watcher.watch(ticker+m.configs.Market.Watcher.BaseMarket, m.parseRunnerConfigs(), nil) +func (m *MarketStruct) Watch(ticker, market string) error { + mk, ok := runner.ValidateMarket(market) + if !ok { + return errors.New("unsupported market") + } + return m.watcher.watch(ticker+m.configs.Market.Watcher.BaseMarket, m.parseRunnerConfigs(mk), nil) } func (m *MarketStruct) Watchlist() []string { diff --git a/internal/cmd/market/provider.go b/internal/cmd/market/provider.go index 0b93c44..c28d141 100644 --- a/internal/cmd/market/provider.go +++ b/internal/cmd/market/provider.go @@ -21,6 +21,7 @@ const ( type provider struct { binSpot *bn.Client + binFutu *bnf.Client coinCap *cc.Client } @@ -69,7 +70,7 @@ type fetchOptions struct { end *time.Time } -func (p *provider) fetchBinanceKlinesV3(ticker string, d time.Duration, opt *fetchOptions) ([]*ta.Candle, error) { +func (p *provider) fetchBinanceSpotKlinesV3(ticker string, d time.Duration, opt *fetchOptions) ([]*ta.Candle, error) { lmt := 1000 if opt != nil && opt.limit > 0 && opt.limit <= 1000 { lmt = opt.limit @@ -105,6 +106,42 @@ func (p *provider) fetchBinanceKlinesV3(ticker string, d time.Duration, opt *fet return candles, nil } +func (p *provider) fetchBinanceFuturesKlinesV3(ticker string, d time.Duration, opt *fetchOptions) ([]*ta.Candle, error) { + lmt := 1000 + if opt != nil && opt.limit > 0 && opt.limit <= 1000 { + lmt = opt.limit + } + re, _ := regexp.Compile(timeFramePattern) + interval := re.FindString(d.String()) + if d >= time.Hour*24 { + interval = "1d" + } + if d == time.Minute*10 { + interval = "5m" + } + end := time.Now().Unix() * 1000 + if opt != nil && opt.end != nil { + end = opt.end.Unix() * 1000 + } + var klines []*bnf.Kline + for len(klines) < lmt || (opt.start != nil && len(klines) > 0 && klines[0].OpenTime > opt.start.Unix()*1000) { + service := p.binFutu.NewKlinesService().Symbol(ticker).Interval(interval).EndTime(end).Limit(lmt) + kls, err := service.Do(context.Background()) + if err != nil { + return nil, err + } + klines = append(kls, klines...) + if len(kls) > 0 { + end = kls[0].CloseTime + } + } + var candles []*ta.Candle + for _, kline := range klines { + candles = append(candles, tax.ConvertBinanceFuturesKline(kline, &d)) + } + return candles, nil +} + func (p *provider) fetchCoinFundamentals(base string, limit int) (map[string]runner.Fundamental, error) { out := make(map[string]runner.Fundamental) listings, err := p.coinCap.Cryptocurrency.LatestListings(&cc.ListingOptions{Limit: limit}) diff --git a/internal/cmd/market/provider_test.go b/internal/cmd/market/provider_test.go index fcf859d..115016b 100644 --- a/internal/cmd/market/provider_test.go +++ b/internal/cmd/market/provider_test.go @@ -1,7 +1,9 @@ package market import ( + "context" "testing" + "time" "follow.markets/pkg/config" "github.com/stretchr/testify/assert" @@ -42,4 +44,14 @@ func Test_Provider(t *testing.T) { listings, err := provider.fetchCoinFundamentals(configs.Market.Watcher.BaseMarket, 1) assert.EqualValues(t, nil, err) assert.EqualValues(t, true, len(listings) == 1) + + _, err := provider.binFutu.NewListPriceChangeStatsService().Do(context.Background()) + assert.EqualValues(t, nil, err) + //for _, s := range stats { + // fmt.Println(fmt.Sprintf("%v", s.Symbol)) + //} + + klines, err := provider.fetchBinanceFuturesKlinesV3("BTCUSDT", time.Minute, &fetchOptions{limit: 60}) + assert.EqualValues(t, nil, err) + assert.EqualValues(t, 60, len(klines)) } diff --git a/internal/cmd/market/streamer.go b/internal/cmd/market/streamer.go index 028408c..f928337 100644 --- a/internal/cmd/market/streamer.go +++ b/internal/cmd/market/streamer.go @@ -7,8 +7,10 @@ import ( "time" bn "github.com/adshao/go-binance/v2" + bnf "github.com/adshao/go-binance/v2/futures" ta "github.com/itsphat/techan" + "follow.markets/internal/pkg/runner" tax "follow.markets/internal/pkg/techanex" "follow.markets/pkg/log" "follow.markets/pkg/util" @@ -42,6 +44,7 @@ func newStreamer(participants *sharedParticipants) (*streamer, error) { type controller struct { name string + uName string from []string stops []chan struct{} } @@ -110,17 +113,17 @@ func (s *streamer) processingWatcherRequest(msg *message) { //s.Lock() //defer s.Unlock() m := msg.request.what.(wmember) - if s.isStreamingOn(m.runner.GetName(), WATCHER) { - s.logger.Info.Println(s.newLog(m.runner.GetName(), "already streaming this ticker")) + if s.isStreamingOn(m.runner.GetUniqueName(), WATCHER) { + s.logger.Info.Println(s.newLog(m.runner.GetUniqueName(), "already streaming this ticker")) } else { // TODO: need to check if it is streaming for other participants bChann := []chan *ta.Candle{m.bChann} tChann := []chan *tax.Trade{m.tChann} from := []string{} - c := s.get(m.runner.GetName()) + c := s.get(m.runner.GetUniqueName()) if c != nil { for _, f := range c.from { - bc, tc := s.collectStreamingChannels(m.runner.GetName(), f) + bc, tc := s.collectStreamingChannels(m.runner.GetUniqueName(), f) if bc != nil { bChann = append(bChann, bc) } @@ -129,12 +132,13 @@ func (s *streamer) processingWatcherRequest(msg *message) { } } from = c.from - s.unsubscribe(m.runner.GetName()) + s.unsubscribe(m.runner.GetUniqueName()) } - bStopC, tStopC := s.subscribe(m.runner.GetName(), bChann, tChann) - s.controllers.Store(m.runner.GetName(), + bStopC, tStopC := s.subscribe(m.runner.GetName(), m.runner.GetMarketType(), bChann, tChann) + s.controllers.Store(m.runner.GetUniqueName(), controller{ name: m.runner.GetName(), + uName: m.runner.GetUniqueName(), from: append(from, WATCHER), stops: []chan struct{}{bStopC, tStopC}, }, @@ -147,40 +151,41 @@ func (s *streamer) processingWatcherRequest(msg *message) { } func (s *streamer) processingEvaluatorRequest(msg *message) { - m := msg.request.what.(emember) - if s.isStreamingOn(m.name, EVALUATOR) { - s.logger.Info.Println(s.newLog(m.name, "already streaming this ticker")) - } else { - bChann := []chan *ta.Candle{} - tChann := []chan *tax.Trade{m.tChann} - from := []string{} - c := s.get(m.name) - if c != nil { - for _, f := range c.from { - bc, tc := s.collectStreamingChannels(m.name, f) - if bc != nil { - bChann = append(bChann, bc) - } - if tc != nil { - tChann = append(tChann, tc) - } - } - from = c.from - s.unsubscribe(m.name) - } - bStopC, tStopC := s.subscribe(m.name, bChann, tChann) - s.controllers.Store(m.name, - controller{ - name: m.name, - from: append(from, EVALUATOR), - stops: []chan struct{}{bStopC, tStopC}, - }, - ) - } - if msg.response != nil { - msg.response <- s.communicator.newPayload(true) - close(msg.response) - } + return + //m := msg.request.what.(emember) + //if s.isStreamingOn(m.name, EVALUATOR) { + // s.logger.Info.Println(s.newLog(m.name, "already streaming this ticker")) + //} else { + // bChann := []chan *ta.Candle{} + // tChann := []chan *tax.Trade{m.tChann} + // from := []string{} + // c := s.get(m.name) + // if c != nil { + // for _, f := range c.from { + // bc, tc := s.collectStreamingChannels(m.name, f) + // if bc != nil { + // bChann = append(bChann, bc) + // } + // if tc != nil { + // tChann = append(tChann, tc) + // } + // } + // from = c.from + // s.unsubscribe(m.name) + // } + // bStopC, tStopC := s.subscribe(m.name, bChann, tChann) + // s.controllers.Store(m.name, + // controller{ + // name: m.name, + // from: append(from, EVALUATOR), + // stops: []chan struct{}{bStopC, tStopC}, + // }, + // ) + //} + //if msg.response != nil { + // msg.response <- s.communicator.newPayload(true) + // close(msg.response) + //} } func (s *streamer) collectStreamingChannels(name string, from string) (chan *ta.Candle, chan *tax.Trade) { @@ -201,11 +206,14 @@ func (s *streamer) collectStreamingChannels(name string, from string) (chan *ta. return bChann, tChann } -func (s *streamer) subscribe(name string, +func (s *streamer) subscribe( + name string, + market runner.MarketType, bChann []chan *ta.Candle, tChann []chan *tax.Trade) (chan struct{}, chan struct{}) { s.Lock() defer s.Unlock() + // cash types tradeHandler := func(event *bn.WsAggTradeEvent) { for _, c := range tChann { c <- tax.ConvertBinanceStreamingAggTrade(event) @@ -219,17 +227,37 @@ func (s *streamer) subscribe(name string, c <- tax.ConvertBinanceStreamingKline(event, nil) } } + // futures types + futuTradeHandler := func(event *bnf.WsAggTradeEvent) { + for _, c := range tChann { + c <- tax.ConvertBinanceFrturesStreamingAggTrade(event) + } + } + futuKlineHandler := func(event *bnf.WsKlineEvent) { + if !event.Kline.IsFinal { + return + } + for _, c := range bChann { + c <- tax.ConvertBinanceFuturesStreamingKline(event, nil) + } + } var bStopC, tStopC chan struct{} - bStopC = s.streamingBinanceKline(name, bStopC, klineHandler) - tStopC = s.streamingBinanceTrade(name, tStopC, tradeHandler) + switch market { + case runner.Cash: + bStopC = s.streamingBinanceKline(name, bStopC, klineHandler) + tStopC = s.streamingBinanceTrade(name, tStopC, tradeHandler) + case runner.Futures: + bStopC = s.streamingBinanceFuturesKline(name, bStopC, futuKlineHandler) + tStopC = s.streamingBinanceFuturesTrade(name, tStopC, futuTradeHandler) + } return bStopC, tStopC } -func (s *streamer) unsubscribe(name string) { +func (s *streamer) unsubscribe(uName string) { s.Lock() defer s.Unlock() s.controllers.Range(func(key, value interface{}) bool { - if name == key.(string) { + if uName == key.(string) { for _, c := range value.(controller).stops { c <- struct{}{} } @@ -237,7 +265,7 @@ func (s *streamer) unsubscribe(name string) { } return true }) - s.controllers.Delete(name) + s.controllers.Delete(uName) } func (s *streamer) streamingBinanceKline(name string, stop chan struct{}, @@ -258,6 +286,24 @@ func (s *streamer) streamingBinanceKline(name string, stop chan struct{}, return stop } +func (s *streamer) streamingBinanceFuturesKline(name string, stop chan struct{}, + klineHandler func(e *bnf.WsKlineEvent)) chan struct{} { + errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } + go func(stopC chan struct{}) { + var err error + var done chan struct{} + for { + done, stop, err = bnf.WsKlineServe(name, "1m", klineHandler, errorHandler) + if err != nil { + s.logger.Error.Println(s.newLog(name, err.Error())) + } + <-done + } + }(stop) + time.Sleep(time.Second) + return stop +} + func (s *streamer) streamingBinanceTrade(name string, stop chan struct{}, tradeHandler func(e *bn.WsAggTradeEvent)) chan struct{} { errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } @@ -276,6 +322,24 @@ func (s *streamer) streamingBinanceTrade(name string, stop chan struct{}, return stop } +func (s *streamer) streamingBinanceFuturesTrade(name string, stop chan struct{}, + tradeHandler func(e *bnf.WsAggTradeEvent)) chan struct{} { + errorHandler := func(err error) { s.logger.Error.Println(s.newLog(name, err.Error())) } + go func(stopC chan struct{}) { + var err error + var done chan struct{} + for { + done, stop, err = bnf.WsAggTradeServe(name, tradeHandler, errorHandler) + if err != nil { + s.logger.Error.Println(s.newLog(name, err.Error())) + } + <-done + } + }(stop) + time.Sleep(time.Second) + return stop +} + // returns a log for the streamer func (s *streamer) newLog(name, message string) string { return fmt.Sprintf("[streamer] %s: %s", name, message) diff --git a/internal/cmd/market/tester.go b/internal/cmd/market/tester.go index 6c253a4..6691786 100644 --- a/internal/cmd/market/tester.go +++ b/internal/cmd/market/tester.go @@ -60,7 +60,7 @@ func (t *tester) test(ticker string, balance: initBalance, strategy: stg.SetRunner(r), } - candles, err := t.provider.fetchBinanceKlinesV3(ticker, r.SmallestFrame(), &fetchOptions{start: start, end: end}) + candles, err := t.provider.fetchBinanceSpotKlinesV3(ticker, r.SmallestFrame(), &fetchOptions{start: start, end: end}) if err != nil { return mem, err } diff --git a/internal/cmd/market/watcher.go b/internal/cmd/market/watcher.go index 68d1ec3..848328b 100644 --- a/internal/cmd/market/watcher.go +++ b/internal/cmd/market/watcher.go @@ -100,24 +100,37 @@ func (w *watcher) isSynced(ticker string, duration time.Duration) bool { // watching the ticker by comsuming the 1-minute candle and trade information boardcasted // from the streamer. func (w *watcher) watch(ticker string, rc *runner.RunnerConfigs, fd *runner.Fundamental) error { + if rc == nil { + return errors.New("missing runner configs") + } + var err error if !w.connected { w.connect() } - if w.isWatchingOn(ticker) { - return nil - } m := wmember{ runner: runner.NewRunner(ticker, rc), bChann: make(chan *ta.Candle, 3), tChann: make(chan *tax.Trade, 10), } + if w.isWatchingOn(m.runner.GetUniqueName()) { + return nil + } if fd != nil { m.runner.SetFundamental(fd) } for _, f := range m.runner.GetConfigs().LFrames { - candles, err := w.provider.fetchBinanceKlinesV3(ticker, f, &fetchOptions{limit: 500}) - if err != nil { - return err + var candles []*ta.Candle + switch rc.Market { + case runner.Cash: + candles, err = w.provider.fetchBinanceSpotKlinesV3(ticker, f, &fetchOptions{limit: 499}) + if err != nil { + return err + } + case runner.Futures: + candles, err = w.provider.fetchBinanceFuturesKlinesV3(ticker, f, &fetchOptions{limit: 499}) + if err != nil { + return err + } } if len(candles) == 0 { return errors.New(fmt.Sprintf("failed to fetch data for frame %v", f)) @@ -128,9 +141,9 @@ func (w *watcher) watch(ticker string, rc *runner.RunnerConfigs, fd *runner.Fund } w.Lock() defer w.Unlock() - w.runners.Store(ticker, m) + w.runners.Store(m.runner.GetUniqueName(), m) go w.await(m) - w.logger.Info.Println(w.newLog(ticker, "started to watch")) + w.logger.Info.Println(w.newLog(m.runner.GetUniqueName(), "started to watch")) return nil } diff --git a/internal/cmd/market/watcher_test.go b/internal/cmd/market/watcher_test.go index e2f70e2..a3325ca 100644 --- a/internal/cmd/market/watcher_test.go +++ b/internal/cmd/market/watcher_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" + "follow.markets/internal/pkg/runner" "follow.markets/pkg/config" ) @@ -30,7 +31,7 @@ func Test_Watcher(t *testing.T) { } }() - err = watcher.watch(ticker, nil, nil) + err = watcher.watch(ticker, runner.NewRunnerDefaultConfigs(), nil) assert.EqualValues(t, nil, err) assert.EqualValues(t, true, watcher.isConnected()) assert.EqualValues(t, true, watcher.isWatchingOn(ticker)) @@ -42,11 +43,11 @@ func Test_Watcher(t *testing.T) { assert.EqualValues(t, true, ok) switch d { case time.Minute: - assert.EqualValues(t, 500, len(line.Candles.Candles)) + assert.EqualValues(t, 499, len(line.Candles.Candles)) case time.Minute * 5: - assert.EqualValues(t, 500, len(line.Candles.Candles)) + assert.EqualValues(t, 499, len(line.Candles.Candles)) case time.Minute * 15: - assert.EqualValues(t, 500, len(line.Candles.Candles)) + assert.EqualValues(t, 499, len(line.Candles.Candles)) } } } From 7011c96a7d43a2b355aa828dc44b599311a11da3 Mon Sep 17 00:00:00 2001 From: Phat Huynh Date: Sat, 22 Jan 2022 12:00:29 +0000 Subject: [PATCH 4/5] Update configs --- cmd/app/watcher.go | 11 +++++++---- configs/configs.json | 2 +- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/cmd/app/watcher.go b/cmd/app/watcher.go index 9c19d92..43cfff8 100644 --- a/cmd/app/watcher.go +++ b/cmd/app/watcher.go @@ -5,7 +5,6 @@ import ( "fmt" "net/http" "strconv" - "strings" "time" "github.com/gorilla/mux" @@ -31,12 +30,16 @@ func watchlist(w http.ResponseWriter, req *http.Request) { } func watch(w http.ResponseWriter, req *http.Request) { - str, ok := mux.Vars(req)["ticker"] + tickers, ok := parseVars(mux.Vars(req), "ticker") if !ok { w.WriteHeader(http.StatusBadRequest) } - for _, t := range strings.Split(str, ",") { - if err := market.Watch(t); err != nil { + mk, ok := parseOptions(req.URL.Query(), "market") + if !ok { + mk = []string{"CASH"} + } + for _, t := range tickers { + if err := market.Watch(t, mk[0]); err != nil { logger.Error.Println(err) InternalError(w) return diff --git a/configs/configs.json b/configs/configs.json index 720c33b..63fe680 100644 --- a/configs/configs.json +++ b/configs/configs.json @@ -43,7 +43,7 @@ "watcher": { "base_market": "USDT", "watchlist": [ - "(?=(? Date: Sat, 22 Jan 2022 15:31:14 +0000 Subject: [PATCH 5/5] Mod id to send notifier from evaluator --- internal/cmd/market/evaluator.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/market/evaluator.go b/internal/cmd/market/evaluator.go index 7341f61..cc508f2 100644 --- a/internal/cmd/market/evaluator.go +++ b/internal/cmd/market/evaluator.go @@ -171,7 +171,7 @@ func (e *evaluator) processingWatcherRequest(msg *message) { signals := e.getByTicker(r.GetName()) for _, s := range signals { if s.Evaluate(r, nil) { - e.communicator.evaluator2Notifier <- e.communicator.newMessageWithPayloadID(r.GetName()+"-"+s.Name, s, nil) + e.communicator.evaluator2Notifier <- e.communicator.newMessageWithPayloadID(r.GetUniqueName()+"-"+s.Name, s, nil) if s.IsOnetime() { _ = e.drop(s.Name) }