Skip to content

Commit

Permalink
fix: pv cannot be restored correctly
Browse files Browse the repository at this point in the history
Signed-off-by: Kasakaze <[email protected]>
  • Loading branch information
njuptlzf committed Sep 26, 2023
1 parent cea5778 commit bf73079
Show file tree
Hide file tree
Showing 7 changed files with 345 additions and 55 deletions.
6 changes: 4 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ module github.com/openebs/velero-plugin
go 1.13

require (
cloud.google.com/go v0.58.0 // indirect
cloud.google.com/go/storage v1.9.0 // indirect
github.com/Azure/azure-pipeline-go v0.2.2 // indirect
github.com/Azure/azure-storage-blob-go v0.8.0 // indirect
github.com/aws/aws-sdk-go v1.35.24
Expand All @@ -16,6 +14,7 @@ require (
github.com/onsi/ginkgo v1.15.2
github.com/onsi/gomega v1.10.2
github.com/openebs/api/v2 v2.3.0
github.com/openebs/cstor-csi v1.12.0-RC1.0.20220712095109-ed7121554bd2
github.com/openebs/maya v1.12.1-0.20210416090832-ad9c32f086d5
github.com/openebs/zfs-localpv v1.6.1-0.20210504173514-62b3a0b7fe5d
github.com/pkg/errors v0.9.1
Expand All @@ -40,6 +39,8 @@ replace (
k8s.io/cluster-bootstrap => k8s.io/cluster-bootstrap v0.20.2
k8s.io/code-generator => k8s.io/code-generator v0.20.2
k8s.io/component-base => k8s.io/component-base v0.20.2
k8s.io/component-helpers => k8s.io/component-helpers v0.20.0
k8s.io/controller-manager => k8s.io/controller-manager v0.20.0
k8s.io/cri-api => k8s.io/cri-api v0.20.2
k8s.io/csi-translation-lib => k8s.io/csi-translation-lib v0.20.2
k8s.io/kube-aggregator => k8s.io/kube-aggregator v0.20.2
Expand All @@ -50,6 +51,7 @@ replace (
k8s.io/kubelet => k8s.io/kubelet v0.20.2
k8s.io/legacy-cloud-providers => k8s.io/legacy-cloud-providers v0.20.2
k8s.io/metrics => k8s.io/metrics v0.20.2
k8s.io/mount-utils => k8s.io/mount-utils v0.20.0
k8s.io/node-api => k8s.io/node-api v0.20.2
k8s.io/sample-apiserver => k8s.io/sample-apiserver v0.20.2
k8s.io/sample-cli-plugin => k8s.io/sample-cli-plugin v0.20.2
Expand Down
147 changes: 99 additions & 48 deletions go.sum

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion pkg/cstor/cstor.go
Original file line number Diff line number Diff line change
Expand Up @@ -446,11 +446,20 @@ func (p *Plugin) CreateSnapshot(volumeID, volumeAZ string, tags map[string]strin
}

if !p.local {
// If cloud snapshot is configured then we need to backup PVC also
// If cloud snapshot is configured then we need to backup PVC,PV, CVC also
p.Log.Infof("backup PVC, PV, CVC first")
err := p.backupPVC(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for PVC")
}
err = p.backupPV(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for PV")
}
err = p.backupCVC(volumeID)
if err != nil {
return "", errors.Wrapf(err, "failed to create backup for CVC")
}
}

p.Log.Infof("creating snapshot{%s}", bkpname)
Expand Down
116 changes: 116 additions & 0 deletions pkg/cstor/cvc_operation.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
Copyright 2019 The OpenEBS Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package cstor

import (
"encoding/json"
"fmt"

cstorv1 "github.com/openebs/api/v2/pkg/apis/cstor/v1"
maya "github.com/openebs/cstor-csi/pkg/utils"
"github.com/pkg/errors"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
)

// (Kasakaze)todo: Determine whether it is csiVolume, if so, cvc must be backed up
func (p *Plugin) backupCVC(volumeID string) error {
vol := p.volumes[volumeID]

bkpCvc, err := maya.GetVolume(volumeID)
if err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Cause(err)
}
p.Log.Warnf("failed to get cvc, skip. %v", err)
return nil
}

data, err := json.MarshalIndent(bkpCvc, "", "\t")
if err != nil {
return errors.New("error doing json parsing")
}

// pv backup file name
filename := p.cl.GenerateRemoteFilename(vol.volname, vol.backupName)
if filename == "" {
return errors.New("error creating remote file name for pvc backup")
}
if ok := p.cl.Write(data, filename+".cvc"); !ok {
return errors.New("failed to upload CVC")
}

return nil
}

// restoreCVC create CVC for given volume name
// (Kasakaze)todo: Determine whether it is csiVolume, if so, cvc must be restored
func (p *Plugin) restoreCVC(volumeID, pvcName, pvcNamespace, snapName string) error {
// verify if the volume has already been created
cvc, err := maya.GetVolume(volumeID)
if err != nil {
if !k8serrors.IsNotFound(err) {
return errors.Cause(err)
}
}
if err == nil && cvc != nil && cvc.DeletionTimestamp == nil {
p.Log.Warn("cvc already exists, don't provision volume")
return nil
}

p.Log.Info("cvc does not exist, download and provision")
rcvc, err := p.downloadCVC(volumeID, snapName)
if err != nil {
p.Log.Warnf("failed to download cvc, skip. %v", err)
return nil
}

var (
size, _ = rcvc.Spec.Capacity.Storage().AsInt64()
rCount = fmt.Sprint(rcvc.Spec.Provision.ReplicaCount)
cspcName = rcvc.ObjectMeta.Labels["openebs.io/cstor-pool-cluster"]
snapshotID = ""
// (Kasakaze)todo: If the data is migrated to another cluster, the nodeID may not be the same
nodeID = rcvc.Publish.NodeID
policyName = rcvc.ObjectMeta.Labels["openebs.io/volume-policy"]
)

err = maya.ProvisionVolume(size, volumeID, rCount,
cspcName, snapshotID,
nodeID, policyName, pvcName, pvcNamespace)
if err != nil {
return errors.Cause(err)
}

return nil
}

func (p *Plugin) downloadCVC(volumeID, snapName string) (*cstorv1.CStorVolumeConfig, error) {
cvc := &cstorv1.CStorVolumeConfig{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)
filename += ".cvc"
data, ok := p.cl.Read(filename)
if !ok {
return nil, errors.Errorf("failed to download CVC file=%s", filename)
}

if err := json.Unmarshal(data, cvc); err != nil {
return nil, errors.Errorf("failed to decode CVC file=%s", filename)
}

return cvc, nil
}
1 change: 1 addition & 0 deletions pkg/cstor/cvr_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ func (p *Plugin) waitForAllCVRsToBeInValidStatus(vol *Volume, statuses []string)
return errors.Errorf("Failed to fetch replicaCount for volume{%s}", vol.volname)
}

p.Log.Infof("Waiting for all CVRs of PV(%s) to be ready, replicaCount=%d", vol.volname, replicaCount)
if vol.isCSIVolume {
return p.waitForCSIBasedCVRs(vol, replicaCount, statuses)
}
Expand Down
96 changes: 96 additions & 0 deletions pkg/cstor/pv_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
v1alpha1 "github.com/openebs/maya/pkg/apis/openebs.io/v1alpha1"
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

Expand Down Expand Up @@ -70,6 +71,7 @@ func (p *Plugin) restoreVolumeFromCloud(vol *Volume, targetBackupName string) er
err error
)

