Skip to content

Commit

Permalink
feat: implement FindCandidatePersistentCacheParents for scheduling pe…
Browse files Browse the repository at this point in the history
…rsistent cache task

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed Jan 16, 2025
1 parent 6c095f1 commit 1d65e7a
Show file tree
Hide file tree
Showing 16 changed files with 1,407 additions and 644 deletions.
56 changes: 56 additions & 0 deletions scheduler/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -401,6 +401,62 @@ var (
Help: "Counter of the number of failed of the delete cache task.",
})

RegisterPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_total",
Help: "Counter of the number of the register persistent cache peer.",
}, []string{"host_type"})

RegisterPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "register_persistent_cache_peer_failure_total",
Help: "Counter of the number of failed of the register persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePeerStartedCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_total",
Help: "Counter of the number of the download persistent cache peer started.",
}, []string{"host_type"})

DownloadPersistentCachePeerStartedFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_started_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer started.",
}, []string{"host_type"})

DownloadPersistentCachePeerCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_total",
Help: "Counter of the number of the download persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePeerFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_peer_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache peer.",
}, []string{"host_type"})

DownloadPersistentCachePieceCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_total",
Help: "Counter of the number of the download persistent cache piece.",
}, []string{"host_type"})

DownloadPersistentCachePieceFailureCount = promauto.NewCounterVec(prometheus.CounterOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Name: "download_persistent_cache_piece_finished_failure_total",
Help: "Counter of the number of failed of the download persistent cache piece.",
}, []string{"host_type"})

VersionGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
Namespace: types.MetricsNamespace,
Subsystem: types.SchedulerMetricsName,
Expand Down
93 changes: 29 additions & 64 deletions scheduler/resource/persistentcache/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,19 +21,8 @@ import (

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/types"
"d7y.io/dragonfly/v2/scheduler/config"
)

// HostOption is a functional option for configuring the persistent cache host.
type HostOption func(h *Host)

// WithConcurrentUploadLimit sets persistent cache host's ConcurrentUploadLimit.
func WithConcurrentUploadLimit(limit int32) HostOption {
return func(h *Host) {
h.ConcurrentUploadLimit = limit
}
}

