Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement FindCandidatePersistentCacheParents for scheduling persistent cache task #3770

Merged
merged 2 commits into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ coverage:
project:
default:
enabled: yes
target: 35%
target: 34%
patch:
default:
enabled: no
Expand Down
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ go 1.23.0

require (
cloud.google.com/go/storage v1.49.0
d7y.io/api/v2 v2.0.177
d7y.io/api/v2 v2.1.8
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
Expand All @@ -25,7 +25,6 @@ require (
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.4.0
github.com/elastic/go-freelru v0.16.0
github.com/fsnotify/fsnotify v1.7.0
github.com/fsouza/fake-gcs-server v1.52.0
github.com/gaius-qi/ping v1.0.0
github.com/gammazero/deque v1.0.0
Expand Down Expand Up @@ -157,6 +156,7 @@ require (
github.com/envoyproxy/protoc-gen-validate v1.1.0 // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/fsnotify/fsnotify v1.7.0 // indirect
github.com/gabriel-vasile/mimetype v1.4.7 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-echarts/go-echarts/v2 v2.2.4 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,8 @@ cloud.google.com/go/storage v1.49.0 h1:zenOPBOWHCnojRd9aJZAyQXBYqkJkdQS42dxL55CI
cloud.google.com/go/storage v1.49.0/go.mod h1:k1eHhhpLvrPjVGfo0mOUPEJ4Y2+a/Hv5PiwehZI9qGU=
cloud.google.com/go/trace v1.11.2 h1:4ZmaBdL8Ng/ajrgKqY5jfvzqMXbrDcBsUGXOT9aqTtI=
cloud.google.com/go/trace v1.11.2/go.mod h1:bn7OwXd4pd5rFuAnTrzBuoZ4ax2XQeG3qNgYmfCy0Io=
d7y.io/api/v2 v2.0.177 h1:iC+Jm4n7lKs3N1JIO25XOdtELbKSlis85LEoGbYlp98=
d7y.io/api/v2 v2.0.177/go.mod h1:+l4ErhthKmcIhcRU6F01Km8q+yDyICF7JImefg0t6HY=
d7y.io/api/v2 v2.1.8 h1:iM6qA7SyxSaRpqVVZ4Pwj0pkWMd1NX84FRg33eBHMlU=
d7y.io/api/v2 v2.1.8/go.mod h1:/N5t2H+b2XBwfvD7RmZS0ae96861a9ibVXkeF26ir6A=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/Azure/azure-sdk-for-go v16.2.1+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.0.0/go.mod h1:uGG2W01BaETf0Ozp+QxxKJdMBNRWPdstHG0Fmdwn1/U=
Expand Down
56 changes: 56 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,62 @@ var (
Help: "Counter of the number of failed of the delete cache task.",
})

// RegisterPersistentCachePeerCount is a Prometheus counter vector partitioned
// by the "host_type" label. It is registered through promauto under
// types.MetricsNamespace / types.SchedulerMetricsName with the metric name
// "register_persistent_cache_peer_total"; per its help text it counts
// persistent cache peer registrations.
RegisterPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_total",
Help: "Counter of the number of the register persistent cache peer.",
}, []string{"host_type"})

// RegisterPersistentCachePeerFailureCount is a Prometheus counter vector
// partitioned by the "host_type" label. It is registered through promauto
// under types.MetricsNamespace / types.SchedulerMetricsName with the metric
// name "register_persistent_cache_peer_failure_total"; per its help text it
// counts failed persistent cache peer registrations.
RegisterPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the register persistent cache peer.",
}, []string{"host_type"})

// DownloadPersistentCachePeerStartedCount is a Prometheus counter vector
// partitioned by the "host_type" label. It is registered through promauto
// under types.MetricsNamespace / types.SchedulerMetricsName with the metric
// name "download_persistent_cache_peer_started_total"; per its help text it
// counts persistent cache peer downloads that have started.
DownloadPersistentCachePeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_total",
Help: "Counter of the number of the download persistent cache peer started.",
}, []string{"host_type"})

// DownloadPersistentCachePeerStartedFailureCount is a Prometheus counter
// vector partitioned by the "host_type" label. It is registered through
// promauto under types.MetricsNamespace / types.SchedulerMetricsName with the
// metric name "download_persistent_cache_peer_started_failure_total"; per its
// help text it counts failures of the "download started" step for persistent
// cache peers.
DownloadPersistentCachePeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer started.",
}, []string{"host_type"})

// DownloadPersistentCachePeerCount is a Prometheus counter vector partitioned
// by the "host_type" label. It is registered through promauto under
// types.MetricsNamespace / types.SchedulerMetricsName; per its help text it
// counts persistent cache peer downloads.
//
// Note that the exported metric name is
// "download_persistent_cache_peer_finished_total": it carries a "finished"
// segment that the Go identifier omits.
DownloadPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_total",
Help: "Counter of the number of the download persistent cache peer.",
}, []string{"host_type"})

// DownloadPersistentCachePeerFailureCount is a Prometheus counter vector
// partitioned by the "host_type" label. It is registered through promauto
// under types.MetricsNamespace / types.SchedulerMetricsName; per its help
// text it counts failed persistent cache peer downloads.
//
// Note that the exported metric name is
// "download_persistent_cache_peer_finished_failure_total": it carries a
// "finished" segment that the Go identifier omits.
DownloadPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer.",
}, []string{"host_type"})

