Skip to content

Commit

Permalink
Add callback API to track allocation lifecycle
Browse files Browse the repository at this point in the history
  • Loading branch information
rg0now committed Jan 15, 2025
1 parent 9d49944 commit a4ddb68
Show file tree
Hide file tree
Showing 14 changed files with 810 additions and 45 deletions.
100 changes: 93 additions & 7 deletions internal/allocation/allocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,8 @@ type Allocation struct {
channelBindings []*ChannelBind
lifetimeTimer *time.Timer
closed chan interface{}
username, realm string
callback EventHandler
log logging.LeveledLogger

// Some clients (Firefox or others using resiprocate's nICE lib) may retry allocation
Expand All @@ -45,12 +47,18 @@ type Allocation struct {
}

// NewAllocation creates a new instance of NewAllocation.
func NewAllocation(turnSocket net.PacketConn, fiveTuple *FiveTuple, log logging.LeveledLogger) *Allocation {
func NewAllocation(
turnSocket net.PacketConn,
fiveTuple *FiveTuple,
callback EventHandler,
log logging.LeveledLogger,
) *Allocation {
return &Allocation{
TurnSocket: turnSocket,
fiveTuple: fiveTuple,
permissions: make(map[string]*Permission, 64),
closed: make(chan interface{}),
callback: callback,
log: log,
}
}
Expand Down Expand Up @@ -82,6 +90,21 @@ func (a *Allocation) AddPermission(perms *Permission) {
a.permissions[fingerprint] = perms
a.permissionsLock.Unlock()

if a.callback != nil {
if u, ok := perms.Addr.(*net.UDPAddr); ok {
a.callback(EventHandlerArgs{
Type: OnPermissionCreated,
SrcAddr: a.fiveTuple.SrcAddr,
DstAddr: a.fiveTuple.DstAddr,
Protocol: a.fiveTuple.Protocol,
Username: a.username,
Realm: a.realm,
RelayAddr: a.RelayAddr,
PeerIP: u.IP,
})
}
}

perms.start(permissionTimeout)
}

Expand All @@ -90,6 +113,33 @@ func (a *Allocation) RemovePermission(addr net.Addr) {
a.permissionsLock.Lock()
defer a.permissionsLock.Unlock()
delete(a.permissions, ipnet.FingerprintAddr(addr))

if a.callback != nil {
if u, ok := addr.(*net.UDPAddr); ok {
a.callback(EventHandlerArgs{
Type: OnPermissionDeleted,
SrcAddr: a.fiveTuple.SrcAddr,
DstAddr: a.fiveTuple.DstAddr,
Protocol: a.fiveTuple.Protocol,
Username: a.username,
Realm: a.realm,
RelayAddr: a.RelayAddr,
PeerIP: u.IP,
})
}
}
}

// ListPermissions returns the permissions associated with an allocation.
func (a *Allocation) ListPermissions() []*Permission {
ps := []*Permission{}
a.permissionsLock.RLock()
defer a.permissionsLock.RUnlock()
for _, p := range a.permissions {
ps = append(ps, p)
}

return ps
}

// AddChannelBind adds a new ChannelBind to the allocation, it also updates the
Expand All @@ -114,6 +164,20 @@ func (a *Allocation) AddChannelBind(chanBind *ChannelBind, lifetime time.Duratio

// Channel binds also refresh permissions.
a.AddPermission(NewPermission(chanBind.Peer, a.log))

if a.callback != nil {
a.callback(EventHandlerArgs{
Type: OnChannelCreated,
SrcAddr: a.fiveTuple.SrcAddr,
DstAddr: a.fiveTuple.DstAddr,
Protocol: a.fiveTuple.Protocol,
Username: a.username,
Realm: a.realm,
RelayAddr: a.RelayAddr,
PeerAddr: chanBind.Peer,
ChannelNumber: uint16(chanBind.Number),
})
}
} else {
channelByNumber.refresh(lifetime)

Expand All @@ -131,6 +195,20 @@ func (a *Allocation) RemoveChannelBind(number proto.ChannelNumber) bool {

for i := len(a.channelBindings) - 1; i >= 0; i-- {
if a.channelBindings[i].Number == number {
if a.callback != nil {
a.callback(EventHandlerArgs{
Type: OnChannelDeleted,
SrcAddr: a.fiveTuple.SrcAddr,
DstAddr: a.fiveTuple.DstAddr,
Protocol: a.fiveTuple.Protocol,
Username: a.username,
Realm: a.realm,
RelayAddr: a.RelayAddr,
PeerAddr: a.channelBindings[i].Peer,
ChannelNumber: uint16(a.channelBindings[i].Number),
})
}

a.channelBindings = append(a.channelBindings[:i], a.channelBindings[i+1:]...)

return true
Expand Down Expand Up @@ -166,6 +244,16 @@ func (a *Allocation) GetChannelByAddr(addr net.Addr) *ChannelBind {
return nil
}

// ListChannelBindings returns the channel bindings associated with an allocation.
func (a *Allocation) ListChannelBindings() []*ChannelBind {
cs := []*ChannelBind{}
a.channelBindingsLock.RLock()
defer a.channelBindingsLock.RUnlock()
cs = append(cs, a.channelBindings...)

return cs
}

// Refresh updates the allocations lifetime.
func (a *Allocation) Refresh(lifetime time.Duration) {
if !a.lifetimeTimer.Reset(lifetime) {
Expand Down Expand Up @@ -201,17 +289,15 @@ func (a *Allocation) Close() error {

a.lifetimeTimer.Stop()

a.permissionsLock.RLock()
for _, p := range a.permissions {
for _, p := range a.ListPermissions() {
a.RemovePermission(p.Addr)
p.lifetimeTimer.Stop()
}
a.permissionsLock.RUnlock()

a.channelBindingsLock.RLock()
for _, c := range a.channelBindings {
for _, c := range a.ListChannelBindings() {
a.RemoveChannelBind(c.Number)
c.lifetimeTimer.Stop()
}
a.channelBindingsLock.RUnlock()

return a.RelaySocket.Close()
}
Expand Down
32 changes: 31 additions & 1 deletion internal/allocation/allocation_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ type ManagerConfig struct {
AllocatePacketConn func(network string, requestedPort int) (net.PacketConn, net.Addr, error)
AllocateConn func(network string, requestedPort int) (net.Conn, net.Addr, error)
PermissionHandler func(sourceAddr net.Addr, peerIP net.IP) bool
EventHandler EventHandler
}

type reservation struct {
Expand All @@ -36,6 +37,7 @@ type Manager struct {
allocatePacketConn func(network string, requestedPort int) (net.PacketConn, net.Addr, error)
allocateConn func(network string, requestedPort int) (net.Conn, net.Addr, error)
permissionHandler func(sourceAddr net.Addr, peerIP net.IP) bool
EventHandler EventHandler
}

// NewManager creates a new instance of Manager.
Expand All @@ -55,6 +57,7 @@ func NewManager(config ManagerConfig) (*Manager, error) {
allocatePacketConn: config.AllocatePacketConn,
allocateConn: config.AllocateConn,
permissionHandler: config.PermissionHandler,
EventHandler: config.EventHandler,
}, nil
}

Expand Down Expand Up @@ -94,6 +97,7 @@ func (m *Manager) CreateAllocation(
turnSocket net.PacketConn,
requestedPort int,
lifetime time.Duration,
username, realm string,
) (*Allocation, error) {
switch {
case fiveTuple == nil:
Expand All @@ -111,7 +115,9 @@ func (m *Manager) CreateAllocation(
if alloc := m.GetAllocation(fiveTuple); alloc != nil {
return nil, fmt.Errorf("%w: %v", errDupeFiveTuple, fiveTuple)
}
alloc := NewAllocation(turnSocket, fiveTuple, m.log)
alloc := NewAllocation(turnSocket, fiveTuple, m.EventHandler, m.log)
alloc.username = username
alloc.realm = realm

conn, relayAddr, err := m.allocatePacketConn("udp4", requestedPort)
if err != nil {
Expand All @@ -131,6 +137,19 @@ func (m *Manager) CreateAllocation(
m.allocations[fiveTuple.Fingerprint()] = alloc
m.lock.Unlock()

if m.EventHandler != nil {
m.EventHandler(EventHandlerArgs{
Type: OnAllocationCreated,
SrcAddr: fiveTuple.SrcAddr,
DstAddr: fiveTuple.DstAddr,
Protocol: UDP,
Username: username,
Realm: realm,
RelayAddr: relayAddr,
RequestedPort: requestedPort,
})
}

go alloc.packetHandler(m)

return alloc, nil
Expand All @@ -152,6 +171,17 @@ func (m *Manager) DeleteAllocation(fiveTuple *FiveTuple) {
if err := allocation.Close(); err != nil {
m.log.Errorf("Failed to close allocation: %v", err)
}

if m.EventHandler != nil {
m.EventHandler(EventHandlerArgs{
Type: OnAllocationDeleted,
SrcAddr: fiveTuple.SrcAddr,
DstAddr: fiveTuple.DstAddr,
Protocol: UDP,
Username: allocation.username,
Realm: allocation.realm,
})
}
}

// CreateReservation stores the reservation for the token+port.
Expand Down
21 changes: 11 additions & 10 deletions internal/allocation/allocation_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,13 @@ func subTestCreateInvalidAllocation(t *testing.T, turnSocket net.PacketConn) {
m, err := newTestManager()
assert.NoError(t, err)

if a, err := m.CreateAllocation(nil, turnSocket, 0, proto.DefaultLifetime); a != nil || err == nil {
if a, err := m.CreateAllocation(nil, turnSocket, 0, proto.DefaultLifetime, "", ""); a != nil || err == nil {
t.Errorf("Illegally created allocation with nil FiveTuple")
}
if a, err := m.CreateAllocation(randomFiveTuple(), nil, 0, proto.DefaultLifetime); a != nil || err == nil {
if a, err := m.CreateAllocation(randomFiveTuple(), nil, 0, proto.DefaultLifetime, "", ""); a != nil || err == nil {
t.Errorf("Illegally created allocation with nil turnSocket")
}
if a, err := m.CreateAllocation(randomFiveTuple(), turnSocket, 0, 0); a != nil || err == nil {
if a, err := m.CreateAllocation(randomFiveTuple(), turnSocket, 0, 0, "", ""); a != nil || err == nil {
t.Errorf("Illegally created allocation with 0 lifetime")
}
}
Expand All @@ -73,7 +73,7 @@ func subTestCreateAllocation(t *testing.T, turnSocket net.PacketConn) {
assert.NoError(t, err)

fiveTuple := randomFiveTuple()
if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime); a == nil || err != nil {
if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime, "", ""); a == nil || err != nil {
t.Errorf("Failed to create allocation %v %v", a, err)
}

Expand All @@ -90,11 +90,11 @@ func subTestCreateAllocationDuplicateFiveTuple(t *testing.T, turnSocket net.Pack
assert.NoError(t, err)

fiveTuple := randomFiveTuple()
if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime); a == nil || err != nil {
if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime, "", ""); a == nil || err != nil {
t.Errorf("Failed to create allocation %v %v", a, err)
}

if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime); a != nil || err == nil {
if a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime, "", ""); a != nil || err == nil {
t.Errorf("Was able to create allocation with same FiveTuple twice")
}
}
Expand All @@ -106,7 +106,8 @@ func subTestDeleteAllocation(t *testing.T, turnSocket net.PacketConn) {
assert.NoError(t, err)

fiveTuple := randomFiveTuple()
if a, err := manager.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime); a == nil || err != nil {
if a, err := manager.CreateAllocation(fiveTuple, turnSocket, 0, proto.DefaultLifetime,
"", ""); a == nil || err != nil {
t.Errorf("Failed to create allocation %v %v", a, err)
}

Expand All @@ -133,7 +134,7 @@ func subTestAllocationTimeout(t *testing.T, turnSocket net.PacketConn) {
for index := range allocations {
fiveTuple := randomFiveTuple()

a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, lifetime)
a, err := m.CreateAllocation(fiveTuple, turnSocket, 0, lifetime, "", "")
if err != nil {
t.Errorf("Failed to create allocation with %v", fiveTuple)
}
Expand All @@ -159,9 +160,9 @@ func subTestManagerClose(t *testing.T, turnSocket net.PacketConn) {

allocations := make([]*Allocation, 2)

a1, _ := manager.CreateAllocation(randomFiveTuple(), turnSocket, 0, time.Second)
a1, _ := manager.CreateAllocation(randomFiveTuple(), turnSocket, 0, time.Second, "", "")
allocations[0] = a1
a2, _ := manager.CreateAllocation(randomFiveTuple(), turnSocket, 0, time.Minute)
a2, _ := manager.CreateAllocation(randomFiveTuple(), turnSocket, 0, time.Minute, "", "")
allocations[1] = a2

// Make a1 timeout
Expand Down
Loading

0 comments on commit a4ddb68

Please sign in to comment.