// Host contains content for host.
type Host struct {
// ID is host id.
Expand Down Expand Up @@ -88,21 +77,12 @@ type Host struct {
// Build information.
Build Build

// SchedulerClusterID is the scheduler cluster id matched by scopes.
SchedulerClusterID uint64

// AnnounceInterval is the interval between host announces to scheduler.
AnnounceInterval time.Duration

// ConcurrentUploadLimit is concurrent upload limit count.
ConcurrentUploadLimit int32

// ConcurrentUploadCount is concurrent upload count.
ConcurrentUploadCount int32

// UploadCount is total upload count.
UploadCount int64

// UploadFailedCount is upload failed count.
UploadFailedCount int64

// CreatedAt is host create time.
CreatedAt time.Time

Expand Down Expand Up @@ -265,47 +245,32 @@ type Disk struct {

// New host instance.
func NewHost(
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort, concurrentUploadCount int32,
UploadCount, UploadFailedCount int64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith, options ...HostOption,
id, hostname, ip, os, platform, platformFamily, platformVersion, kernelVersion string, port, downloadPort int32,
schedulerClusterId uint64, disableShared bool, typ types.HostType, cpu CPU, memory Memory, network Network, disk Disk,
build Build, announceInterval time.Duration, createdAt, updatedAt time.Time, log *logger.SugaredLoggerOnWith,
) *Host {
// Calculate default of the concurrent upload limit by host type.
concurrentUploadLimit := config.DefaultSeedPeerConcurrentUploadLimit
if typ == types.HostTypeNormal {
concurrentUploadLimit = config.DefaultPeerConcurrentUploadLimit
}

h := &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
AnnounceInterval: announceInterval,
ConcurrentUploadLimit: int32(concurrentUploadLimit),
ConcurrentUploadCount: concurrentUploadCount,
UploadCount: UploadCount,
UploadFailedCount: UploadFailedCount,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
return &Host{
ID: id,
Type: types.HostType(typ),
Hostname: hostname,
IP: ip,
Port: port,
DownloadPort: downloadPort,
DisableShared: disableShared,
OS: os,
Platform: platform,
PlatformFamily: platformFamily,
PlatformVersion: platformVersion,
KernelVersion: kernelVersion,
CPU: cpu,
Memory: memory,
Network: network,
Disk: disk,
Build: build,
SchedulerClusterID: schedulerClusterId,
AnnounceInterval: announceInterval,
CreatedAt: createdAt,
UpdatedAt: updatedAt,
Log: logger.WithHost(id, hostname, ip),
}

for _, opt := range options {
opt(h)
}

return h
}
33 changes: 5 additions & 28 deletions scheduler/resource/persistentcache/host_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,27 +106,10 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
return nil, false
}

concurrentUploadLimit, err := strconv.ParseInt(rawHost["concurrent_upload_limit"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload limit failed: %v", err)
return nil, false
}

concurrentUploadCount, err := strconv.ParseInt(rawHost["concurrent_upload_count"], 10, 32)
if err != nil {
log.Errorf("parsing concurrent upload count failed: %v", err)
return nil, false
}

uploadCount, err := strconv.ParseInt(rawHost["upload_count"], 10, 64)
if err != nil {
log.Errorf("parsing upload count failed: %v", err)
return nil, false
}

uploadFailedCount, err := strconv.ParseInt(rawHost["upload_failed_count"], 10, 64)
// Set cpu fields from raw host.
schedulerClusterID, err := strconv.ParseUint(rawHost["scheduler_cluster_id"], 10, 64)
if err != nil {
log.Errorf("parsing upload failed count failed: %v", err)
log.Errorf("parsing scheduler cluster id failed: %v", err)
return nil, false
}

Expand Down Expand Up @@ -446,9 +429,7 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
rawHost["kernel_version"],
int32(port),
int32(downloadPort),
int32(concurrentUploadCount),
uploadCount,
uploadFailedCount,
uint64(schedulerClusterID),
diableShared,
pkgtypes.ParseHostType(rawHost["type"]),
cpu,
Expand All @@ -460,7 +441,6 @@ func (h *hostManager) Load(ctx context.Context, hostID string) (*Host, bool) {
createdAt,
updatedAt,
logger.WithHost(rawHost["id"], rawHost["hostname"], rawHost["ip"]),
WithConcurrentUploadLimit(int32(concurrentUploadLimit)),
), true
}

Expand Down Expand Up @@ -522,11 +502,8 @@ func (h *hostManager) Store(ctx context.Context, host *Host) error {
"build_git_commit", host.Build.GitCommit,
"build_go_version", host.Build.GoVersion,
"build_platform", host.Build.Platform,
"scheduler_cluster_id", host.SchedulerClusterID,
"announce_interval", host.AnnounceInterval,
"concurrent_upload_limit", host.ConcurrentUploadLimit,
"concurrent_upload_count", host.ConcurrentUploadCount,
"upload_count", host.UploadCount,
"upload_failed_count", host.UploadFailedCount,
"created_at", host.CreatedAt.Format(time.RFC3339),
"updated_at", host.UpdatedAt.Format(time.RFC3339)).Result()

Expand Down
26 changes: 18 additions & 8 deletions scheduler/resource/persistentcache/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,11 @@ const (
// Peer is uploading resources for p2p cluster.
PeerStateUploading = "Uploading"

// Peer successfully registered and perpared to download.
PeerStateReceived = "Received"
// Peer successfully registered as empty scope size.
PeerStateReceivedEmpty = "ReceivedEmpty"

// Peer successfully registered as normal scope size.
PeerStateReceivedNormal = "ReceivedNormal"

// Peer is downloading resources from peer.
PeerStateRunning = "Running"
Expand All @@ -48,10 +51,13 @@ const (

const (
// Peer is uploding.
PeerEventUpload = "Uploda"
PeerEventUpload = "Upload"

// Peer is registered as empty scope size.
PeerEventRegisterEmpty = "RegisterEmpty"

// Peer is registered and perpared to download.
PeerEventRegister = "Register"
// Peer is registered as normal scope size.
PeerEventRegisterNormal = "RegisterNormal"

// Peer is downloading.
PeerEventDownload = "Download"
Expand Down Expand Up @@ -119,16 +125,20 @@ func NewPeer(id, state string, persistent bool, finishedPieces *bitset.BitSet, b
PeerStatePending,
fsm.Events{
fsm.EventDesc{Name: PeerEventUpload, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateUploading},
fsm.EventDesc{Name: PeerEventRegister, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceived},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceived}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventRegisterEmpty, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedEmpty},
fsm.EventDesc{Name: PeerEventRegisterNormal, Src: []string{PeerStatePending, PeerStateFailed}, Dst: PeerStateReceivedNormal},
fsm.EventDesc{Name: PeerEventDownload, Src: []string{PeerStateReceivedEmpty, PeerStateReceivedNormal}, Dst: PeerStateRunning},
fsm.EventDesc{Name: PeerEventSucceeded, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateSucceeded},
fsm.EventDesc{Name: PeerEventFailed, Src: []string{PeerStateUploading, PeerStateRunning}, Dst: PeerStateFailed},
},
fsm.Callbacks{
PeerEventUpload: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegister: func(ctx context.Context, e *fsm.Event) {
PeerEventRegisterEmpty: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventRegisterNormal: func(ctx context.Context, e *fsm.Event) {
p.Log.Infof("peer state is %s", e.FSM.Current())
},
PeerEventDownload: func(ctx context.Context, e *fsm.Event) {
Expand Down
35 changes: 35 additions & 0 deletions scheduler/resource/persistentcache/task.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,20 @@ import (

"github.com/looplab/fsm"

commonv2 "d7y.io/api/v2/pkg/apis/common/v2"

logger "d7y.io/dragonfly/v2/internal/dflog"
"d7y.io/dragonfly/v2/pkg/digest"
)

const (
// Tiny file size is 128 bytes.
TinyFileSize = 128

// Empty file size is 0 bytes.
EmptyFileSize = 0
)

const (
// Task has been created but did not start uploading.
TaskStatePending = "Pending"
Expand Down Expand Up @@ -137,3 +147,28 @@ func NewTask(id, tag, application, state string, persistentReplicaCount uint64,

return t
}

// SizeScope return task size scope type.
func (t *Task) SizeScope() commonv2.SizeScope {
if t.ContentLength < 0 {
return commonv2.SizeScope_UNKNOW
}

if t.TotalPieceCount < 0 {
return commonv2.SizeScope_UNKNOW
}

if t.ContentLength == EmptyFileSize {
return commonv2.SizeScope_EMPTY
}

if t.ContentLength <= TinyFileSize {
return commonv2.SizeScope_TINY
}

if t.TotalPieceCount == 1 {
return commonv2.SizeScope_SMALL
}

return commonv2.SizeScope_NORMAL
}
2 changes: 1 addition & 1 deletion scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func New(ctx context.Context, cfg *config.Config, d dfpath.Dfpath) (*Server, err
}

// Initialize scheduling.
scheduling := scheduling.New(&cfg.Scheduler, dynconfig, d.PluginDir())
scheduling := scheduling.New(&cfg.Scheduler, s.persistentCacheResource, dynconfig, d.PluginDir())

// Initialize server options of scheduler grpc server.
schedulerServerOptions := []grpc.ServerOption{}
Expand Down
Loading

0 comments on commit 1d65e7a

Please sign in to comment.