p.Log.Info("Restoring volume data from cloud")
if p.restoreAllSnapshots {
// We are restoring from base backup to targeted Backup
snapshotList, err = p.cl.GetSnapListFromCloud(vol.snapshotTag, p.getScheduleName(targetBackupName))
Expand Down Expand Up @@ -240,3 +242,97 @@ func contains(s []string, target string) bool {

return false
}

// backupPV perform backup for given volume's PV
func (p *Plugin) backupPV(volumeID string) error {
vol := p.volumes[volumeID]

bkpPv, err := p.K8sClient.
CoreV1().
PersistentVolumes().
Get(context.TODO(), vol.volname, metav1.GetOptions{})
if err != nil {
p.Log.Errorf("Error fetching PV(%s): %s", vol.volname, err.Error())
return errors.New("failed to fetch PV")
}

data, err := json.MarshalIndent(bkpPv, "", "\t")
if err != nil {
return errors.New("error doing json parsing")
}

filename := p.cl.GenerateRemoteFilename(vol.volname, vol.backupName)
if filename == "" {
return errors.New("error creating remote file name for pvc backup")
}

if ok := p.cl.Write(data, filename+".pv"); !ok {
return errors.New("failed to upload PV")
}

return nil
}

// restorePV create PV for given volume name
func (p *Plugin) restorePV(volumeID, snapName string) error {
_, err := p.K8sClient.
CoreV1().
PersistentVolumes().
Get(context.TODO(), volumeID, metav1.GetOptions{})
if err == nil {
p.Log.Infof("PV=%s already exists, skip restore", volumeID)
return nil
}
if !k8serrors.IsNotFound(err) {
return errors.Wrapf(err, "failed to get PV=%s", volumeID)
}

pv, err := p.downloadPV(volumeID, snapName)
if err != nil {
return errors.Wrapf(err, "failed to download pv")
}

// Add annotation PVCreatedByKey, with value 'restore' to PV
pv.Annotations = make(map[string]string)
pv.Annotations[v1alpha1.PVCreatedByKey] = "restore"
pv.ManagedFields = nil
pv.Finalizers = nil
if pv.Spec.ClaimRef != nil {
pv.Spec.ClaimRef.ResourceVersion = ""
pv.Spec.ClaimRef.UID = ""
}
pv.CreationTimestamp = metav1.Time{}
pv.ResourceVersion = ""
pv.UID = ""
pv.Status = v1.PersistentVolumeStatus{}

_, err = p.K8sClient.
CoreV1().
PersistentVolumes().
Create(context.TODO(), pv, metav1.CreateOptions{})
if err != nil {
if !k8serrors.IsAlreadyExists(err) {
return errors.Wrapf(err, "failed to create PV=%s", pv.Name)
}
p.Log.Infof("PV=%s already exists, skip restore", pv.Name)
}

return nil
}

func (p *Plugin) downloadPV(volumeID, snapName string) (*v1.PersistentVolume, error) {
pv := &v1.PersistentVolume{}

filename := p.cl.GenerateRemoteFilename(volumeID, snapName)

data, ok := p.cl.Read(filename + ".pv")
if !ok {
return nil, errors.Errorf("failed to download PV file=%s", filename+".pv")
}

if err := json.Unmarshal(data, pv); err != nil {
return nil, errors.Errorf("failed to decode pv file=%s", filename+".pv")
}

return pv, nil
}
23 changes: 19 additions & 4 deletions pkg/cstor/pvc_operation.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/pkg/errors"
v1 "k8s.io/api/core/v1"
k8serrors "k8s.io/apimachinery/pkg/api/errors"

metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/wait"
)
Expand Down Expand Up @@ -128,12 +129,22 @@ func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
return newVol, nil
}

