Skip to content

Commit

Permalink
Merge branch 'master' into plugin_interface
Browse files Browse the repository at this point in the history
  • Loading branch information
mantas-sidlauskas authored Apr 22, 2024
2 parents 860636d + 3db7157 commit 5d0f3a4
Show file tree
Hide file tree
Showing 90 changed files with 4,901 additions and 299 deletions.
13 changes: 13 additions & 0 deletions common/dynamicconfig/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -2003,6 +2003,13 @@ const (
// Allowed filters: DomainName
EnableRetryForChecksumFailure

// EnableStrongIdempotency enables strong idempotency for APIs
// KeyName: history.enableStrongIdempotency
// Value type: Bool
// Default value: false
// Allowed filters: DomainName
EnableStrongIdempotency

// LastBoolKey must be the last one in this const group
LastBoolKey
)
Expand Down Expand Up @@ -4310,6 +4317,12 @@ var BoolKeys = map[BoolKey]DynamicBool{
Description: "EnableRetryForChecksumFailure enables retry if mutable state checksum verification fails",
DefaultValue: false,
},
EnableStrongIdempotency: DynamicBool{
KeyName: "history.enableStrongIdempotency",
Filters: []Filter{DomainName},
Description: "EnableStrongIdempotency enables strong idempotency for APIs",
DefaultValue: false,
},
}

var FloatKeys = map[FloatKey]DynamicFloat{
Expand Down
5 changes: 5 additions & 0 deletions common/log/tag/tags.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,11 @@ func WorkflowSignalName(signalName string) Tag {
return newStringTag("wf-signal-name", signalName)
}

// WorkflowRequestID returns tag for WorkflowRequestID
func WorkflowRequestID(requestID string) Tag {
return newStringTag("wf-request-id", requestID)
}

// WorkflowState returns tag for WorkflowState
func WorkflowState(s int) Tag {
return newInt("wf-state", s)
Expand Down
2 changes: 1 addition & 1 deletion common/persistence/data_store_interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import (
"github.com/uber/cadence/common/types"
)

//go:generate mockgen -package $GOPACKAGE -destination data_store_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence ExecutionStore
//go:generate mockgen -package $GOPACKAGE -destination data_store_interfaces_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence ExecutionStore,ShardStore
//go:generate mockgen -package $GOPACKAGE -destination visibility_store_mock.go -self_package github.com/uber/cadence/common/persistence github.com/uber/cadence/common/persistence VisibilityStore

type (
Expand Down
94 changes: 93 additions & 1 deletion common/persistence/data_store_interfaces_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 14 additions & 2 deletions common/persistence/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@

package persistence

import "fmt"
import (
"errors"
"fmt"
)

type (
// TimeoutError is returned when a write operation fails due to a timeout
Expand Down Expand Up @@ -77,7 +80,8 @@ type (
}

DuplicateRequestError struct {
RunID string
RequestType WorkflowRequestType
RunID string
}
)

Expand Down Expand Up @@ -120,3 +124,11 @@ func (e *DBUnavailableError) Error() string {
func (e *TransactionSizeLimitError) Error() string {
return e.Msg
}

func AsDuplicateRequestError(err error) (*DuplicateRequestError, bool) {
var e *DuplicateRequestError
if errors.As(err, &e) {
return e, true
}
return nil, false
}
67 changes: 67 additions & 0 deletions common/persistence/errors_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
// The MIT License (MIT)

// Copyright (c) 2017-2020 Uber Technologies Inc.

// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in all
// copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
// SOFTWARE.

package persistence

import (
"fmt"
"testing"

"github.com/stretchr/testify/assert"
)

func TestAsDuplicateRequestError(t *testing.T) {
testCases := []struct {
name string
err error
expectedErr *DuplicateRequestError
ok bool
}{
{
name: "unwrapped",
err: &DuplicateRequestError{RunID: "a"},
expectedErr: &DuplicateRequestError{RunID: "a"},
ok: true,
},
{
name: "wrapped",
err: fmt.Errorf("%w", &DuplicateRequestError{RunID: "b"}),
expectedErr: &DuplicateRequestError{RunID: "b"},
ok: true,
},
{
name: "not same type",
err: fmt.Errorf("adasdf"),
ok: false,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
e, ok := AsDuplicateRequestError(tc.err)
assert.Equal(t, tc.ok, ok)
if ok {
assert.Equal(t, tc.expectedErr, e)
}
})
}
}
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosql_execution_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,8 @@ func (d *nosqlExecutionStore) CreateWorkflowExecution(
}
case conditionFailureErr.DuplicateRequest != nil:
return nil, &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
RequestType: conditionFailureErr.DuplicateRequest.RequestType,
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
Expand Down
37 changes: 37 additions & 0 deletions common/persistence/nosql/nosql_execution_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,25 @@ func TestCreateWorkflowExecution(t *testing.T) {
expectedResp: nil,
expectedError: fmt.Errorf("unsupported conditionFailureReason error"), // Expected generic error for unexpected conditions
},
{
name: "Duplicate request error",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
mockDB.EXPECT().IsNotFoundError(gomock.Any()).Return(true).AnyTimes()
mockDB.EXPECT().
InsertWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
})
},
expectedResp: nil,
expectedError: &persistence.DuplicateRequestError{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
},
}

for _, tc := range tests {
Expand Down Expand Up @@ -206,6 +225,24 @@ func TestUpdateWorkflowExecution(t *testing.T) {
},
expectedError: nil,
},
{
name: "Duplicate request error",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
mockDB.EXPECT().
UpdateWorkflowExecutionWithTasks(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), nil, gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).
Return(&nosqlplugin.WorkflowOperationConditionFailure{
DuplicateRequest: &nosqlplugin.DuplicateRequest{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
})
},
request: newUpdateWorkflowExecutionRequest,
expectedError: &persistence.DuplicateRequestError{
RequestType: persistence.WorkflowRequestTypeSignal,
RunID: "test-run-id",
},
},
{
name: "UpdateWorkflowModeBypassCurrent - assertNotCurrentExecution failure",
setupMock: func(mockDB *nosqlplugin.MockDB, shardID int) {
Expand Down
3 changes: 2 additions & 1 deletion common/persistence/nosql/nosql_execution_store_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -743,7 +743,8 @@ func (d *nosqlExecutionStore) processUpdateWorkflowResult(err error, rangeID int
}
case conditionFailureErr.DuplicateRequest != nil:
return &persistence.DuplicateRequestError{
RunID: conditionFailureErr.DuplicateRequest.RunID,
RequestType: conditionFailureErr.DuplicateRequest.RequestType,
RunID: conditionFailureErr.DuplicateRequest.RunID,
}
default:
// If ever runs into this branch, there is bug in the code either in here, or in the implementation of nosql plugin
Expand Down
Loading

0 comments on commit 5d0f3a4

Please sign in to comment.