Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: update pex replica clean logic #3272

Merged
merged 3 commits into from
May 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions client/config/peerhost.go
Original file line number Diff line number Diff line change
Expand Up @@ -983,4 +983,6 @@ type PeerExchangeOption struct {
ReSyncInterval time.Duration `mapstructure:"reSyncInterval" yaml:"reSyncInterval"`
// ReplicaThreshold is used for keeping replicas in all peers is not bigger than threshold to save storage
ReplicaThreshold int `mapstructure:"replicaThreshold" yaml:"replicaThreshold"`
// ReplicaCleanPercentage is percentage probability to clean local replica when reach threshold, available values: [0, 100]
ReplicaCleanPercentage int32 `mapstructure:"replicaCleanPercentage" yaml:"replicaCleanPercentage"`
}
11 changes: 6 additions & 5 deletions client/config/peerhost_darwin.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
LogMaxAge: DefaultLogRotateMaxAge,
LogMaxBackups: DefaultLogRotateMaxBackups,
PeerExchange: PeerExchangeOption{
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
ReplicaCleanPercentage: 1,
},
}
}
11 changes: 6 additions & 5 deletions client/config/peerhost_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,11 +191,12 @@ var peerHostConfig = func() *DaemonOption {
LogMaxAge: DefaultLogRotateMaxAge,
LogMaxBackups: DefaultLogRotateMaxBackups,
PeerExchange: PeerExchangeOption{
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
Enable: false,
InitialInterval: time.Minute,
InitialBroadcastDelay: 3 * time.Minute,
ReSyncInterval: 10 * time.Minute,
ReplicaThreshold: 2,
ReplicaCleanPercentage: 1,
},
}
}
3 changes: 2 additions & 1 deletion client/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,8 @@ func New(opt *config.DaemonOption, d dfpath.Dfpath) (Daemon, error) {
},
pex.WithInitialRetryInterval(opt.PeerExchange.InitialInterval),
pex.WithReSyncInterval(opt.PeerExchange.ReSyncInterval),
pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold))
pex.WithReplicaThreshold(opt.PeerExchange.ReplicaThreshold),
pex.WithReplicaCleanPercentage(opt.PeerExchange.ReplicaCleanPercentage))
if err != nil {
return nil, err
}
Expand Down
48 changes: 34 additions & 14 deletions client/daemon/pex/peer_exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,10 @@ type peerExchange struct {
}

type peerExchangeConfig struct {
initialRetryInterval time.Duration
reSyncInterval time.Duration
replicaThreshold int
initialRetryInterval time.Duration
reSyncInterval time.Duration
replicaThreshold int
replicaCleanPercentage int32
}

func WithName(name string) func(*memberlist.Config, *peerExchangeConfig) {
Expand Down Expand Up @@ -102,6 +103,14 @@ func WithReplicaThreshold(threshold int) func(*memberlist.Config, *peerExchangeC
}
}

func WithReplicaCleanPercentage(percentage int32) func(*memberlist.Config, *peerExchangeConfig) {
return func(memberConfig *memberlist.Config, pexConfig *peerExchangeConfig) {
if percentage > 0 {
pexConfig.replicaCleanPercentage = percentage
}
}
}

func NewPeerExchange(
reclaim ReclaimFunc,
lister InitialMemberLister,
Expand Down Expand Up @@ -130,6 +139,7 @@ func NewPeerExchange(
logger.Infof("peer exchange initial retry interval: %s", pexConfig.initialRetryInterval)
logger.Infof("peer exchange re-sync interval: %s", pexConfig.reSyncInterval)
logger.Infof("peer exchange replica threshold: %d", pexConfig.replicaThreshold)
logger.Infof("peer exchange replica clean percentage: %d", pexConfig.replicaCleanPercentage)

pex := &peerExchange{
config: pexConfig,
Expand Down Expand Up @@ -168,7 +178,11 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
case SearchPeerResultTypeLocal:
// check replica threshold and reclaim local cache
if len(searchPeerResult.Peers) > p.config.replicaThreshold {
p.tryReclaim(task, searchPeerResult)
if p.tryReclaim(task, searchPeerResult) {
// change result type to remote and drop local peer
searchPeerResult.Type = SearchPeerResultTypeRemote
searchPeerResult.Peers = searchPeerResult.Peers[1:]
}
}
case SearchPeerResultTypeRemote:
if len(searchPeerResult.Peers) < p.config.replicaThreshold {
Expand All @@ -179,18 +193,24 @@ func (p *peerExchange) SearchPeer(task string) SearchPeerResult {
return searchPeerResult
}

func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) {
func (p *peerExchange) tryReclaim(task string, searchPeerResult SearchPeerResult) bool {
if p.config.replicaCleanPercentage == 0 {
return false
}
r := rand.New(rand.NewSource(time.Now().UnixNano()))
// reclaim with 1% probability for shrink double reclaim with other members
if r.Int31n(100) == 0 {
peer := searchPeerResult.Peers[0].PeerID
searchPeerResult.Type = SearchPeerResultTypeRemote
p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
err := p.reclaim(task, peer)
if err != nil {
p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
}
// reclaim with probability for shrink double reclaim with other members
// Int31n is [0, n), +1 for percentage [1, 100]
if r.Int31n(100)+1 > p.config.replicaCleanPercentage {
return false
}
// when Type is SearchPeerResultTypeLocal, peer 0 is always local peer
peer := searchPeerResult.Peers[0].PeerID
p.memberManager.logger.Debugf("task %s replica threshold reached, try to reclaim local peer cache %s", task, peer)
err := p.reclaim(task, peer)
if err != nil {
p.memberManager.logger.Warnf("task %s peer %s reclaim local cache error: %s", task, peer, err)
}
return true
}

func (p *peerExchange) BroadcastPeer(data *dfdaemonv1.PeerMetadata) {
Expand Down
Loading