diff --git a/db.go b/db.go index e71e701..e25a47d 100644 --- a/db.go +++ b/db.go @@ -2,8 +2,6 @@ package modusdb import ( "context" - "crypto/sha256" - "encoding/hex" "errors" "fmt" "path" @@ -28,28 +26,28 @@ var ( // This ensures that we only have one instance of modusdb in this process. singeton atomic.Bool - ErrSingletonOnly = errors.New("only one modusdb instance is supported") - ErrEmptyDataDir = errors.New("data directory is required") - ErrDBClosed = errors.New("modusdb instance is closed") + ErrSingletonOnly = errors.New("only one modusdb instance is supported") + ErrEmptyDataDir = errors.New("data directory is required") + ErrDBClosed = errors.New("modusdb instance is closed") + ErrNsDoesNotExist = errors.New("namespace does not exist") ) -type Engine struct { +type DB struct { mutex sync.RWMutex isOpen bool - nsDBs map[uint64]*DB z *zero } -// DB is an instance of modusdb. +// Namespace is an instance of modusdb. // For now, we only support one instance of modusdb per process. -type DB struct { - mutex sync.RWMutex - ns uint64 - engine *Engine +type Namespace struct { + mutex sync.RWMutex + id uint64 + db *DB } // New returns a new modusdb instance. -func New(conf Config) (*Engine, error) { +func New(conf Config) (*DB, error) { // Ensure that we do not create another instance of modusdb in the same process if !singeton.CompareAndSwap(false, true) { return nil, ErrSingletonOnly @@ -63,7 +61,6 @@ func New(conf Config) (*Engine, error) { worker.Config.PostingDir = path.Join(conf.dataDir, "p") worker.Config.WALDir = path.Join(conf.dataDir, "w") x.WorkerConfig.TmpDir = path.Join(conf.dataDir, "t") - x.WorkerConfig.AclEnabled = true // TODO: optimize these and more options x.WorkerConfig.Badger = badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults) @@ -78,23 +75,35 @@ func New(conf Config) (*Engine, error) { schema.Init(worker.State.Pstore) posting.Init(worker.State.Pstore, 0) // TODO: set cache size - engine := &Engine{ - nsDBs: make(map[uint64]*DB), + engine := &DB{ isOpen: true, } - db := &DB{ns: 0, engine: engine} if err := engine.reset(); err != nil { return nil, fmt.Errorf("error resetting engine: %w", err) } - - engine.nsDBs[0] = db - x.UpdateHealthStatus(true) return engine, nil } -func (e *Engine) GetDB(ns uint64) (*DB, error) { +func (e *DB) createNamespaceNoLock(ns uint64) error { + startTs, err := e.z.nextTs() + if err != nil { + return err + } + + if err := worker.ApplyInitialSchema(ns, startTs); err != nil { + return fmt.Errorf("error applying initial schema: %w", err) + } + + for _, pred := range schema.State().Predicates() { + worker.InitTablet(pred) + } + + return nil +} + +func (e *DB) GetNamespace(ns uint64) (*Namespace, error) { e.mutex.RLock() defer e.mutex.RUnlock() @@ -102,17 +111,49 @@ func (e *Engine) GetDB(ns uint64) (*DB, error) { return nil, ErrDBClosed } - if db, ok := e.nsDBs[ns]; ok { + db, err := e.getNamespaceNoLock(ns) + if err == nil { return db, nil } - db := &DB{ns: ns, engine: e} - e.nsDBs[ns] = db + if err != ErrNsDoesNotExist { + return nil, err + } + + err = e.createNamespaceNoLock(ns) + if err != nil { + return nil, err + } + + return e.getNamespaceNoLock(ns) +} + +func (e *DB) GetNamespaces() []uint64 { + e.mutex.RLock() + defer e.mutex.RUnlock() + + if !e.isOpen { + return nil + } + + ns := make([]uint64, 0, len(schema.State().Namespaces())) + for k := range schema.State().Namespaces() { + ns = append(ns, k) + } + return ns +} + +func (e *DB) getNamespaceNoLock(ns uint64) (*Namespace, error) { + if _, ok := schema.State().Namespaces()[ns]; !ok { + return nil, ErrNsDoesNotExist + } + + db := &Namespace{id: ns, db: e} return db, nil } // Close closes the modusdb instance. -func (e *Engine) Close() { +func (e *DB) Close() { e.mutex.Lock() defer e.mutex.Unlock() @@ -128,17 +169,10 @@ func (e *Engine) Close() { x.UpdateHealthStatus(false) posting.Cleanup() worker.State.Dispose() - - for ns, db := range e.nsDBs { - db.mutex.Lock() - defer db.mutex.Unlock() - db = nil - delete(e.nsDBs, ns) - } } // DropAll drops all the data and schema in the modusdb instance. -func (e *Engine) DropAll(ctx context.Context) error { +func (e *DB) DropAll(ctx context.Context) error { e.mutex.Lock() defer e.mutex.Unlock() @@ -162,18 +196,18 @@ func (e *Engine) DropAll(ctx context.Context) error { } // DropData drops all the data in the modusdb instance. -func (db *DB) DropData(ctx context.Context) error { - db.mutex.Lock() - defer db.mutex.Unlock() +func (n *Namespace) DropData(ctx context.Context) error { + n.mutex.Lock() + defer n.mutex.Unlock() - if !db.engine.isOpen { + if !n.db.isOpen { return ErrDBClosed } p := &pb.Proposal{Mutations: &pb.Mutations{ GroupId: 1, DropOp: pb.Mutations_DATA, - DropValue: strconv.FormatUint(db.ns, 10), + DropValue: strconv.FormatUint(n.id, 10), }} if err := worker.ApplyMutations(ctx, p); err != nil { @@ -185,15 +219,15 @@ func (db *DB) DropData(ctx context.Context) error { return nil } -func (db *DB) AlterSchema(ctx context.Context, sch string) error { - db.mutex.Lock() - defer db.mutex.Unlock() +func (n *Namespace) AlterSchema(ctx context.Context, sch string) error { + n.mutex.Lock() + defer n.mutex.Unlock() - if !db.engine.isOpen { + if !n.db.isOpen { return ErrDBClosed } - sc, err := schema.ParseWithNamespace(sch, db.ns) + sc, err := schema.ParseWithNamespace(sch, n.id) if err != nil { return fmt.Errorf("error parsing schema: %w", err) } @@ -201,7 +235,7 @@ func (db *DB) AlterSchema(ctx context.Context, sch string) error { worker.InitTablet(pred.Predicate) } - startTs, err := db.engine.z.nextTs() + startTs, err := n.db.z.nextTs() if err != nil { return err } @@ -218,7 +252,7 @@ func (db *DB) AlterSchema(ctx context.Context, sch string) error { return nil } -func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) { +func (n *Namespace) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) { if len(ms) == 0 { return nil, nil } @@ -237,7 +271,7 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64 } if len(newUids) > 0 { num := &pb.Num{Val: uint64(len(newUids)), Type: pb.Num_UID} - res, err := db.engine.z.nextUIDs(num) + res, err := n.db.z.nextUIDs(num) if err != nil { return nil, err } @@ -253,20 +287,20 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64 if err != nil { return nil, err } - ctx = x.AttachNamespace(ctx, db.ns) + ctx = x.AttachNamespace(ctx, n.id) - db.mutex.Lock() - defer db.mutex.Unlock() + n.mutex.Lock() + defer n.mutex.Unlock() - if !db.engine.isOpen { + if !n.db.isOpen { return nil, ErrDBClosed } - startTs, err := db.engine.z.nextTs() + startTs, err := n.db.z.nextTs() if err != nil { return nil, err } - commitTs, err := db.engine.z.nextTs() + commitTs, err := n.db.z.nextTs() if err != nil { return nil, err } @@ -296,68 +330,35 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64 } // Query performs query or mutation or upsert on the given modusdb instance. -func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) { - db.mutex.RLock() - defer db.mutex.RUnlock() +func (n *Namespace) Query(ctx context.Context, query string) (*api.Response, error) { + n.mutex.RLock() + defer n.mutex.RUnlock() - if !db.engine.isOpen { + if !n.db.isOpen { return nil, ErrDBClosed } - ctx = x.AttachNamespace(ctx, db.ns) - - h := sha256.New() - h.Write([]byte(fmt.Sprintf("%#x%#x%#x", db.ns, db.engine.z.readTs(), []byte(worker.Config.AclSecretKeyBytes)))) + ctx = x.AttachNamespace(ctx, n.id) - return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{ - ReadOnly: true, - Query: query, - StartTs: db.engine.z.readTs(), - Hash: hex.EncodeToString(h.Sum(nil)), + return (&edgraph.Server{}).DoQuery(ctx, &edgraph.Request{ + Req: &api.Request{ + ReadOnly: true, + Query: query, + StartTs: n.db.z.readTs(), + // Hash: hex.EncodeToString(h.Sum(nil)), + }, + DoAuth: edgraph.NoAuthorize, }) } -func (e *Engine) NewNamespace(ctx context.Context, ns uint64) error { - e.mutex.Lock() - defer e.mutex.Unlock() - - if !e.isOpen { - return ErrDBClosed - } - - startTs, err := e.z.nextTs() - if err != nil { - return err - } - - for _, su := range schema.InitialSchema(ns) { - if err := updateSchema(su, ns); err != nil { - return err - } - } - - initialTypes := schema.InitialTypes(ns) - for _, t := range initialTypes { - if _, ok := schema.State().GetType(t.TypeName); !ok { - continue - } - - if err := updateType(t.TypeName, *t, startTs); err != nil { - return err - } - } - - return nil -} - -func (e *Engine) reset() error { +func (e *DB) reset() error { z, restart, err := newZero() if err != nil { return fmt.Errorf("error initializing zero: %w", err) } if !restart { - if err := worker.ApplyInitialSchema(); err != nil { + if err := worker.ApplyInitialSchema(0, 1); err != nil { return fmt.Errorf("error applying initial schema: %w", err) } } @@ -372,40 +373,3 @@ func (e *Engine) reset() error { e.z = z return nil } - -// updateSchema commits the schema to disk in blocking way, should be ok because this happens -// only during schema mutations or we see a new predicate. -func updateSchema(s *pb.SchemaUpdate, ts uint64) error { - schema.State().Set(s.Predicate, s) - schema.State().DeleteMutSchema(s.Predicate) - txn := worker.State.Pstore.NewTransactionAt(ts, true) - defer txn.Discard() - data, err := s.Marshal() - x.Check(err) - e := &badger.Entry{ - Key: x.SchemaKey(s.Predicate), - Value: data, - UserMeta: posting.BitSchemaPosting, - } - if err = txn.SetEntry(e.WithDiscard()); err != nil { - return err - } - return txn.CommitAt(ts, nil) -} - -func updateType(typeName string, t pb.TypeUpdate, ts uint64) error { - schema.State().SetType(typeName, &t) - txn := worker.State.Pstore.NewTransactionAt(ts, true) - defer txn.Discard() - data, err := t.Marshal() - x.Check(err) - e := &badger.Entry{ - Key: x.TypeKey(typeName), - Value: data, - UserMeta: posting.BitSchemaPosting, - } - if err := txn.SetEntry(e.WithDiscard()); err != nil { - return err - } - return txn.CommitAt(ts, nil) -} diff --git a/db_test.go b/db_test.go index 765a6d8..a8baecb 100644 --- a/db_test.go +++ b/db_test.go @@ -20,11 +20,12 @@ func TestRestart(t *testing.T) { require.NoError(t, err) defer func() { e.Close() }() - db, err := e.GetDB(0) + require.NoError(t, e.DropAll(context.Background())) + + db, err := e.GetNamespace(0) require.NoError(t, err) require.NoError(t, err) - require.NoError(t, e.DropAll(context.Background())) require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(term) .")) _, err = db.Mutate(context.Background(), []*api.Mutation{ @@ -54,7 +55,7 @@ func TestRestart(t *testing.T) { e, err = modusdb.New(modusdb.NewDefaultConfig(dataDir)) require.NoError(t, err) - db, err = e.GetDB(0) + db, err = e.GetNamespace(0) require.NoError(t, err) qresp, err = db.Query(context.Background(), query) require.NoError(t, err) @@ -68,10 +69,11 @@ func TestSchemaQuery(t *testing.T) { require.NoError(t, err) defer e.Close() - db, err := e.GetDB(0) + require.NoError(t, e.DropAll(context.Background())) + + db, err := e.GetNamespace(0) require.NoError(t, err) - require.NoError(t, e.DropAll(context.Background())) require.NoError(t, db.AlterSchema(context.Background(), ` name: string @index(exact) . age: int . @@ -100,7 +102,7 @@ func TestBasicVector(t *testing.T) { require.NoError(t, err) defer e.Close() - db, err := e.GetDB(0) + db, err := e.GetNamespace(0) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -139,7 +141,7 @@ func TestNonGalaxyNamespace(t *testing.T) { require.NoError(t, err) defer e.Close() - db, err := e.GetDB(1) + db, err := e.GetNamespace(1) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -175,7 +177,7 @@ func TestDropData(t *testing.T) { require.NoError(t, err) defer e.Close() - db, err := e.GetDB(0) + db, err := e.GetNamespace(0) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -216,9 +218,9 @@ func TestMultipleNamespaces(t *testing.T) { require.NoError(t, err) defer e.Close() - db0, err := e.GetDB(0) + db0, err := e.GetNamespace(0) require.NoError(t, err) - db1, err := e.GetDB(1) + db1, err := e.GetNamespace(1) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -272,9 +274,9 @@ func TestQueryFromWrongNamespace(t *testing.T) { require.NoError(t, err) defer e.Close() - db0, err := e.GetDB(0) + db0, err := e.GetNamespace(0) require.NoError(t, err) - db1, err := e.GetDB(1) + db1, err := e.GetNamespace(1) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -312,9 +314,9 @@ func TestNamespaces2(t *testing.T) { require.NoError(t, err) defer e.Close() - db0, err := e.GetDB(0) + db0, err := e.GetNamespace(0) require.NoError(t, err) - db1, err := e.GetDB(1) + db1, err := e.GetNamespace(1) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -374,9 +376,9 @@ func TestDroppingNamespaceData(t *testing.T) { require.NoError(t, err) defer e.Close() - db0, err := e.GetDB(0) + db0, err := e.GetNamespace(0) require.NoError(t, err) - db1, err := e.GetDB(1) + db1, err := e.GetNamespace(1) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -435,20 +437,24 @@ func TestDroppingNamespaceData(t *testing.T) { require.JSONEq(t, `{"me":[]}`, string(qresp.GetJson())) } -func TestCreateNamespace(t *testing.T) { +func TestGetNamespaces(t *testing.T) { e, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) require.NoError(t, err) defer e.Close() - err = e.NewNamespace(context.Background(), 1) + db0, err := e.GetNamespace(0) require.NoError(t, err) - - db0, err := e.GetDB(0) + db1, err := e.GetNamespace(1) require.NoError(t, err) - db1, err := e.GetDB(1) + + _, err = e.GetNamespace(2) require.NoError(t, err) - require.NoError(t, e.DropAll(context.Background())) + namespaces := e.GetNamespaces() + require.Contains(t, namespaces, uint64(0)) + require.Contains(t, namespaces, uint64(1)) + require.Contains(t, namespaces, uint64(2)) + require.NoError(t, db0.AlterSchema(context.Background(), "name: string @index(exact) .")) require.NoError(t, db1.AlterSchema(context.Background(), "name: string @index(exact) .")) @@ -468,7 +474,6 @@ func TestCreateNamespace(t *testing.T) { _, err = db1.Mutate(context.Background(), []*api.Mutation{ { - Set: []*api.NQuad{ { Namespace: 1, @@ -479,29 +484,10 @@ func TestCreateNamespace(t *testing.T) { }, }, }) - - require.NoError(t, err) - - query := `{ - me(func: has(name)) { - name - } - }` - qresp, err := db0.Query(context.Background(), query) require.NoError(t, err) - require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) - qresp, err = db1.Query(context.Background(), query) - require.NoError(t, err) - require.JSONEq(t, `{"me":[{"name":"B"}]}`, string(qresp.GetJson())) - - require.NoError(t, db1.DropData(context.Background())) - - qresp, err = db0.Query(context.Background(), query) - require.NoError(t, err) - require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) - - qresp, err = db1.Query(context.Background(), query) - require.NoError(t, err) - require.JSONEq(t, `{"me":[]}`, string(qresp.GetJson())) + namespaces = e.GetNamespaces() + require.Contains(t, namespaces, uint64(0)) + require.Contains(t, namespaces, uint64(1)) + require.Contains(t, namespaces, uint64(2)) } diff --git a/go.mod b/go.mod index 12ddb70..25fec75 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,8 @@ require ( golang.org/x/sync v0.8.0 ) +replace github.com/dgraph-io/dgraph/v24 => ../../dgraph-io/dgraph + require ( contrib.go.opencensus.io/exporter/jaeger v0.2.1 // indirect contrib.go.opencensus.io/exporter/prometheus v0.4.2 // indirect diff --git a/go.sum b/go.sum index 68b50ff..70b50de 100644 --- a/go.sum +++ b/go.sum @@ -138,8 +138,6 @@ github.com/dgraph-io/badger/v4 v4.3.1 h1:7r5wKqmoRpGgSxqa0S/nGdpOpvvzuREGPLSua73 github.com/dgraph-io/badger/v4 v4.3.1/go.mod h1:oObz97DImXpd6O/Dt8BqdKLLTDmEmarAimo72VV5whQ= github.com/dgraph-io/dgo/v240 v240.0.0 h1:LgpaQoQuM8YD3/oHjjqzCfORPlw34OnIcA/jMiKuDJc= github.com/dgraph-io/dgo/v240 v240.0.0/go.mod h1:YrKW6k5cJpG6qP+MtNlXBogNMTupDmnnmiF6heC0Uao= -github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241111015302-0789e950c74c h1:0JcHJu5uAc9He1TgtP0PvuSGI3MkeV4KdnCLxk+J3D0= -github.com/dgraph-io/dgraph/v24 v24.0.3-0.20241111015302-0789e950c74c/go.mod h1:ghlhHA3UcSLhZbSd6RqbK5PZdDhuwTdkVE/aF6r8cw0= github.com/dgraph-io/gqlgen v0.13.2 h1:TNhndk+eHKj5qE7BenKKSYdSIdOGhLqxR1rCiMso9KM= github.com/dgraph-io/gqlgen v0.13.2/go.mod h1:iCOrOv9lngN7KAo+jMgvUPVDlYHdf7qDwsTkQby2Sis= github.com/dgraph-io/gqlparser/v2 v2.1.1/go.mod h1:MYS4jppjyx8b9tuUtjV7jU1UFZK6P9fvO8TsIsQtRKU= diff --git a/live.go b/live.go index ac4b562..3e89e05 100644 --- a/live.go +++ b/live.go @@ -25,29 +25,29 @@ const ( ) type liveLoader struct { - db *DB + n *Namespace blankNodes map[string]string mutex sync.RWMutex } -func (db *DB) Load(ctx context.Context, schemaPath, dataPath string) error { +func (n *Namespace) Load(ctx context.Context, schemaPath, dataPath string) error { schemaData, err := os.ReadFile(schemaPath) if err != nil { return fmt.Errorf("error reading schema file [%v]: %w", schemaPath, err) } - if err := db.AlterSchema(ctx, string(schemaData)); err != nil { + if err := n.AlterSchema(ctx, string(schemaData)); err != nil { return fmt.Errorf("error altering schema: %w", err) } - if err := db.LoadData(ctx, dataPath); err != nil { + if err := n.LoadData(ctx, dataPath); err != nil { return fmt.Errorf("error loading data: %w", err) } return nil } // TODO: Add support for CSV file -func (db *DB) LoadData(inCtx context.Context, dataDir string) error { +func (n *Namespace) LoadData(inCtx context.Context, dataDir string) error { fs := filestore.NewFileStore(dataDir) files := fs.FindDataFiles(dataDir, []string{".rdf", ".rdf.gz", ".json", ".json.gz"}) if len(files) == 0 { @@ -85,7 +85,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error { if !ok { return nil } - uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs}) + uids, err := n.Mutate(rootCtx, []*api.Mutation{nqs}) if err != nil { return fmt.Errorf("error applying mutations: %w", err) } @@ -95,7 +95,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error { } }) - ll := &liveLoader{db: db, blankNodes: make(map[string]string)} + ll := &liveLoader{n: n, blankNodes: make(map[string]string)} for _, datafile := range files { procG.Go(func() error { return ll.processFile(procCtx, fs, datafile, nqch) @@ -237,7 +237,7 @@ func (l *liveLoader) uid(ns uint64, val string) (string, error) { return uid, nil } - asUID, err := l.db.engine.LeaseUIDs(1) + asUID, err := l.n.db.LeaseUIDs(1) if err != nil { return "", fmt.Errorf("error allocating UID: %w", err) } diff --git a/live_test.go b/live_test.go index aa116d1..d52f278 100644 --- a/live_test.go +++ b/live_test.go @@ -46,7 +46,7 @@ func TestLiveLoaderSmall(t *testing.T) { require.NoError(t, err) defer func() { e.Close() }() - db, err := e.GetDB(0) + db, err := e.GetNamespace(0) require.NoError(t, err) dataFolder := t.TempDir() @@ -90,7 +90,7 @@ func TestLiveLoader1Million(t *testing.T) { require.NoError(t, err) defer func() { e.Close() }() - db, err := e.GetDB(0) + db, err := e.GetNamespace(0) require.NoError(t, err) baseDir := t.TempDir() diff --git a/vector_test.go b/vector_test.go index 156d49f..15fa16f 100644 --- a/vector_test.go +++ b/vector_test.go @@ -24,7 +24,7 @@ func TestVectorDelete(t *testing.T) { require.NoError(t, err) defer func() { e.Close() }() - db, err := e.GetDB(0) + db, err := e.GetNamespace(0) require.NoError(t, err) require.NoError(t, e.DropAll(context.Background())) @@ -99,7 +99,7 @@ func TestVectorDelete(t *testing.T) { _ = queryVectors(t, db, fmt.Sprintf(q3, strings.Split(triple, `"`)[1])) } -func queryVectors(t *testing.T, db *modusdb.DB, query string) [][]float32 { +func queryVectors(t *testing.T, db *modusdb.Namespace, query string) [][]float32 { resp, err := db.Query(context.Background(), query) require.NoError(t, err) diff --git a/zero.go b/zero.go index 153c31d..3650c76 100644 --- a/zero.go +++ b/zero.go @@ -24,7 +24,7 @@ const ( zeroStateKey = "0-dgraph.modusdb.zero" ) -func (e *Engine) LeaseUIDs(numUIDs uint64) (pb.AssignedIds, error) { +func (e *DB) LeaseUIDs(numUIDs uint64) (pb.AssignedIds, error) { num := &pb.Num{Val: numUIDs, Type: pb.Num_UID} return e.z.nextUIDs(num) }