-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathconn.go
171 lines (149 loc) · 3.89 KB
/
conn.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
package wango
import (
"net/http"
"sync"
"time"
"golang.org/x/net/websocket"
)
// Conn represents a websocket connection
type Conn struct {
id string
connection *websocket.Conn
extra interface{}
extraLocker *sync.RWMutex
sendChan chan []byte
breakChan chan struct{}
subRequests subRequestsListeners
unsubRequests subRequestsListeners
callResults map[interface{}]chan *callResult
callResultsLocker *sync.Mutex
eventHandlers map[string]EventHandler
eventHandlersLocker *sync.RWMutex
connected bool
clientConnection bool
aliveTimer *time.Timer
aliveTimeout time.Duration
aliveMutex *sync.Mutex
stringMode bool
}
// EventHandler is an interface for handlers to published events. The uri
// is the URI of the event and event is the event centents.
type EventHandler func(uri string, event interface{})
// Close closes websocket connection
func (c *Conn) Close() {
c.breakChan <- struct{}{}
}
// Connected returns true if websocket connection established and not closed
func (c *Conn) Connected() bool {
c.extraLocker.RLock()
defer c.extraLocker.RUnlock()
connected := c.connected
return connected
}
// GetExtra returns extra data stored in connection
func (c *Conn) GetExtra() interface{} {
c.extraLocker.RLock()
extra := c.extra
c.extraLocker.RUnlock()
return extra
}
// ID returns connection ID
func (c *Conn) ID() string {
return c.id
}
// RemoteAddr returns remote address
func (c *Conn) RemoteAddr() string {
return c.connection.Request().RemoteAddr
}
// Request returns related *http.Request
func (c *Conn) Request() *http.Request {
return c.connection.Request()
}
// SendEvent sends event for provided uri directly to connection
func (c *Conn) SendEvent(uri string, event interface{}) error {
msg, _ := createMessage(msgIntTypes[msgEvent], uri, event)
if !c.Connected() {
return errConnectionClosed
}
c.send(msg)
return nil
}
// SetExtra stores extra data in connection
func (c *Conn) SetExtra(extra interface{}) {
c.extraLocker.Lock()
c.extra = extra
c.extraLocker.Unlock()
}
// StringMode sets a string mode to use TextFrame encoding for sending messages
func (c *Conn) StringMode() {
c.stringMode = true
}
func (c *Conn) heartbeating() {
var hbSequence int
ticker := time.NewTicker(heartBeatFrequency)
for c.Connected() {
msg, _ := createMessage(msgIntTypes[msgHeartbeat], hbSequence)
hbSequence++
logger("HB message: ", string(msg))
c.send(msg)
<-ticker.C
}
ticker.Stop()
}
func (c *Conn) resetTimeoutTimer() {
c.aliveMutex.Lock()
if c.aliveTimer.Stop() {
c.aliveTimer.Reset(c.aliveTimeout)
}
c.aliveMutex.Unlock()
}
func (c *Conn) send(msg []byte) {
c.sendChan <- msg
}
func (c *Conn) sender() {
var err error
for msg := range c.sendChan {
logger("Sending message ", string(msg))
if c.stringMode {
err = websocket.Message.Send(c.connection, string(msg))
} else {
err = websocket.Message.Send(c.connection, msg)
}
if err != nil {
logger("Error when send message", err.Error())
c.breakChan <- struct{}{}
}
}
}
type subRequestsListener struct {
id string
ch chan error
}
type callResult struct {
result interface{}
err error
}
func (s subRequestsListeners) addRequest(id, uri string, ch chan error) {
s.locker.Lock()
defer s.locker.Unlock()
if s.listeners[uri] == nil {
s.listeners[uri] = []subRequestsListener{}
}
s.listeners[uri] = append(s.listeners[uri], subRequestsListener{id, ch})
}
func (s subRequestsListeners) getRequests(uri string) []subRequestsListener {
s.locker.Lock()
defer s.locker.Unlock()
listeners := s.listeners[uri]
delete(s.listeners, uri)
return listeners
}
func (s subRequestsListeners) closeRequests() {
s.locker.Lock()
defer s.locker.Unlock()
for _, listeners := range s.listeners {
for _, listener := range listeners {
listener.ch <- errConnectionClosed
}
}
}