Skip to content

Commit

Permalink
Merge branch 'main' of github.com:dragonflyoss/Dragonfly2 into featur…
Browse files Browse the repository at this point in the history
…e/nydus-e2e

Signed-off-by: Gaius <[email protected]>
  • Loading branch information
gaius-qi committed May 13, 2024
2 parents 042155b + 8927d60 commit a93bb8c
Show file tree
Hide file tree
Showing 22 changed files with 191 additions and 80 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/compatibility-e2e-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache-new

- name: Setup Kind
uses: helm/kind-action@v1.9.0
uses: helm/kind-action@v1.10.0
with:
version: ${{ env.KIND_VERSION }}
config: ${{ env.KIND_CONFIG_PATH }}
Expand Down
10 changes: 5 additions & 5 deletions .github/workflows/compatibility-e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,19 @@ jobs:
include:
- module: manager
image: manager
image-tag: v2.1.42
image-tag: v2.1.45
chart-name: manager
- module: scheduler
image: scheduler
image-tag: v2.1.42
image-tag: v2.1.45
chart-name: scheduler
- module: client
image: client
image-tag: v0.1.35
image-tag: v0.1.51
chart-name: client
- module: seed-client
image: client
image-tag: v0.1.35
image-tag: v0.1.51
chart-name: seed-client

steps:
Expand Down Expand Up @@ -117,7 +117,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache-new

