Skip to content

Commit

Permalink
chore: update pex replica clean logic (#3272)
Browse files Browse the repository at this point in the history
Signed-off-by: Jim Ma <[email protected]>
  • Loading branch information
jim3ma authored May 16, 2024
1 parent 756b6b4 commit 3ddf37a
Show file tree
Hide file tree
Showing 5 changed files with 50 additions and 25 deletions.
2 changes: 2 additions & 0 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,4 +983,6 @@ type PeerExchangeOption struct {
ReSyncInterval time.Duration `mapstructure:"reSyncInterval" yaml:"reSyncInterval"`
// ReplicaThreshold keeps the number of replicas across all peers from exceeding the threshold, to save storage
ReplicaThreshold int `mapstructure:"replicaThreshold" yaml:"replicaThreshold"`
// ReplicaCleanPercentage is the percentage probability of cleaning the local replica once the replica threshold is reached; valid values: [0, 100]
ReplicaCleanPercentage int32 `mapstructure:"replicaCleanPercentage" yaml:"replicaCleanPercentage"`
}
11 changes: 6 additions & 5 deletions client/config/peerhost_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
LogMaxAge: DefaultLogRotateMaxAge,
LogMaxBackups: DefaultLogRotateMaxBackups,
PeerExchange: PeerExchangeOption{
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
ReplicaCleanPercentage: 1,
},
}
}
11 changes: 6 additions & 5 deletions client/config/peerhost_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
LogMaxAge: DefaultLogRotateMaxAge,
LogMaxBackups: DefaultLogRotateMaxBackups,
PeerExchange: PeerExchangeOption{
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
ReplicaCleanPercentage: 1,
},
}
}
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
},
pex.WithInitialRetryInterval(opt.PeerExchange.InitialInterval),
pex.WithReSyncInterval(opt.PeerExchange.ReSyncInterval),
pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold))
pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold),
pex.WithReplicaCleanPercentage(opt.PeerExchange.ReplicaCleanPercentage))
if err != nil {
return nil, err
}
Expand Down
48 changes: 34 additions & 14 deletions client/daemon/pex/peer_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type peerExchange struct {
}

type peerExchangeConfig struct {
initialRetryInterval time.Duration
reSyncInterval time.Duration
replicaThreshold int
initialRetryInterval time.Duration
reSyncInterval time.Duration
replicaThreshold int
replicaCleanPercentage int32
}

func WithName(name string) func(*memberlist.Config, *peerExchangeConfig) {
Expand Down Expand Up @@ -102,6 +103,14 @@ func WithReplicaThreshold(threshold int) func(*memberlist.Config, *peerExchangeC
}
}

// WithReplicaCleanPercentage returns an option that sets the replica clean
// percentage on the peer exchange config.
//
// Only strictly positive values are applied; when percentage is zero or
// negative the option is a no-op and whatever value the config already holds
// is kept. The memberlist config is not touched by this option.
func WithReplicaCleanPercentage(percentage int32) func(*memberlist.Config, *peerExchangeConfig) {
return func(_ *memberlist.Config, cfg *peerExchangeConfig) {
// Non-positive input leaves the existing setting in place.
if percentage <= 0 {
return
}
cfg.replicaCleanPercentage = percentage
}
}

func NewPeerExchange(
reclaim ReclaimFunc,
lister InitialMemberLister,
Expand Down Expand Up @@ -130,6 +139,7 @@ func NewPeerExchange(
logger.Infof("peer exchange initial retry interval: %s", pexConfig.initialRetryInterval)
logger.Infof("peer exchange re-sync interval: %s", pexConfig.reSyncInterval)
logger.Infof("peer exchange replica threshold: %d", pexConfig.replicaThreshold)
logger.Infof("peer exchange replica clean percentage: %d", pexConfig.replicaCleanPercentage)

pex := &peerExchange{
config: pexConfig,
Expand Down Expand Up @@ -168,7 +178,11 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
case SearchPeerResultTypeLocal:
// check replica threshold and reclaim local cache
if len(searchPeerResult.Peers) > p.config.replicaThreshold {
p.tryReclaim(task, searchPeerResult)
if p.tryReclaim(task, searchPeerResult) {
// change result type to remote and drop local peer
searchPeerResult.Type = SearchPeerResultTypeRemote
searchPeerResult.Peers = searchPeerResult.Peers[1:]
}
}
case SearchPeerResultTypeRemote:
if len(searchPeerResult.Peers) < p.config.replicaThreshold {
Expand All @@ -179,18 +193,24 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
return searchPeerResult
}

func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) {
func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) bool {
if p.config.replicaCleanPercentage == 0 {
return false
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// reclaim with 1% probability for shrink double reclaim with other members
if r.Int31n(100) == 0 {
peer := searchPeerResult.Peers[0].PeerID
searchPeerResult.Type = SearchPeerResultTypeRemote
p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
err := p.reclaim(task, peer)
if err != nil {
p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
}
// reclaim with probability for shrink double reclaim with other members
// Int31n is [0, n), +1 for percentage [1, 100]
if r.Int31n(100)+1 > p.config.replicaCleanPercentage {
return false
}
// when Type is SearchPeerResultTypeLocal, peer 0 is always local peer
peer := searchPeerResult.Peers[0].PeerID
p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
err := p.reclaim(task, peer)
if err != nil {
p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
}
return true
}

func (p *peerExchange) BroadcastPeer(data *dfdaemonv1.PeerMetadata) {
Expand Down

0 comments on commit 3ddf37a

Please sign in to comment.