Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: adding error handling to spans (Distributed Tracing) #728

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions bin/experiment/experiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"flag"
"fmt"
"os"

// Uncomment to load all auth plugins
Expand Down Expand Up @@ -68,6 +69,7 @@ import (
"github.com/litmuschaos/litmus-go/pkg/telemetry"
"github.com/sirupsen/logrus"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

func init() {
Expand Down Expand Up @@ -106,6 +108,8 @@ func main() {
//Getting kubeConfig and Generate ClientSets
if err := clients.GenerateClientSetFromKubeConfig(); err != nil {
log.Errorf("Unable to Get the kubeconfig, err: %v", err)
span.SetStatus(codes.Error, "Unable to Get the kubeconfig")
span.RecordError(err)
return
}

Expand Down Expand Up @@ -211,6 +215,7 @@ func main() {
k6Loadgen.Experiment(ctx, clients)
default:
log.Errorf("Unsupported -name %v, please provide the correct value of -name args", *experimentName)
span.SetStatus(codes.Error, fmt.Sprintf("Unsupported -name %v", *experimentName))
return
}
}
17 changes: 17 additions & 0 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm-chaos.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

// InjectChaosInSerialMode will inject the aws ssm chaos in serial mode, that is, one after the other
Expand Down Expand Up @@ -51,6 +52,8 @@ func InjectChaosInSerialMode(ctx context.Context, experimentsDetails *experiment
ec2IDList := strings.Fields(ec2ID)
commandId, err := ssm.SendSSMCommand(experimentsDetails, ec2IDList)
if err != nil {
span.SetStatus(codes.Error, "failed to send ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to send ssm command")
}
//prepare commands for abort recovery
Expand All @@ -59,20 +62,26 @@ func InjectChaosInSerialMode(ctx context.Context, experimentsDetails *experiment
//wait for the ssm command to get in running state
log.Info("[Wait]: Waiting for the ssm command to get in InProgress state")
if err := ssm.WaitForCommandStatus("InProgress", commandId, ec2ID, experimentsDetails.Region, experimentsDetails.ChaosDuration+experimentsDetails.Timeout, experimentsDetails.Delay); err != nil {
span.SetStatus(codes.Error, "failed to start ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to start ssm command")
}
common.SetTargets(ec2ID, "injected", "EC2", chaosDetails)

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 && i == 0 {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
span.SetStatus(codes.Error, "failed to run probes")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to run probes")
}
}

//wait for the ssm command to succeed within the given chaos duration
log.Info("[Wait]: Waiting for the ssm command to get completed")
if err := ssm.WaitForCommandStatus("Success", commandId, ec2ID, experimentsDetails.Region, experimentsDetails.ChaosDuration+experimentsDetails.Timeout, experimentsDetails.Delay); err != nil {
span.SetStatus(codes.Error, "failed to send ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to send ssm command")
}
common.SetTargets(ec2ID, "reverted", "EC2", chaosDetails)
Expand Down Expand Up @@ -117,6 +126,8 @@ func InjectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
log.Info("[Chaos]: Starting the ssm command")
commandId, err := ssm.SendSSMCommand(experimentsDetails, instanceIDList)
if err != nil {
span.SetStatus(codes.Error, "failed to send ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to send ssm command")
}
//prepare commands for abort recovery
Expand All @@ -126,13 +137,17 @@ func InjectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
//wait for the ssm command to get in running state
log.Info("[Wait]: Waiting for the ssm command to get in InProgress state")
if err := ssm.WaitForCommandStatus("InProgress", commandId, ec2ID, experimentsDetails.Region, experimentsDetails.ChaosDuration+experimentsDetails.Timeout, experimentsDetails.Delay); err != nil {
span.SetStatus(codes.Error, "failed to start ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to start ssm command")
}
}

// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err = probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
span.SetStatus(codes.Error, "failed to run probes")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand All @@ -141,6 +156,8 @@ func InjectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
//wait for the ssm command to succeed within the given chaos duration
log.Info("[Wait]: Waiting for the ssm command to get completed")
if err := ssm.WaitForCommandStatus("Success", commandId, ec2ID, experimentsDetails.Region, experimentsDetails.ChaosDuration+experimentsDetails.Timeout, experimentsDetails.Delay); err != nil {
span.SetStatus(codes.Error, "failed to send ssm command")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to send ssm command")
}
}
Expand Down
19 changes: 17 additions & 2 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm/aws-ssm-chaos-by-id.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

var (
Expand Down Expand Up @@ -49,6 +50,8 @@ func PrepareAWSSSMChaosByID(ctx context.Context, experimentsDetails *experimentT

//create and upload the ssm document on the given aws service monitoring docs
if err = ssm.CreateAndUploadDocument(experimentsDetails.DocumentName, experimentsDetails.DocumentType, experimentsDetails.DocumentFormat, experimentsDetails.DocumentPath, experimentsDetails.Region); err != nil {
span.SetStatus(codes.Error, "could not create and upload the ssm document")
span.RecordError(err)
return stacktrace.Propagate(err, "could not create and upload the ssm document")
}
experimentsDetails.IsDocsUploaded = true
Expand All @@ -60,25 +63,37 @@ func PrepareAWSSSMChaosByID(ctx context.Context, experimentsDetails *experimentT
//get the instance id or list of instance ids
instanceIDList := strings.Split(experimentsDetails.EC2InstanceID, ",")
if experimentsDetails.EC2InstanceID == "" || len(instanceIDList) == 0 {
return cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no instance id found for chaos injection"}
span.SetStatus(codes.Error, "no instance id found for chaos injection")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no instance id found for chaos injection"}
span.RecordError(err)
return err
}

switch strings.ToLower(experimentsDetails.Sequence) {
case "serial":
if err = lib.InjectChaosInSerialMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
span.SetStatus(codes.Error, "could not run chaos in serial mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in serial mode")
}
case "parallel":
if err = lib.InjectChaosInParallelMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
span.SetStatus(codes.Error, "could not run chaos in parallel mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in parallel mode")
}
default:
return cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.SetStatus(codes.Error, "sequence is not supported")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.RecordError(err)
return err
}

//Delete the ssm document on the given aws service monitoring docs
err = ssm.SSMDeleteDocument(experimentsDetails.DocumentName, experimentsDetails.Region)
if err != nil {
span.SetStatus(codes.Error, "failed to delete ssm doc")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to delete ssm doc")
}

Expand Down
19 changes: 17 additions & 2 deletions chaoslib/litmus/aws-ssm-chaos/lib/ssm/aws-ssm-chaos-by-tag.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/litmuschaos/litmus-go/pkg/utils/common"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

// PrepareAWSSSMChaosByTag contains the preparation and injection steps for the experiment
Expand All @@ -44,6 +45,8 @@ func PrepareAWSSSMChaosByTag(ctx context.Context, experimentsDetails *experiment

//create and upload the ssm document on the given aws service monitoring docs
if err = ssm.CreateAndUploadDocument(experimentsDetails.DocumentName, experimentsDetails.DocumentType, experimentsDetails.DocumentFormat, experimentsDetails.DocumentPath, experimentsDetails.Region); err != nil {
span.SetStatus(codes.Error, "could not create and upload the ssm document")
span.RecordError(err)
return stacktrace.Propagate(err, "could not create and upload the ssm document")
}
experimentsDetails.IsDocsUploaded = true
Expand All @@ -55,25 +58,37 @@ func PrepareAWSSSMChaosByTag(ctx context.Context, experimentsDetails *experiment
log.Infof("[Chaos]:Number of Instance targeted: %v", len(instanceIDList))

if len(instanceIDList) == 0 {
return cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no instance id found for chaos injection"}
span.SetStatus(codes.Error, "no instance id found for chaos injection")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no instance id found for chaos injection"}
span.RecordError(err)
return err
}

switch strings.ToLower(experimentsDetails.Sequence) {
case "serial":
if err = lib.InjectChaosInSerialMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
span.SetStatus(codes.Error, "could not run chaos in serial mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in serial mode")
}
case "parallel":
if err = lib.InjectChaosInParallelMode(ctx, experimentsDetails, instanceIDList, clients, resultDetails, eventsDetails, chaosDetails, inject); err != nil {
span.SetStatus(codes.Error, "could not run chaos in parallel mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in parallel mode")
}
default:
return cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.SetStatus(codes.Error, "sequence is not supported")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.RecordError(err)
return err
}

//Delete the ssm document on the given aws service monitoring docs
err = ssm.SSMDeleteDocument(experimentsDetails.DocumentName, experimentsDetails.Region)
if err != nil {
span.SetStatus(codes.Error, "failed to delete ssm doc")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to delete ssm doc")
}

Expand Down
39 changes: 37 additions & 2 deletions chaoslib/litmus/azure-disk-loss/lib/azure-disk-loss.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/litmuschaos/litmus-go/pkg/utils/retry"
"github.com/palantir/stacktrace"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
)

var (
Expand Down Expand Up @@ -55,11 +56,16 @@ func PrepareChaos(ctx context.Context, experimentsDetails *experimentTypes.Exper
//get the disk name or list of disk names
diskNameList := strings.Split(experimentsDetails.VirtualDiskNames, ",")
if experimentsDetails.VirtualDiskNames == "" || len(diskNameList) == 0 {
return cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no volume names found to detach"}
span.SetStatus(codes.Error, "no volume names found to detach")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeTargetSelection, Reason: "no volume names found to detach"}
span.RecordError(err)
return err
}
instanceNamesWithDiskNames, err := diskStatus.GetInstanceNameForDisks(diskNameList, experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup)

if err != nil {
span.SetStatus(codes.Error, "error fetching attached instances for disks")
span.RecordError(err)
return stacktrace.Propagate(err, "error fetching attached instances for disks")
}

Expand All @@ -69,6 +75,8 @@ func PrepareChaos(ctx context.Context, experimentsDetails *experimentTypes.Exper
for instanceName := range instanceNamesWithDiskNames {
attachedDisksWithInstance[instanceName], err = diskStatus.GetInstanceDiskList(experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup, experimentsDetails.ScaleSet, instanceName)
if err != nil {
span.SetStatus(codes.Error, "error fetching virtual disks")
span.RecordError(err)
return stacktrace.Propagate(err, "error fetching virtual disks")
}
}
Expand All @@ -85,14 +93,21 @@ func PrepareChaos(ctx context.Context, experimentsDetails *experimentTypes.Exper
switch strings.ToLower(experimentsDetails.Sequence) {
case "serial":
if err = injectChaosInSerialMode(ctx, experimentsDetails, instanceNamesWithDiskNames, attachedDisksWithInstance, clients, resultDetails, eventsDetails, chaosDetails); err != nil {
span.SetStatus(codes.Error, "could not run chaos in serial mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in serial mode")
}
case "parallel":
if err = injectChaosInParallelMode(ctx, experimentsDetails, instanceNamesWithDiskNames, attachedDisksWithInstance, clients, resultDetails, eventsDetails, chaosDetails); err != nil {
span.SetStatus(codes.Error, "could not run chaos in parallel mode")
span.RecordError(err)
return stacktrace.Propagate(err, "could not run chaos in parallel mode")
}
default:
return cerrors.Error{ErrorCode: cerrors.ErrorTypeGeneric, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.SetStatus(codes.Error, "sequence is not supported")
err := cerrors.Error{ErrorCode: cerrors.ErrorTypeGeneric, Reason: fmt.Sprintf("'%s' sequence is not supported", experimentsDetails.Sequence)}
span.RecordError(err)
return err
}

//Waiting for the ramp time after chaos injection
Expand Down Expand Up @@ -125,6 +140,8 @@ func injectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
log.Info("[Chaos]: Detaching the virtual disks from the instances")
for instanceName, diskNameList := range instanceNamesWithDiskNames {
if err = diskStatus.DetachDisks(experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup, instanceName, experimentsDetails.ScaleSet, diskNameList); err != nil {
span.SetStatus(codes.Error, "failed to detach disks")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to detach disks")
}
}
Expand All @@ -133,6 +150,8 @@ func injectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
for _, diskName := range diskNameList {
log.Infof("[Wait]: Waiting for Disk '%v' to detach", diskName)
if err := diskStatus.WaitForDiskToDetach(experimentsDetails, diskName); err != nil {
span.SetStatus(codes.Error, "disk detachment check failed")
span.RecordError(err)
return stacktrace.Propagate(err, "disk detachment check failed")
}
}
Expand All @@ -147,6 +166,8 @@ func injectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
// run the probes during chaos
if len(resultDetails.ProbeDetails) != 0 {
if err := probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
span.SetStatus(codes.Error, "failed to run probes")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand All @@ -159,6 +180,8 @@ func injectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
log.Info("[Chaos]: Attaching the Virtual disks back to the instances")
for instanceName, diskNameList := range attachedDisksWithInstance {
if err = diskStatus.AttachDisk(experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup, instanceName, experimentsDetails.ScaleSet, diskNameList); err != nil {
span.SetStatus(codes.Error, "virtual disk attachment failed")
span.RecordError(err)
return stacktrace.Propagate(err, "virtual disk attachment failed")
}

Expand All @@ -167,6 +190,8 @@ func injectChaosInParallelMode(ctx context.Context, experimentsDetails *experime
for _, diskName := range diskNameList {
log.Infof("[Wait]: Waiting for Disk '%v' to attach", diskName)
if err := diskStatus.WaitForDiskToAttach(experimentsDetails, diskName); err != nil {
span.SetStatus(codes.Error, "disk attachment check failed")
span.RecordError(err)
return stacktrace.Propagate(err, "disk attachment check failed")
}
}
Expand Down Expand Up @@ -209,12 +234,16 @@ func injectChaosInSerialMode(ctx context.Context, experimentsDetails *experiment
// Detaching the virtual disks
log.Infof("[Chaos]: Detaching %v from the instance", diskName)
if err = diskStatus.DetachDisks(experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup, instanceName, experimentsDetails.ScaleSet, diskNameToList); err != nil {
span.SetStatus(codes.Error, "failed to detach disks")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to detach disks")
}

// Waiting for disk to be detached
log.Infof("[Wait]: Waiting for Disk '%v' to detach", diskName)
if err := diskStatus.WaitForDiskToDetach(experimentsDetails, diskName); err != nil {
span.SetStatus(codes.Error, "disk detachment check failed")
span.RecordError(err)
return stacktrace.Propagate(err, "disk detachment check failed")
}

Expand All @@ -224,6 +253,8 @@ func injectChaosInSerialMode(ctx context.Context, experimentsDetails *experiment
// the OnChaos probes execution will start in the first iteration and keep running for the entire chaos duration
if len(resultDetails.ProbeDetails) != 0 && i == 0 {
if err := probe.RunProbes(ctx, chaosDetails, clients, resultDetails, "DuringChaos", eventsDetails); err != nil {
span.SetStatus(codes.Error, "failed to run probes")
span.RecordError(err)
return stacktrace.Propagate(err, "failed to run probes")
}
}
Expand All @@ -235,12 +266,16 @@ func injectChaosInSerialMode(ctx context.Context, experimentsDetails *experiment
//Attaching the virtual disks to the instance
log.Infof("[Chaos]: Attaching %v back to the instance", diskName)
if err = diskStatus.AttachDisk(experimentsDetails.SubscriptionID, experimentsDetails.ResourceGroup, instanceName, experimentsDetails.ScaleSet, attachedDisksWithInstance[instanceName]); err != nil {
span.SetStatus(codes.Error, "disk attachment failed")
span.RecordError(err)
return stacktrace.Propagate(err, "disk attachment failed")
}

// Waiting for disk to be attached
log.Infof("[Wait]: Waiting for Disk '%v' to attach", diskName)
if err := diskStatus.WaitForDiskToAttach(experimentsDetails, diskName); err != nil {
span.SetStatus(codes.Error, "disk attachment check failed")
span.RecordError(err)
return stacktrace.Propagate(err, "disk attachment check failed")
}

Expand Down
Loading