- name: Setup Kind
uses: helm/kind-action@v1.9.0
uses: helm/kind-action@v1.10.0
with:
version: ${{ env.KIND_VERSION }}
config: ${{ env.KIND_CONFIG_PATH }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-v1-nydus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache-new

- name: Setup Kind
uses: helm/kind-action@v1.9.0
uses: helm/kind-action@v1.10.0
with:
version: ${{ env.KIND_VERSION }}
config: ${{ env.KIND_CONFIG_PATH }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/e2e-v1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache-new

- name: Setup Kind
uses: helm/kind-action@v1.9.0
uses: helm/kind-action@v1.10.0
with:
version: ${{ env.KIND_VERSION }}
config: ${{ env.KIND_CONFIG_PATH }}
Expand Down
51 changes: 39 additions & 12 deletions .github/workflows/e2e-v2-nydus.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,22 @@ jobs:
runs-on: ubuntu-latest
timeout-minutes: 60
steps:
- name: Free Disk Space (Ubuntu)
uses: jlumbroso/free-disk-space@main
with:
tool-cache: false
android: true
dotnet: true
haskell: true
large-packages: true
docker-images: true
swap-storage: true

- name: Checkout code
uses: actions/checkout@v4
with:
submodules: recursive
fetch-depth: 0

- name: Setup buildx
uses: docker/setup-buildx-action@v3
Expand Down Expand Up @@ -130,22 +142,37 @@ jobs:
if: always()
continue-on-error: true
run: |
log_dir="/tmp/nydus-log"
mkdir $log_dir
export ns=nydus-system
for p in `kubectl -n $ns get pods --no-headers -o custom-columns=NAME:metadata.name`; do kubectl -n $ns get pod $p -o yaml >> $log_dir/nydus-pods.log; done
for p in `kubectl -n $ns get pods --no-headers -o custom-columns=NAME:metadata.name`; do kubectl -n $ns describe pod $p >> $log_dir/nydus-pods.log; done
docker exec kind-control-plane cat /etc/containerd/config.toml >> $log_dir/containerd-config.toml
docker exec kind-control-plane containerd config dump >> $log_dir/containerd-config-dump.toml
docker exec kind-control-plane journalctl -u containerd >> $log_dir/containerd.log
docker exec kind-control-plane journalctl -u kubelet >> $log_dir/kubelet.log
# Dump nydus logs.
nydus_log_dir="/tmp/nydus"
mkdir -p $nydus_log_dir
export nydus_ns=nydus-system
for p in `kubectl -n $nydus_ns get pods --no-headers -o custom-columns=NAME:metadata.name`; do kubectl -n $nydus_ns get pod $p -o yaml >> $nydus_log_dir/nydus-pods.log; done
for p in `kubectl -n $nydus_ns get pods --no-headers -o custom-columns=NAME:metadata.name`; do kubectl -n $nydus_ns describe pod $p >> $nydus_log_dir/nydus-pods.log; done
for p in `kubectl -n $nydus_ns get pods --no-headers -o custom-columns=NAME:metadata.name`; do kubectl -n $nydus_ns logs pod $p >> $nydus_log_dir/nydus-stdout.log; done
docker exec kind-control-plane cat /etc/containerd/config.toml >> $nydus_log_dir/containerd-config.toml
docker exec kind-control-plane containerd config dump >> $nydus_log_dir/containerd-config-dump.toml
docker exec kind-control-plane journalctl -u containerd >> $nydus_log_dir/containerd.log
docker exec kind-control-plane journalctl -u kubelet >> $nydus_log_dir/kubelet.log
# Dump dragonfly client logs.
dragonfly_log_dir="tmp/dragonfly"
mkdir -p $dragonfly_log_dir
export dragonfly_ns=dragonfly-system
pod_names=$(kubectl get pods -l 'component in (client, seed-client)' -o custom-columns=NAME:metadata.name --no-headers)
echo $pod_names
for pod_name in $pod_names; do
mkdir -p "/tmp/dragonfly/${pod_name}"
kubectl cp "default/${pod_name}:/var/log/dragonfly" "/tmp/dragonfly/${pod_name}"
done
- name: Upload Logs
uses: actions/upload-artifact@v4
if: always()
with:
name: nydus-e2e-tests-logs
path: |
/tmp/nydus-log
/tmp/nydus
/tmp/dragonfly
2 changes: 1 addition & 1 deletion .github/workflows/e2e-v2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ jobs:
cache-to: type=local,dest=/tmp/.buildx-cache-new

- name: Setup Kind
uses: helm/kind-action@v1.9.0
uses: helm/kind-action@v1.10.0
with:
version: ${{ env.KIND_VERSION }}
config: ${{ env.KIND_CONFIG_PATH }}
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ jobs:
cache: false

- name: Golangci lint
uses: golangci/golangci-lint-action@v4
uses: golangci/golangci-lint-action@v5
with:
version: v1.54
args: --verbose
Expand Down
2 changes: 2 additions & 0 deletions ADOPTERS.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,5 @@ refer to [Setup Dragonfly in Kubernetes](https://d7y.io/docs/getting-started/qui
**_[Yipitdata](https://www.yipitdata.com/)_** - Large-scale image and file distribution.

**_[Amap](https://mobile.amap.com/)_** - Large-scale image and file distribution.

**_[iQIYI](https://www.iqiyi.com/)_** - Large-scale image distribution.
18 changes: 13 additions & 5 deletions client/daemon/peer/peertask_conductor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,11 +1086,16 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
pt.runningPieces.Set(request.piece.PieceNum)
pt.runningPiecesLock.Unlock()

defer func() {
pt.runningPiecesLock.Lock()
pt.runningPieces.Clean(request.piece.PieceNum)
pt.runningPiecesLock.Unlock()
}()
var cleanRunningPieceDone bool
cleanRunningPiece := func() {
if cleanRunningPieceDone {
cleanRunningPieceDone = true
pt.runningPiecesLock.Lock()
pt.runningPieces.Clean(request.piece.PieceNum)
pt.runningPiecesLock.Unlock()
}
}
defer cleanRunningPiece()

ctx, span := tracer.Start(pt.pieceDownloadCtx, fmt.Sprintf(config.SpanDownloadPiece, request.piece.PieceNum))
span.SetAttributes(config.AttributePiece.Int(int(request.piece.PieceNum)))
Expand All @@ -1117,6 +1122,9 @@ func (pt *peerTaskConductor) downloadPiece(workerID int32, request *DownloadPiec
pt.Infof("switch to back source, skip send failed piece")
return result
}

// clean running piece first
cleanRunningPiece()
attempt, success := pt.pieceTaskSyncManager.acquire(
&commonv1.PieceTaskRequest{
Limit: 1,
Expand Down
62 changes: 42 additions & 20 deletions client/daemon/rpcserver/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ func searchNextPieceNum(sentMap map[int32]struct{}, cur uint32) (nextPieceNum ui
}

// sendExistPieces will send as much as possible pieces
func (s *subscriber) sendExistPieces(startNum uint32) (total int32, err error) {
func (s *subscriber) sendExistPieces(startNum uint32) error {
s.request.StartNum = startNum
return sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
total, err := sendExistPieces(s.sync.Context(), s.SugaredLoggerOnWith, s.getPieces, s.request, s.sync, s.sentMap, true)
if total > -1 && s.isUnknownTotalPieces() {
s.totalPieces = total
}
return err
}

func (s *subscriber) receiveRemainingPieceTaskRequests() {
Expand Down Expand Up @@ -161,9 +165,23 @@ func (s *subscriber) receiveRemainingPieceTaskRequests() {
}
}

// totalPieces is -1, 0, n
func (s *subscriber) isKnownTotalPieces() bool {
return s.totalPieces > -1
}

func (s *subscriber) isUnknownTotalPieces() bool {
return !s.isKnownTotalPieces()
}

func (s *subscriber) isAllPiecesSent(nextPieceNum uint32) bool {
return nextPieceNum == uint32(s.totalPieces)
}

func (s *subscriber) sendRemainingPieceTasks() error {
// nextPieceNum is the least piece num which did not send to remote peer
// may great then total piece count, check the total piece count when use it
// available values: [0, n], n is total piece count
// when nextPieceNum is n, indicate all pieces done
var nextPieceNum uint32
s.Lock()
for i := int32(s.skipPieceCount); ; i++ {
Expand All @@ -173,6 +191,7 @@ func (s *subscriber) sendRemainingPieceTasks() error {
}
}
s.Unlock()
s.Debugf("desired next piece num: %d", nextPieceNum)
loop:
for {
select {
Expand All @@ -182,51 +201,54 @@ loop:
case info := <-s.PieceInfoChannel:
s.Infof("receive piece info, num: %d, finished: %v", info.Num, info.Finished)
// not desired piece
if s.totalPieces > -1 && uint32(info.Num) < nextPieceNum {
if uint32(info.Num) < nextPieceNum {
continue
}

s.Lock()
total, err := s.sendExistPieces(uint32(info.Num))
err := s.sendExistPieces(uint32(info.Num))
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) == int(s.totalPieces) {
s.Unlock()
break loop
}
if info.Finished {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Debugf("update desired next piece num: %d", nextPieceNum)

if info.Finished && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
nextPieceNum = s.searchNextPieceNum(nextPieceNum)
s.Unlock()
case <-s.Success:
s.Infof("peer task is success, send remaining pieces")
s.Lock()
// all pieces already sent
// empty piece task will reach sendExistPieces to sync content length and piece count
if s.totalPieces > 0 && nextPieceNum == uint32(s.totalPieces) {
if s.totalPieces > 0 && s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
break loop
}
total, err := s.sendExistPieces(nextPieceNum)

err := s.sendExistPieces(nextPieceNum)
if err != nil {
err = s.saveError(err)
s.Unlock()
return err
}
if total > -1 && s.totalPieces == -1 {
s.totalPieces = total

if s.isUnknownTotalPieces() {
s.Unlock()
msg := "task success, but total pieces is unknown"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
if s.totalPieces > -1 && len(s.sentMap)+int(s.skipPieceCount) != int(s.totalPieces) {

nextPieceNum = s.searchNextPieceNum(nextPieceNum)
if !s.isAllPiecesSent(nextPieceNum) {
s.Unlock()
msg := "peer task success, but can not send all pieces"
msg := "task success, but not all pieces are sent out"
s.Errorf(msg)
return dferrors.Newf(commonv1.Code_ClientError, msg)
}
Expand Down
15 changes: 8 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ module d7y.io/dragonfly/v2
go 1.21

require (
d7y.io/api/v2 v2.0.110
d7y.io/api/v2 v2.0.112
github.com/MysteriousPotato/go-lockable v1.0.0
github.com/RichardKnop/machinery v1.10.8
github.com/Showmax/go-fqdn v1.0.0
github.com/VividCortex/mysqlerr v1.0.0
github.com/aliyun/aliyun-oss-go-sdk v3.0.2+incompatible
github.com/appleboy/gin-jwt/v2 v2.9.2
github.com/aws/aws-sdk-go v1.51.11
github.com/aws/aws-sdk-go v1.52.2
github.com/bits-and-blooms/bitset v1.13.0
github.com/casbin/casbin/v2 v2.81.0
github.com/casbin/gorm-adapter/v3 v3.5.0
Expand All @@ -19,7 +19,7 @@ require (
github.com/deckarep/golang-set/v2 v2.6.0
github.com/distribution/distribution/v3 v3.0.0-20220620080156-3e4f8a0ab147
github.com/docker/distribution v2.8.3+incompatible
github.com/docker/docker v26.0.2+incompatible
github.com/docker/docker v26.1.1+incompatible
github.com/docker/go-connections v0.5.0
github.com/docker/go-units v0.4.0
github.com/gaius-qi/ping v1.0.0
Expand Down Expand Up @@ -70,12 +70,13 @@ require (
github.com/swaggo/gin-swagger v1.6.0
github.com/swaggo/swag v1.16.3
github.com/yl2chen/cidranger v1.0.2
github.com/zeebo/blake3 v0.2.3
go.opentelemetry.io/contrib/instrumentation/github.com/gin-gonic/gin/otelgin v0.47.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.49.0
go.opentelemetry.io/otel v1.25.0
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/jaeger v1.17.0
go.opentelemetry.io/otel/sdk v1.25.0
go.opentelemetry.io/otel/trace v1.25.0
go.opentelemetry.io/otel/trace v1.26.0
go.uber.org/atomic v1.11.0
go.uber.org/mock v0.4.0
go.uber.org/zap v1.27.0
Expand Down Expand Up @@ -233,7 +234,7 @@ require (
go.mongodb.org/mongo-driver v1.9.1 // indirect
go.opencensus.io v0.24.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 // indirect
go.opentelemetry.io/otel/metric v1.25.0 // indirect
go.opentelemetry.io/otel/metric v1.26.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/arch v0.7.0 // indirect
golang.org/x/net v0.24.0 // indirect
Expand Down
Loading

0 comments on commit a93bb8c

Please sign in to comment.