Skip to content

Commit

Permalink
Distributed lock: add Lock (blocking) and RenewLock methods
Browse files Browse the repository at this point in the history
Includes conformance tests

Signed-off-by: ItalyPaleAle <[email protected]>
  • Loading branch information
ItalyPaleAle committed Aug 5, 2023
1 parent 517f714 commit dc9a666
Show file tree
Hide file tree
Showing 12 changed files with 397 additions and 186 deletions.
9 changes: 2 additions & 7 deletions crypto/feature.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,8 @@ limitations under the License.
package crypto

import (
"golang.org/x/exp/slices"
"github.com/dapr/components-contrib/internal/features"
)

// Feature names a feature that can be implemented by the crypto provider components.
type Feature string

// IsPresent checks if a given feature is present in the list.
func (f Feature) IsPresent(features []Feature) bool {
return slices.Contains(features, f)
}
type Feature = features.Feature[SubtleCrypto]
17 changes: 11 additions & 6 deletions lock/metadata.go → internal/features/feature.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
Copyright 2021 The Dapr Authors
Copyright 2023 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
Expand All @@ -11,11 +11,16 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package lock
package features

import "github.com/dapr/components-contrib/metadata"
import (
"golang.org/x/exp/slices"
)

// Metadata contains a lock store specific set of metadata property.
type Metadata struct {
metadata.Base `json:",inline"`
// Feature is a generic type for features supported by components.
type Feature[T any] string

// IsPresent checks if a given feature is present in the list.
func (f Feature[T]) IsPresent(features []Feature[T]) bool {
return slices.Contains(features, f)
}
100 changes: 100 additions & 0 deletions lock/lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
/*
Copyright 2021 The Dapr Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package lock

import (
"context"

"github.com/dapr/components-contrib/internal/features"
"github.com/dapr/components-contrib/metadata"
)

// Store is the interface for lock stores.
type Store interface {
metadata.ComponentWithMetadata

// Init the lock store.
Init(ctx context.Context, metadata Metadata) error

// Features returns the list of supported features.
Features() []Feature

// Lock acquires a lock.
// If the lock is owned by someone else, this method blocks until the lock can be acquired or the context is canceled.
Lock(ctx context.Context, req *LockRequest) (*LockResponse, error)

// TryLock tries to acquire a lock.
// If the lock cannot be acquired, it returns immediately.
TryLock(ctx context.Context, req *LockRequest) (*LockResponse, error)

// RenewLock attempts to renew a lock if the lock is still valid.
RenewLock(ctx context.Context, req *RenewLockRequest) (*RenewLockResponse, error)

// Unlock tries to release a lock if the lock is still valid.
Unlock(ctx context.Context, req *UnlockRequest) (*UnlockResponse, error)
}

// Metadata contains a lock store specific set of metadata property.
type Metadata struct {
metadata.Base `json:",inline"`
}

// Feature names a feature that can be implemented by the lock stores.
type Feature = features.Feature[Store]

// LockRequest is the request to acquire locks, used by Lock and TryLock.
type LockRequest struct {
ResourceID string `json:"resourceId"`
LockOwner string `json:"lockOwner"`
ExpiryInSeconds int32 `json:"expiryInSeconds"`
}

// LockResponse is the response used by Lock and TryLock when the operation is completed.
type LockResponse struct {
Success bool `json:"success"`
}

// RenewLockRequest is a lock renewal request.
type RenewLockRequest struct {
ResourceID string `json:"resourceId"`
LockOwner string `json:"lockOwner"`
ExpiryInSeconds int32 `json:"expiryInSeconds"`
}

// RenewLockResponse is a lock renewal request.
type RenewLockResponse struct {
Status LockStatus `json:"status"`
}

// UnlockRequest is a lock release request.
type UnlockRequest struct {
ResourceID string `json:"resourceId"`
LockOwner string `json:"lockOwner"`
}

// Status when releasing the lock.
type UnlockResponse struct {
Status LockStatus `json:"status"`
}

// LockStatus is the status included in lock responses.
type LockStatus int32

// lock status.
const (
LockStatusInternalError LockStatus = -1
LockStatusSuccess LockStatus = 0
LockStatusNotExist LockStatus = 1
LockStatusOwnerMismatch LockStatus = 2
)
162 changes: 138 additions & 24 deletions lock/redis/standalone.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,37 +18,47 @@ import (
"errors"
"fmt"
"reflect"
"sync/atomic"
"time"

"github.com/cenkalti/backoff/v4"
rediscomponent "github.com/dapr/components-contrib/internal/component/redis"
"github.com/dapr/components-contrib/lock"
contribMetadata "github.com/dapr/components-contrib/metadata"
"github.com/dapr/kit/logger"
)

const unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end`
const (
unlockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("del",KEYS[1]) end`
renewLockScript = `local v = redis.call("get",KEYS[1]); if v==false then return -1 end; if v~=ARGV[1] then return -2 else return redis.call("expire",KEYS[1],ARGV[2]) end`
)

var ErrComponentClosed = errors.New("component is closed")

// Standalone Redis lock store.
// Any fail-over related features are not supported, such as Sentinel and Redis Cluster.
type StandaloneRedisLock struct {
client rediscomponent.RedisClient
clientSettings *rediscomponent.Settings

logger logger.Logger
closed atomic.Bool
runnincCh chan struct{}
logger logger.Logger
}

// NewStandaloneRedisLock returns a new standalone redis lock.
// Do not use this lock with a redis cluster, which might lead to unexpected lock loss.
// Do not use this lock with a Redis cluster, which might lead to unexpected lock loss.
func NewStandaloneRedisLock(logger logger.Logger) lock.Store {
s := &StandaloneRedisLock{
logger: logger,
logger: logger,
runnincCh: make(chan struct{}),
}

return s
}

// Init StandaloneRedisLock.
func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.Metadata) (err error) {
// Init the lock store.
func (r *StandaloneRedisLock) Init(ctx context.Context, metadata lock.Metadata) (err error) {
// Create the client
r.client, r.clientSettings, err = rediscomponent.ParseClientFromProperties(metadata.Properties, contribMetadata.LockStoreType)
if err != nil {
Expand Down Expand Up @@ -79,51 +89,150 @@ func (r *StandaloneRedisLock) InitLockStore(ctx context.Context, metadata lock.M
return nil
}

// Features returns the list of supported features.
func (r *StandaloneRedisLock) Features() []lock.Feature {
return nil
}

// Lock tries to acquire a lock.
// If the lock is owned by someone else, this method blocks until the lock can be acquired or the context is canceled.
func (r *StandaloneRedisLock) Lock(ctx context.Context, req *lock.LockRequest) (res *lock.LockResponse, err error) {
if r.closed.Load() {
return nil, ErrComponentClosed
}

// We try to acquire a lock through periodic polling
// A potentially more efficient way would be to use keyspace notifications to subscribe to changes in the key we subscribe to
// However, keyspace notifications:
// 1. Are not enabled by default in Redis, and require an explicit configuration change, which adds quite a bit of complexity for the user: https://redis.io/docs/manual/keyspace-notifications/
// 2. When a connection to Redis calls SUBSCRIBE to watch for notifications, it cannot be used for anything else (unless we switch the protocol to RESP3, which must be explicitly chosen and only works with Redis 6+: https://redis.io/commands/hello/)
// So, periodic polling it is

// We use an exponential backoff here because it supports a randomization factor
bo := backoff.NewExponentialBackOff()
bo.MaxElapsedTime = 0
bo.InitialInterval = 50 * time.Millisecond
bo.MaxInterval = 500 * time.Millisecond
bo.RandomizationFactor = 0.5

// Repat until we get the lock, or context is canceled
for {
// Try to acquire the lock
res, err = r.TryLock(ctx, req)
if err != nil {
// If we got an error, return right away
return nil, err
}

// Let's see if we got the lock
if res.Success {
return res, nil
}

// Sleep till the next tick and try again
// Stop when context is done or component is closed
t := time.NewTimer(bo.NextBackOff())
select {
case <-t.C:
// Nop, retry
case <-ctx.Done():
return nil, ctx.Err()
case <-r.runnincCh:
return nil, ErrComponentClosed
}
}
}

// TryLock tries to acquire a lock.
// If the lock cannot be acquired, it returns immediately.
func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.TryLockRequest) (*lock.TryLockResponse, error) {
// Set a key if doesn't exist with an expiration time
func (r *StandaloneRedisLock) TryLock(ctx context.Context, req *lock.LockRequest) (*lock.LockResponse, error) {
if r.closed.Load() {
return nil, ErrComponentClosed
}

// Set a key if doesn't exist, with an expiration time
nxval, err := r.client.SetNX(ctx, req.ResourceID, req.LockOwner, time.Second*time.Duration(req.ExpiryInSeconds))
if nxval == nil {
return &lock.TryLockResponse{}, fmt.Errorf("setNX returned a nil response")
return &lock.LockResponse{}, fmt.Errorf("setNX returned a nil response")
}

if err != nil {
return &lock.TryLockResponse{}, err
return &lock.LockResponse{}, err
}

return &lock.TryLockResponse{
return &lock.LockResponse{
Success: *nxval,
}, nil
}

// RenewLock attempts to renew a lock if the lock is still valid.
func (r *StandaloneRedisLock) RenewLock(ctx context.Context, req *lock.RenewLockRequest) (*lock.RenewLockResponse, error) {
if r.closed.Load() {
return nil, ErrComponentClosed
}

// Delegate to client.eval lua script
evalInt, parseErr, err := r.client.EvalInt(ctx, renewLockScript, []string{req.ResourceID}, req.LockOwner, req.ExpiryInSeconds)
if evalInt == nil {
res := &lock.RenewLockResponse{
Status: lock.LockStatusInternalError,
}
return res, errors.New("eval renew lock script returned a nil response")
}

// Parse result
if parseErr != nil {
return &lock.RenewLockResponse{
Status: lock.LockStatusInternalError,
}, err
}
var status lock.LockStatus
switch {
case *evalInt >= 0:
status = lock.LockStatusSuccess
case *evalInt == -1:
status = lock.LockStatusNotExist
case *evalInt == -2:
status = lock.LockStatusOwnerMismatch
default:
status = lock.LockStatusInternalError
}

return &lock.RenewLockResponse{
Status: status,
}, nil
}

// Unlock tries to release a lock if the lock is still valid.
func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockRequest) (*lock.UnlockResponse, error) {
if r.closed.Load() {
return nil, ErrComponentClosed
}

// Delegate to client.eval lua script
evalInt, parseErr, err := r.client.EvalInt(ctx, unlockScript, []string{req.ResourceID}, req.LockOwner)
if evalInt == nil {
res := &lock.UnlockResponse{
Status: lock.InternalError,
Status: lock.LockStatusInternalError,
}
return res, errors.New("eval unlock script returned a nil response")
}

// Parse result
if parseErr != nil {
return &lock.UnlockResponse{
Status: lock.InternalError,
Status: lock.LockStatusInternalError,
}, err
}
var status lock.Status
var status lock.LockStatus
switch {
case *evalInt >= 0:
status = lock.Success
status = lock.LockStatusSuccess
case *evalInt == -1:
status = lock.LockDoesNotExist
status = lock.LockStatusNotExist
case *evalInt == -2:
status = lock.LockBelongsToOthers
status = lock.LockStatusOwnerMismatch
default:
status = lock.InternalError
status = lock.LockStatusInternalError
}

return &lock.UnlockResponse{
Expand All @@ -133,12 +242,17 @@ func (r *StandaloneRedisLock) Unlock(ctx context.Context, req *lock.UnlockReques

// Close shuts down the client's redis connections.
func (r *StandaloneRedisLock) Close() error {
if r.client != nil {
err := r.client.Close()
r.client = nil
return err
if !r.closed.CompareAndSwap(false, true) {
return nil
}
return nil

close(r.runnincCh)

if r.client == nil {
return nil
}

return r.client.Close()
}

// GetComponentMetadata returns the metadata of the component.
Expand Down
Loading

0 comments on commit dc9a666

Please sign in to comment.