// DownloadPersistentCachePieceCount is a Prometheus counter vector
// partitioned by the "host_type" label. It is registered through promauto
// under types.MetricsNamespace / types.SchedulerMetricsName; per its help
// text it counts persistent cache piece downloads.
//
// Note that the exported metric name is
// "download_persistent_cache_piece_finished_total": it carries a "finished"
// segment that the Go identifier omits.
DownloadPersistentCachePieceCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_total",
Help: "Counter of the number of the download persistent cache piece.",
}, []string{"host_type"})

// DownloadPersistentCachePieceFailureCount is a Prometheus counter vector
// partitioned by the "host_type" label. It is registered through promauto
// under types.MetricsNamespace / types.SchedulerMetricsName; per its help
// text it counts failed persistent cache piece downloads.
//
// Note that the exported metric name is
// "download_persistent_cache_piece_finished_failure_total": it carries a
// "finished" segment that the Go identifier omits.
DownloadPersistentCachePieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache piece.",
}, []string{"host_type"})

VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
93 changes: 29 additions & 64 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,8 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)

// HostOption is a functional option for configuring the persistent cache host.
type HostOption func(h *Host)

// WithConcurrentUploadLimit sets persistent cache host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
h.ConcurrentUploadLimit = limit
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
Expand Down Expand Up @@ -88,21 +77,12 @@ type Host struct {
// Build information.
Build Build

// SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64

// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit int32

// ConcurrentUploadCount is concurrent upload count.
ConcurrentUploadCount int32

// UploadCount is total upload count.
UploadCount int64

// UploadFailedCount is upload failed count.
UploadFailedCount int64

// CreatedAt is host create time.
CreatedAt time.Time

Expand Down Expand Up @@ -265,47 +245,32 @@ type Disk struct {

// NewHost returns a new persistent cache host instance.
func NewHost(
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadCount int32,
UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...HostOption,
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort int32,
schedulerClusterId uint64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if typ == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}

h := &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
AnnounceInterval: announceInterval,
ConcurrentUploadLimit: int32(concurrentUploadLimit),
ConcurrentUploadCount: concurrentUploadCount,
UploadCount: UploadCount,
UploadFailedCount: UploadFailedCount,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
return &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
SchedulerClusterID: schedulerClusterId,
AnnounceInterval: announceInterval,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
}

for _, opt := range options {
opt(h)
}

return h
}
33 changes: 5 additions & 28 deletions scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,10 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
return nil, false
}

concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload limit failed: %v", err)
return nil, false
}

concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload count failed: %v", err)
return nil, false
}

uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64)
if err != nil {
log.Errorf("parsing upload count failed: %v", err)
return nil, false
}

uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64)
// Set scheduler cluster id from raw host.
schedulerClusterID, err := strconv.ParseUint(rawHost["scheduler_cluster_id"], 10, 64)
if err != nil {
log.Errorf("parsing upload failed count failed: %v", err)
log.Errorf("parsing scheduler cluster id failed: %v", err)
return nil, false
}

Expand Down Expand Up @@ -446,9 +429,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
rawHost["kernel_version"],
int32(port),
int32(downloadPort),
int32(concurrentUploadCount),
uploadCount,
uploadFailedCount,
uint64(schedulerClusterID),
diableShared,
pkgtypes.ParseHostType(rawHost["type"]),
cpu,
Expand All @@ -460,7 +441,6 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
createdAt,
updatedAt,
logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]),
WithConcurrentUploadLimit(int32(concurrentUploadLimit)),
), true
}

Expand Down Expand Up @@ -522,11 +502,8 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error {
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval,
"concurrent_upload_limit", host.ConcurrentUploadLimit,
"concurrent_upload_count", host.ConcurrentUploadCount,
"upload_count", host.UploadCount,
"upload_failed_count", host.UploadFailedCount,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()

Expand Down
26 changes: 18 additions & 8 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ const (
// Peer is uploading resources for p2p cluster.
PeerStateUploading = "Uploading"

// Peer successfully registered and perpared to download.
PeerStateReceived = "Received"
// Peer successfully registered as empty scope size.
PeerStateReceivedEmpty = "ReceivedEmpty"

// Peer successfully registered as normal scope size.
PeerStateReceivedNormal = "ReceivedNormal"

// Peer is downloading resources from peer.
PeerStateRunning = "Running"
Expand All @@ -48,10 +51,13 @@ const (

const (
// Peer is uploading.
PeerEventUpload = "Uploda"
PeerEventUpload = "Upload"

// Peer is registered as empty scope size.
PeerEventRegisterEmpty = "RegisterEmpty"

// Peer is registered and perpared to download.
PeerEventRegister = "Register"
// Peer is registered as normal scope size.
PeerEventRegisterNormal = "RegisterNormal"

// Peer is downloading.
PeerEventDownload = "Download"
Expand Down Expand Up @@ -119,16 +125,20 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
PeerStatePending,
fsm.Events{
fsm.EventDesc{Name: PeerEventUpload, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateUploading},
fsm.EventDesc{Name: PeerEventRegister, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceived},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedEmpty},
fsm.EventDesc{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedNormal},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedNormal}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventSucceeded, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateSucceeded},
fsm.EventDesc{Name: PeerEventFailed, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateFailed},
},
fsm.Callbacks{
PeerEventUpload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegister: func(ctx context.Context, e *fsm.Event) {
PeerEventRegisterEmpty: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegisterNormal: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(ctx context.Context, e *fsm.Event) {
Expand Down
Loading
Loading