p.Log.Infof("Creating PVC for volumeID:%s snapshot:%s in namespace=%s", volumeID, snapName, targetedNs)
// restore PV so that it can be bound to PVC
p.Log.Infof("Creating PV(%s) snapshot:%s", volumeID, snapName)
if err := p.restorePV(volumeID, snapName); err != nil {
return nil, errors.Wrapf(err, "failed to restore PV=%s", pvc.Spec.VolumeName)
}

// pending until the pv is bound to pvc
p.Log.Infof("Creating PVC with specified volumeID:%s, snapshot:%s in namespace=%s", volumeID, snapName, targetedNs)

pvc.Annotations = make(map[string]string)
// Add annotation PVCreatedByKey, with value 'restore' to PVC
// So that Maya-APIServer skip updating target IPAddress in CVR
pvc.Annotations[v1alpha1.PVCreatedByKey] = "restore"

// Specify src pvname
pvc.Spec.VolumeName = volumeID
rpvc, err := p.K8sClient.
CoreV1().
PersistentVolumeClaims(pvc.Namespace).
Expand All @@ -142,6 +153,10 @@ func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
return nil, errors.Wrapf(err, "failed to create PVC=%s/%s", pvc.Namespace, pvc.Name)
}

p.Log.Infof("Creating CVC(%s) snapshot:%s", volumeID, snapName)
if err := p.restoreCVC(volumeID, rpvc.Name, rpvc.Namespace, snapName); err != nil {
return nil, errors.Wrapf(err, "failed to restore CVC=%s", volumeID)
}
for cnt := 0; cnt < PVCWaitCount; cnt++ {
pvc, err = p.K8sClient.
CoreV1().
Expand All @@ -157,7 +172,7 @@ func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
return nil, errors.Wrapf(err, "failed to create PVC=%s/%s", rpvc.Namespace, rpvc.Name)
}
if pvc.Status.Phase == v1.ClaimBound {
p.Log.Infof("PVC(%v) created..", pvc.Name)
p.Log.Infof("PVC(%v) created, PV(%s) bound", pvc.Name, volumeID)
vol = &Volume{
volname: pvc.Spec.VolumeName,
snapshotTag: volumeID,
Expand All @@ -168,6 +183,7 @@ func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
p.volumes[vol.volname] = vol
break
}
p.Log.Debugf("PV(%v) is not bound yet..", volumeID)
time.Sleep(PVCCheckInterval)
}

Expand All @@ -182,12 +198,10 @@ func (p *Plugin) createPVC(volumeID, snapName string) (*Volume, error) {
p.Log.Errorf("Failed to get PV{%s}", vol.volname)
return nil, errors.Wrapf(err, "failed to get pv=%s", vol.volname)
}

vol.isCSIVolume = isCSIPv(*pv)
if err = p.waitForAllCVRs(vol); err != nil {
return nil, err
}

// CVRs are created and updated, now we can remove the annotation 'PVCreatedByKey' from PVC
if err = p.removePVCAnnotationKey(pvc, v1alpha1.PVCreatedByKey); err != nil {
p.Log.Warningf("Failed to remove restore annotation from PVC=%s/%s err=%s", pvc.Namespace, pvc.Name, err)
Expand All @@ -207,6 +221,7 @@ func (p *Plugin) getVolumeFromPVC(pvc v1.PersistentVolumeClaim) (*Volume, error)
Get(context.TODO(), pvc.Name, metav1.GetOptions{})
if err != nil {
if k8serrors.IsNotFound(err) {
p.Log.Warnf("PVC{%s} not found", pvc.Name)
return nil, nil
}
return nil, errors.Wrapf(err, "failed to fetch PVC{%s}", pvc.Name)
Expand Down

0 comments on commit bf73079

Please sign in to comment.