Skip to content

Commit

Permalink
feat: implement FindCandidatePersistentCacheParents for scheduling pe…
Browse files Browse the repository at this point in the history
…rsistent cache task (#3770)

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi authored Jan 17, 2025
1 parent 7f0394c commit a63126e
Show file tree
Hide file tree
Showing 21 changed files with 1,464 additions and 717 deletions.
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ coverage:
project:
default:
enabled: yes
target: 35%
target: 34%
patch:
default:
enabled: no
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.49.0
d7y.io/api/v2 v2.0.177
d7y.io/api/v2 v2.1.8
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand All @@ -25,7 +25,6 @@ require (
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.4.0
github.com/elastic/go-freelru v0.16.0
github.com/fsnotify/fsnotify v1.7.0
github.com/fsouza/fake-gcs-server v1.52.0
github.com/gaius-qi/ping v1.0.0
github.com/gammazero/deque v1.0.0
Expand Down Expand Up @@ -157,6 +156,7 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CI
cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.0.177 h1:iC+Jm4n7lKs3N1JIO25XOdtELbKSlis85LEoGbYlp98=
d7y.io/api/v2 v2.0.177/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
d7y.io/api/v2 v2.1.8 h1:iM6qA7SyxSaRpqVVZ4Pwj0pkWMd1NX84FRg33eBHMlU=
d7y.io/api/v2 v2.1.8/go.mod h1:/N5t2H+b2XBwfvD7RmZS0ae96861a9ibVXkeF26ir6A=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
56 changes: 56 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,62 @@ var (
Help: "Counter of the number of failed of the delete cache task.",
})

RegisterPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_total",
Help: "Counter of the number of the register persistent cache peer.",
}, []string{"host_type"})

RegisterPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the register persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_total",
Help: "Counter of the number of the download persistent cache peer started.",
}, []string{"host_type"})

DownloadPersistentCachePeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer started.",
}, []string{"host_type"})

DownloadPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_total",
Help: "Counter of the number of the download persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePieceCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_total",
Help: "Counter of the number of the download persistent cache piece.",
}, []string{"host_type"})

DownloadPersistentCachePieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache piece.",
}, []string{"host_type"})

VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
93 changes: 29 additions & 64 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,8 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)

// HostOption is a functional option for configuring the persistent cache host.
type HostOption func(h *Host)

// WithConcurrentUploadLimit sets persistent cache host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
h.ConcurrentUploadLimit = limit
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
Expand Down Expand Up @@ -88,21 +77,12 @@ type Host struct {
// Build information.
Build Build

// SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64

// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit int32

// ConcurrentUploadCount is concurrent upload count.
ConcurrentUploadCount int32

// UploadCount is total upload count.
UploadCount int64

// UploadFailedCount is upload failed count.
UploadFailedCount int64

// CreatedAt is host create time.
CreatedAt time.Time

Expand Down Expand Up @@ -265,47 +245,32 @@ type Disk struct {

// New host instance.
func NewHost(
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadCount int32,
UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...HostOption,
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort int32,
schedulerClusterId uint64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if typ == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}

h := &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
AnnounceInterval: announceInterval,
ConcurrentUploadLimit: int32(concurrentUploadLimit),
ConcurrentUploadCount: concurrentUploadCount,
UploadCount: UploadCount,
UploadFailedCount: UploadFailedCount,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
return &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
SchedulerClusterID: schedulerClusterId,
AnnounceInterval: announceInterval,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
}

for _, opt := range options {
opt(h)
}

return h
}
33 changes: 5 additions & 28 deletions scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,10 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
return nil, false
}

concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload limit failed: %v", err)
return nil, false
}

concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload count failed: %v", err)
return nil, false
}

uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64)
if err != nil {
log.Errorf("parsing upload count failed: %v", err)
return nil, false
}

uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64)
// Set cpu fields from raw host.
schedulerClusterID, err := strconv.ParseUint(rawHost["scheduler_cluster_id"], 10, 64)
if err != nil {
log.Errorf("parsing upload failed count failed: %v", err)
log.Errorf("parsing scheduler cluster id failed: %v", err)
return nil, false
}

Expand Down Expand Up @@ -446,9 +429,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
rawHost["kernel_version"],
int32(port),
int32(downloadPort),
int32(concurrentUploadCount),
uploadCount,
uploadFailedCount,
uint64(schedulerClusterID),
diableShared,
pkgtypes.ParseHostType(rawHost["type"]),
cpu,
Expand All @@ -460,7 +441,6 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
createdAt,
updatedAt,
logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]),
WithConcurrentUploadLimit(int32(concurrentUploadLimit)),
), true
}

Expand Down Expand Up @@ -522,11 +502,8 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error {
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval,
"concurrent_upload_limit", host.ConcurrentUploadLimit,
"concurrent_upload_count", host.ConcurrentUploadCount,
"upload_count", host.UploadCount,
"upload_failed_count", host.UploadFailedCount,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()

Expand Down
26 changes: 18 additions & 8 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ const (
// Peer is uploading resources for p2p cluster.
PeerStateUploading = "Uploading"

// Peer successfully registered and perpared to download.
PeerStateReceived = "Received"
// Peer successfully registered as empty scope size.
PeerStateReceivedEmpty = "ReceivedEmpty"

// Peer successfully registered as normal scope size.
PeerStateReceivedNormal = "ReceivedNormal"

// Peer is downloading resources from peer.
PeerStateRunning = "Running"
Expand All @@ -48,10 +51,13 @@ const (

const (
// Peer is uploding.
PeerEventUpload = "Uploda"
PeerEventUpload = "Upload"

// Peer is registered as empty scope size.
PeerEventRegisterEmpty = "RegisterEmpty"

// Peer is registered and perpared to download.
PeerEventRegister = "Register"
// Peer is registered as normal scope size.
PeerEventRegisterNormal = "RegisterNormal"

// Peer is downloading.
PeerEventDownload = "Download"
Expand Down Expand Up @@ -119,16 +125,20 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
PeerStatePending,
fsm.Events{
fsm.EventDesc{Name: PeerEventUpload, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateUploading},
fsm.EventDesc{Name: PeerEventRegister, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceived},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedEmpty},
fsm.EventDesc{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedNormal},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedNormal}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventSucceeded, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateSucceeded},
fsm.EventDesc{Name: PeerEventFailed, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateFailed},
},
fsm.Callbacks{
PeerEventUpload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegister: func(ctx context.Context, e *fsm.Event) {
PeerEventRegisterEmpty: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegisterNormal: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(ctx context.Context, e *fsm.Event) {
Expand Down
Loading

0 comments on commit a63126e

Please sign in to comment.