diff --git a/db.go b/db.go index eb8f41f..6b212bf 100644 --- a/db.go +++ b/db.go @@ -10,11 +10,9 @@ import ( "github.com/dgraph-io/badger/v4" "github.com/dgraph-io/dgo/v240/protos/api" - "github.com/dgraph-io/dgraph/v24/dql" "github.com/dgraph-io/dgraph/v24/edgraph" "github.com/dgraph-io/dgraph/v24/posting" "github.com/dgraph-io/dgraph/v24/protos/pb" - "github.com/dgraph-io/dgraph/v24/query" "github.com/dgraph-io/dgraph/v24/schema" "github.com/dgraph-io/dgraph/v24/worker" "github.com/dgraph-io/dgraph/v24/x" @@ -22,27 +20,31 @@ import ( ) var ( - // This ensures that we only have one instance of modusdb in this process. - singeton atomic.Bool + // This ensures that we only have one instance of modusDB in this process. + singleton atomic.Bool - ErrSingletonOnly = errors.New("only one modusdb instance is supported") - ErrEmptyDataDir = errors.New("data directory is required") - ErrDBClosed = errors.New("modusdb instance is closed") + ErrSingletonOnly = errors.New("only one modusDB instance is supported") + ErrEmptyDataDir = errors.New("data directory is required") + ErrClosedDB = errors.New("modusDB instance is closed") + ErrNonExistentNamespace = errors.New("namespace does not exist") ) -// DB is an instance of modusdb. -// For now, we only support one instance of modusdb per process. +// DB is an instance of modusDB. +// For now, we only support one instance of modusDB per process. type DB struct { mutex sync.RWMutex isOpen bool z *zero + + // points to default / 0 / galaxy namespace + gxy *Namespace } -// New returns a new modusdb instance. +// New returns a new modusDB instance. func New(conf Config) (*DB, error) { - // Ensure that we do not create another instance of modusdb in the same process - if !singeton.CompareAndSwap(false, true) { + // Ensure that we do not create another instance of modusDB in the same process + if !singleton.CompareAndSwap(false, true) { return nil, ErrSingletonOnly } @@ -74,198 +76,120 @@ func New(conf Config) (*DB, error) { } x.UpdateHealthStatus(true) + + db.gxy = &Namespace{id: 0, db: db} return db, nil } -// Close closes the modusdb instance. -func (db *DB) Close() { - db.mutex.Lock() - defer db.mutex.Unlock() +func (db *DB) CreateNamespace() (*Namespace, error) { + db.mutex.RLock() + defer db.mutex.RUnlock() if !db.isOpen { - return + return nil, ErrClosedDB } - if !singeton.CompareAndSwap(true, false) { - panic("modusdb instance was not properly opened") + startTs, err := db.z.nextTs() + if err != nil { + return nil, err + } + nsID, err := db.z.nextNS() + if err != nil { + return nil, err } - db.isOpen = false - x.UpdateHealthStatus(false) - posting.Cleanup() - worker.State.Dispose() + if err := worker.ApplyInitialSchema(nsID, startTs); err != nil { + return nil, fmt.Errorf("error applying initial schema: %w", err) + } + for _, pred := range schema.State().Predicates() { + worker.InitTablet(pred) + } + + return &Namespace{id: nsID, db: db}, nil } -// DropAll drops all the data and schema in the modusdb instance. -func (db *DB) DropAll(ctx context.Context) error { - db.mutex.Lock() - defer db.mutex.Unlock() +func (db *DB) GetNamespace(nsID uint64) (*Namespace, error) { + db.mutex.RLock() + defer db.mutex.RUnlock() if !db.isOpen { - return ErrDBClosed + return nil, ErrClosedDB } - p := &pb.Proposal{Mutations: &pb.Mutations{ - GroupId: 1, - DropOp: pb.Mutations_ALL, - }} - if err := worker.ApplyMutations(ctx, p); err != nil { - return fmt.Errorf("error applying mutation: %w", err) - } - if err := db.reset(); err != nil { - return fmt.Errorf("error resetting db: %w", err) + if nsID > db.z.lastNS { + return nil, ErrNonExistentNamespace } - // TODO: insert drop record - return nil + // TODO: when delete namespace is implemented, check if the namespace exists + + return &Namespace{id: nsID, db: db}, nil } -// DropData drops all the data in the modusdb instance. -func (db *DB) DropData(ctx context.Context) error { +// DropAll drops all the data and schema in the modusDB instance. +func (db *DB) DropAll(ctx context.Context) error { db.mutex.Lock() defer db.mutex.Unlock() if !db.isOpen { - return ErrDBClosed + return ErrClosedDB } p := &pb.Proposal{Mutations: &pb.Mutations{ GroupId: 1, - DropOp: pb.Mutations_DATA, + DropOp: pb.Mutations_ALL, }} if err := worker.ApplyMutations(ctx, p); err != nil { return fmt.Errorf("error applying mutation: %w", err) } + if err := db.reset(); err != nil { + return fmt.Errorf("error resetting db: %w", err) + } // TODO: insert drop record - // TODO: should we reset back the timestamp as well? return nil } -func (db *DB) AlterSchema(ctx context.Context, sch string) error { - db.mutex.Lock() - defer db.mutex.Unlock() - - if !db.isOpen { - return ErrDBClosed - } - - sc, err := schema.ParseWithNamespace(sch, 0) - if err != nil { - return fmt.Errorf("error parsing schema: %w", err) - } - for _, pred := range sc.Preds { - worker.InitTablet(pred.Predicate) - } +func (db *DB) DropData(ctx context.Context) error { + return db.gxy.DropData(ctx) +} - startTs, err := db.z.nextTs() - if err != nil { - return err - } +func (db *DB) AlterSchema(ctx context.Context, sch string) error { + return db.gxy.AlterSchema(ctx, sch) +} - p := &pb.Proposal{Mutations: &pb.Mutations{ - GroupId: 1, - StartTs: startTs, - Schema: sc.Preds, - Types: sc.Types, - }} - if err := worker.ApplyMutations(ctx, p); err != nil { - return fmt.Errorf("error applying mutation: %w", err) - } - return nil +func (db *DB) Query(ctx context.Context, q string) (*api.Response, error) { + return db.gxy.Query(ctx, q) } func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) { - if len(ms) == 0 { - return nil, nil - } + return db.gxy.Mutate(ctx, ms) +} - dms := make([]*dql.Mutation, 0, len(ms)) - for _, mu := range ms { - dm, err := edgraph.ParseMutationObject(mu, false) - if err != nil { - return nil, fmt.Errorf("error parsing mutation: %w", err) - } - dms = append(dms, dm) - } - newUids, err := query.ExtractBlankUIDs(ctx, dms) - if err != nil { - return nil, err - } - if len(newUids) > 0 { - num := &pb.Num{Val: uint64(len(newUids)), Type: pb.Num_UID} - res, err := db.z.nextUIDs(num) - if err != nil { - return nil, err - } +func (db *DB) Load(ctx context.Context, schemaPath, dataPath string) error { + return db.gxy.Load(ctx, schemaPath, dataPath) +} - curId := res.StartId - for k := range newUids { - x.AssertTruef(curId != 0 && curId <= res.EndId, "not enough uids generated") - newUids[k] = curId - curId++ - } - } - edges, err := query.ToDirectedEdges(dms, newUids) - if err != nil { - return nil, err - } - ctx = x.AttachNamespace(ctx, 0) +func (db *DB) LoadData(inCtx context.Context, dataDir string) error { + return db.gxy.LoadData(inCtx, dataDir) +} +// Close closes the modusDB instance. +func (db *DB) Close() { db.mutex.Lock() defer db.mutex.Unlock() if !db.isOpen { - return nil, ErrDBClosed - } - - startTs, err := db.z.nextTs() - if err != nil { - return nil, err - } - commitTs, err := db.z.nextTs() - if err != nil { - return nil, err - } - - m := &pb.Mutations{ - GroupId: 1, - StartTs: startTs, - Edges: edges, - } - m.Edges, err = query.ExpandEdges(ctx, m) - if err != nil { - return nil, fmt.Errorf("error expanding edges: %w", err) - } - - for _, edge := range m.Edges { - worker.InitTablet(edge.Attr) - } - - p := &pb.Proposal{Mutations: m, StartTs: startTs} - if err := worker.ApplyMutations(ctx, p); err != nil { - return nil, err + return } - return newUids, worker.ApplyCommited(ctx, &pb.OracleDelta{ - Txns: []*pb.TxnStatus{{StartTs: startTs, CommitTs: commitTs}}, - }) -} - -// Query performs query or mutation or upsert on the given modusdb instance. -func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) { - db.mutex.RLock() - defer db.mutex.RUnlock() - - if !db.isOpen { - return nil, ErrDBClosed + if !singleton.CompareAndSwap(true, false) { + panic("modusDB instance was not properly opened") } - return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{ - ReadOnly: true, - Query: query, - StartTs: db.z.readTs(), - }) + db.isOpen = false + x.UpdateHealthStatus(false) + posting.Cleanup() + worker.State.Dispose() } func (db *DB) reset() error { @@ -275,7 +199,7 @@ func (db *DB) reset() error { } if !restart { - if err := worker.ApplyInitialSchema(); err != nil { + if err := worker.ApplyInitialSchema(0, 1); err != nil { return fmt.Errorf("error applying initial schema: %w", err) } } diff --git a/go.mod b/go.mod index f24ed08..7253865 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,10 @@ require ( github.com/cavaliergopher/grab/v3 v3.0.1 github.com/dgraph-io/badger/v4 v4.4.0 github.com/dgraph-io/dgo/v240 v240.0.1 - github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f + github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9 github.com/dgraph-io/ristretto/v2 v2.0.0 github.com/pkg/errors v0.9.1 - github.com/stretchr/testify v1.9.0 + github.com/stretchr/testify v1.10.0 golang.org/x/sync v0.9.0 ) @@ -25,7 +25,7 @@ require ( github.com/Microsoft/go-winio v0.6.2 // indirect github.com/agnivade/levenshtein v1.1.1 // indirect github.com/beorn7/perks v1.0.1 // indirect - github.com/bits-and-blooms/bitset v1.15.0 // indirect + github.com/bits-and-blooms/bitset v1.17.0 // indirect github.com/blevesearch/bleve/v2 v2.4.3 // indirect github.com/blevesearch/bleve_index_api v1.1.12 // indirect github.com/blevesearch/geo v0.1.20 // indirect diff --git a/go.sum b/go.sum index 5bd8b3c..46296ee 100644 --- a/go.sum +++ b/go.sum @@ -80,8 +80,8 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= -github.com/bits-and-blooms/bitset v1.15.0 h1:DiCRMscZsGyYePE9AR3sVhKqUXCt5IZvkX5AfAc5xLQ= -github.com/bits-and-blooms/bitset v1.15.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bitset v1.17.0 h1:1X2TS7aHz1ELcC0yU1y2stUs/0ig5oMU6STFZGrhvHI= +github.com/bits-and-blooms/bitset v1.17.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= github.com/blevesearch/bleve/v2 v2.4.3 h1:XDYj+1prgX84L2Cf+V3ojrOPqXxy0qxyd2uLMmeuD+4= github.com/blevesearch/bleve/v2 v2.4.3/go.mod h1:hEPDPrbYw3vyrm5VOa36GyS4bHWuIf4Fflp7460QQXY= @@ -138,8 +138,8 @@ github.com/dgraph-io/badger/v4 v4.4.0 h1:rA48XiDynZLyMdlaJl67p9+lqfqwxlgKtCpYLAi github.com/dgraph-io/badger/v4 v4.4.0/go.mod h1:sONMmPPfbnj9FPwS/etCqky/ULth6CQJuAZSuWCmixE= github.com/dgraph-io/dgo/v240 v240.0.1 h1:R0d9Cao3MOghrC9RVXshw6v8Jr/IjKgU2mK9sR9nclc= github.com/dgraph-io/dgo/v240 v240.0.1/go.mod h1:urpjhWGdYVSVQAwd000iu4wHyHPpuHpwJ7aILsuGF5A= -github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f h1:cFVPh3MdiAQWUoK8d6+Xj2z6+FNe/JzUAqNRNjQGSlI= -github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241127082017-fd45b3875e3f/go.mod h1:rPhzF3u8SjgEDp5v3tJI05f/8O8TUJiZ5/a6gYwj0qc= +github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9 h1:6ink3iffWaAqHCOX8j35oC8+K2oiDFLwsyNSd40YmBQ= +github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241202011806-64256ce6cac9/go.mod h1:2e/yPl+J7eEl9eaeeYSGMwuiQAxVh9x2Gm1SsaVvM3o= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis= github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU= @@ -610,8 +610,8 @@ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= +github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/stvp/go-udp-testing v0.0.0-20201019212854-469649b16807/go.mod h1:7jxmlfBCDBXRzr0eAQJ48XC1hBu1np4CS5+cHEYfwpc= github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw= github.com/subosito/gotenv v1.6.0 h1:9NlTDc1FTs4qu0DDq7AEtTPNw6SVm7uBMsUCUjABIf8= diff --git a/live.go b/live.go index a8a83db..3e89e05 100644 --- a/live.go +++ b/live.go @@ -25,29 +25,29 @@ const ( ) type liveLoader struct { - db *DB + n *Namespace blankNodes map[string]string mutex sync.RWMutex } -func (db *DB) Load(ctx context.Context, schemaPath, dataPath string) error { +func (n *Namespace) Load(ctx context.Context, schemaPath, dataPath string) error { schemaData, err := os.ReadFile(schemaPath) if err != nil { return fmt.Errorf("error reading schema file [%v]: %w", schemaPath, err) } - if err := db.AlterSchema(ctx, string(schemaData)); err != nil { + if err := n.AlterSchema(ctx, string(schemaData)); err != nil { return fmt.Errorf("error altering schema: %w", err) } - if err := db.LoadData(ctx, dataPath); err != nil { + if err := n.LoadData(ctx, dataPath); err != nil { return fmt.Errorf("error loading data: %w", err) } return nil } // TODO: Add support for CSV file -func (db *DB) LoadData(inCtx context.Context, dataDir string) error { +func (n *Namespace) LoadData(inCtx context.Context, dataDir string) error { fs := filestore.NewFileStore(dataDir) files := fs.FindDataFiles(dataDir, []string{".rdf", ".rdf.gz", ".json", ".json.gz"}) if len(files) == 0 { @@ -85,7 +85,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error { if !ok { return nil } - uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs}) + uids, err := n.Mutate(rootCtx, []*api.Mutation{nqs}) if err != nil { return fmt.Errorf("error applying mutations: %w", err) } @@ -95,7 +95,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error { } }) - ll := &liveLoader{db: db, blankNodes: make(map[string]string)} + ll := &liveLoader{n: n, blankNodes: make(map[string]string)} for _, datafile := range files { procG.Go(func() error { return ll.processFile(procCtx, fs, datafile, nqch) @@ -237,7 +237,7 @@ func (l *liveLoader) uid(ns uint64, val string) (string, error) { return uid, nil } - asUID, err := l.db.LeaseUIDs(1) + asUID, err := l.n.db.LeaseUIDs(1) if err != nil { return "", fmt.Errorf("error allocating UID: %w", err) } diff --git a/live_test.go b/live_test.go index 825f4b4..b1e173b 100644 --- a/live_test.go +++ b/live_test.go @@ -44,7 +44,7 @@ func TestLiveLoaderSmall(t *testing.T) { db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) require.NoError(t, err) - defer func() { db.Close() }() + defer db.Close() dataFolder := t.TempDir() schemaFile := filepath.Join(dataFolder, "data.schema") @@ -85,7 +85,7 @@ func TestLiveLoaderSmall(t *testing.T) { func TestLiveLoader1Million(t *testing.T) { db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) require.NoError(t, err) - defer func() { db.Close() }() + defer db.Close() baseDir := t.TempDir() schResp, err := grab.Get(baseDir, oneMillionSchema) diff --git a/namespace.go b/namespace.go new file mode 100644 index 0000000..d9446df --- /dev/null +++ b/namespace.go @@ -0,0 +1,177 @@ +package modusdb + +import ( + "context" + "fmt" + "strconv" + + "github.com/dgraph-io/dgo/v240/protos/api" + "github.com/dgraph-io/dgraph/v24/dql" + "github.com/dgraph-io/dgraph/v24/edgraph" + "github.com/dgraph-io/dgraph/v24/protos/pb" + "github.com/dgraph-io/dgraph/v24/query" + "github.com/dgraph-io/dgraph/v24/schema" + "github.com/dgraph-io/dgraph/v24/worker" + "github.com/dgraph-io/dgraph/v24/x" +) + +// Namespace is one of the namespaces in modusDB. +type Namespace struct { + id uint64 + db *DB +} + +func (n *Namespace) ID() uint64 { + return n.id +} + +// DropData drops all the data in the modusDB instance. +func (n *Namespace) DropData(ctx context.Context) error { + n.db.mutex.Lock() + defer n.db.mutex.Unlock() + + if !n.db.isOpen { + return ErrClosedDB + } + + p := &pb.Proposal{Mutations: &pb.Mutations{ + GroupId: 1, + DropOp: pb.Mutations_DATA, + DropValue: strconv.FormatUint(n.ID(), 10), + }} + + if err := worker.ApplyMutations(ctx, p); err != nil { + return fmt.Errorf("error applying mutation: %w", err) + } + + // TODO: insert drop record + // TODO: should we reset back the timestamp as well? + return nil +} + +func (n *Namespace) AlterSchema(ctx context.Context, sch string) error { + n.db.mutex.Lock() + defer n.db.mutex.Unlock() + + if !n.db.isOpen { + return ErrClosedDB + } + + sc, err := schema.ParseWithNamespace(sch, n.ID()) + if err != nil { + return fmt.Errorf("error parsing schema: %w", err) + } + for _, pred := range sc.Preds { + worker.InitTablet(pred.Predicate) + } + + startTs, err := n.db.z.nextTs() + if err != nil { + return err + } + + p := &pb.Proposal{Mutations: &pb.Mutations{ + GroupId: 1, + StartTs: startTs, + Schema: sc.Preds, + Types: sc.Types, + }} + if err := worker.ApplyMutations(ctx, p); err != nil { + return fmt.Errorf("error applying mutation: %w", err) + } + return nil +} + +func (n *Namespace) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) { + if len(ms) == 0 { + return nil, nil + } + + dms := make([]*dql.Mutation, 0, len(ms)) + for _, mu := range ms { + dm, err := edgraph.ParseMutationObject(mu, false) + if err != nil { + return nil, fmt.Errorf("error parsing mutation: %w", err) + } + dms = append(dms, dm) + } + newUids, err := query.ExtractBlankUIDs(ctx, dms) + if err != nil { + return nil, err + } + if len(newUids) > 0 { + num := &pb.Num{Val: uint64(len(newUids)), Type: pb.Num_UID} + res, err := n.db.z.nextUIDs(num) + if err != nil { + return nil, err + } + + curId := res.StartId + for k := range newUids { + x.AssertTruef(curId != 0 && curId <= res.EndId, "not enough uids generated") + newUids[k] = curId + curId++ + } + } + edges, err := query.ToDirectedEdges(dms, newUids) + if err != nil { + return nil, err + } + ctx = x.AttachNamespace(ctx, n.ID()) + + n.db.mutex.Lock() + defer n.db.mutex.Unlock() + + if !n.db.isOpen { + return nil, ErrClosedDB + } + + startTs, err := n.db.z.nextTs() + if err != nil { + return nil, err + } + commitTs, err := n.db.z.nextTs() + if err != nil { + return nil, err + } + + m := &pb.Mutations{ + GroupId: 1, + StartTs: startTs, + Edges: edges, + } + m.Edges, err = query.ExpandEdges(ctx, m) + if err != nil { + return nil, fmt.Errorf("error expanding edges: %w", err) + } + + for _, edge := range m.Edges { + worker.InitTablet(edge.Attr) + } + + p := &pb.Proposal{Mutations: m, StartTs: startTs} + if err := worker.ApplyMutations(ctx, p); err != nil { + return nil, err + } + + return newUids, worker.ApplyCommited(ctx, &pb.OracleDelta{ + Txns: []*pb.TxnStatus{{StartTs: startTs, CommitTs: commitTs}}, + }) +} + +// Query performs query or mutation or upsert on the given modusDB instance. +func (n *Namespace) Query(ctx context.Context, query string) (*api.Response, error) { + n.db.mutex.RLock() + defer n.db.mutex.RUnlock() + + if !n.db.isOpen { + return nil, ErrClosedDB + } + + ctx = x.AttachNamespace(ctx, n.ID()) + return (&edgraph.Server{}).QueryNoAuth(ctx, &api.Request{ + ReadOnly: true, + Query: query, + StartTs: n.db.z.readTs(), + }) +} diff --git a/namespace_test.go b/namespace_test.go new file mode 100644 index 0000000..655e216 --- /dev/null +++ b/namespace_test.go @@ -0,0 +1,288 @@ +package modusdb_test + +import ( + "context" + "testing" + + "github.com/dgraph-io/dgo/v240/protos/api" + "github.com/stretchr/testify/require" + + "github.com/hypermodeinc/modusdb" +) + +func TestNonGalaxyNamespace(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + db1, err := db.CreateNamespace() + require.NoError(t, err) + + require.NoError(t, db1.DropData(context.Background())) + require.NoError(t, db1.AlterSchema(context.Background(), "name: string @index(exact) .")) + + _, err = db1.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + resp, err := db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(resp.GetJson())) + +} + +func TestDropData(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + db1, err := db.CreateNamespace() + require.NoError(t, err) + + require.NoError(t, db1.DropData(context.Background())) + require.NoError(t, db1.AlterSchema(context.Background(), "name: string @index(exact) .")) + + _, err = db1.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + resp, err := db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(resp.GetJson())) + + require.NoError(t, db1.DropData(context.Background())) + + resp, err = db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[]}`, string(resp.GetJson())) +} + +func TestMultipleNamespaces(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + db0, err := db.GetNamespace(0) + require.NoError(t, err) + db1, err := db.CreateNamespace() + require.NoError(t, err) + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db0.AlterSchema(context.Background(), "name: string @index(exact) .")) + require.NoError(t, db1.AlterSchema(context.Background(), "name: string @index(exact) .")) + + _, err = db0.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }) + require.NoError(t, err) + + _, err = db1.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "B"}}, + }, + }, + }, + }) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + resp, err := db0.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(resp.GetJson())) + + resp, err = db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"B"}]}`, string(resp.GetJson())) + + require.NoError(t, db1.DropData(context.Background())) + resp, err = db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[]}`, string(resp.GetJson())) +} + +func TestQueryWrongNamespace(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + db0, err := db.GetNamespace(0) + require.NoError(t, err) + db1, err := db.CreateNamespace() + require.NoError(t, err) + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db0.AlterSchema(context.Background(), "name: string @index(exact) .")) + require.NoError(t, db1.AlterSchema(context.Background(), "name: string @index(exact) .")) + + _, err = db0.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Namespace: 1, + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + + resp, err := db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[]}`, string(resp.GetJson())) +} + +func TestTwoNamespaces(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + db0, err := db.GetNamespace(0) + require.NoError(t, err) + db1, err := db.CreateNamespace() + require.NoError(t, err) + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db0.AlterSchema(context.Background(), "foo: string @index(exact) .")) + require.NoError(t, db1.AlterSchema(context.Background(), "bar: string @index(exact) .")) + + _, err = db0.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "foo", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }) + require.NoError(t, err) + + _, err = db1.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "bar", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "B"}}, + }, + }, + }, + }) + require.NoError(t, err) + + query := `{ + me(func: has(foo)) { + foo + } + }` + resp, err := db0.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"foo":"A"}]}`, string(resp.GetJson())) + + query = `{ + me(func: has(bar)) { + bar + } + }` + resp, err = db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"bar":"B"}]}`, string(resp.GetJson())) +} + +func TestNamespaceDBRestart(t *testing.T) { + dataDir := t.TempDir() + db, err := modusdb.New(modusdb.NewDefaultConfig(dataDir)) + require.NoError(t, err) + defer func() { db.Close() }() + + db1, err := db.CreateNamespace() + require.NoError(t, err) + ns1 := db1.ID() + + require.NoError(t, db1.AlterSchema(context.Background(), "bar: string @index(exact) .")) + _, err = db1.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Subject: "_:aman", + Predicate: "bar", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "B"}}, + }, + }, + }, + }) + require.NoError(t, err) + + db.Close() + db, err = modusdb.New(modusdb.NewDefaultConfig(dataDir)) + require.NoError(t, err) + + db2, err := db.CreateNamespace() + require.NoError(t, err) + require.Greater(t, db2.ID(), ns1) + + db1, err = db.GetNamespace(ns1) + require.NoError(t, err) + + query := `{ + me(func: has(bar)) { + bar + } + }` + resp, err := db1.Query(context.Background(), query) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"bar":"B"}]}`, string(resp.GetJson())) +} diff --git a/vector_test.go b/vector_test.go index f384c9c..135d438 100644 --- a/vector_test.go +++ b/vector_test.go @@ -22,13 +22,13 @@ const ( func TestVectorDelete(t *testing.T) { db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) require.NoError(t, err) - defer func() { db.Close() }() + defer db.Close() require.NoError(t, db.DropAll(context.Background())) require.NoError(t, db.AlterSchema(context.Background(), fmt.Sprintf(vectorSchemaWithIndex, "vtest", "4", "euclidean"))) - // insert random vectorss + // insert random vectors assignIDs, err := db.LeaseUIDs(numVectors + 1) require.NoError(t, err) //nolint:gosec @@ -108,7 +108,7 @@ func queryVectors(t *testing.T, db *modusdb.DB, query string) [][]float32 { } require.NoError(t, json.Unmarshal(resp.Json, &data)) - vectors := make([][]float32, 0, numVectors) + vectors := make([][]float32, 0) for _, vector := range data.Vector { vectors = append(vectors, vector.VTest) } diff --git a/zero.go b/zero.go index 17fe3ec..4aeb362 100644 --- a/zero.go +++ b/zero.go @@ -35,6 +35,8 @@ type zero struct { minLeasedTs uint64 maxLeasedTs uint64 + + lastNS uint64 } func newZero() (*zero, bool, error) { @@ -50,11 +52,13 @@ func newZero() (*zero, bool, error) { z.maxLeasedUID = initialUID z.minLeasedTs = initialTs z.maxLeasedTs = initialTs + z.lastNS = 0 } else { z.minLeasedUID = zs.MaxUID z.maxLeasedUID = zs.MaxUID z.minLeasedTs = zs.MaxTxnTs z.maxLeasedTs = zs.MaxTxnTs + z.lastNS = zs.MaxNsID } posting.Oracle().ProcessDelta(&pb.OracleDelta{MaxAssigned: z.minLeasedTs - 1}) worker.SetMaxUID(z.minLeasedUID - 1) @@ -111,6 +115,14 @@ func (z *zero) nextUIDs(num *pb.Num) (pb.AssignedIds, error) { return resp, nil } +func (z *zero) nextNS() (uint64, error) { + z.lastNS++ + if err := z.writeZeroState(); err != nil { + return 0, fmt.Errorf("error leasing namespace ID: %w", err) + } + return z.lastNS, nil +} + func readZeroState() (*pb.MembershipState, error) { txn := worker.State.Pstore.NewTransactionAt(zeroStateTs, false) defer txn.Discard() @@ -130,11 +142,12 @@ func readZeroState() (*pb.MembershipState, error) { if err != nil { return nil, fmt.Errorf("error unmarshalling zero state: %v", err) } + return &zeroState, nil } -func writeZeroState(maxUID, maxTs uint64) error { - zeroState := pb.MembershipState{MaxUID: maxUID, MaxTxnTs: maxTs} +func (z *zero) writeZeroState() error { + zeroState := pb.MembershipState{MaxUID: z.maxLeasedUID, MaxTxnTs: z.maxLeasedTs, MaxNsID: z.lastNS} data, err := zeroState.Marshal() if err != nil { return fmt.Errorf("error marshalling zero state: %w", err) @@ -164,7 +177,7 @@ func (z *zero) leaseTs() error { } z.maxLeasedTs += z.minLeasedTs + leaseTsAtATime - if err := writeZeroState(z.maxLeasedUID, z.maxLeasedTs); err != nil { + if err := z.writeZeroState(); err != nil { return fmt.Errorf("error leasing UIDs: %w", err) } @@ -177,7 +190,7 @@ func (z *zero) leaseUIDs() error { } z.maxLeasedUID += z.minLeasedUID + leaseUIDAtATime - if err := writeZeroState(z.maxLeasedUID, z.maxLeasedTs); err != nil { + if err := z.writeZeroState(); err != nil { return fmt.Errorf("error leasing timestamps: %w", err) }