Skip to content

Commit

Permalink
updates
Browse files Browse the repository at this point in the history
  • Loading branch information
jairad26 committed Nov 15, 2024
1 parent 4f6a34e commit 40731a3
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 198 deletions.
238 changes: 101 additions & 137 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,6 @@ package modusdb

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"path"
Expand All @@ -28,28 +26,28 @@ var (
// This ensures that we only have one instance of modusdb in this process.
singeton atomic.Bool

ErrSingletonOnly = errors.New("only one modusdb instance is supported")
ErrEmptyDataDir = errors.New("data directory is required")
ErrDBClosed = errors.New("modusdb instance is closed")
ErrSingletonOnly = errors.New("only one modusdb instance is supported")
ErrEmptyDataDir = errors.New("data directory is required")
ErrDBClosed = errors.New("modusdb instance is closed")
ErrNsDoesNotExist = errors.New("namespace does not exist")
)

type Engine struct {
type DB struct {
mutex sync.RWMutex
isOpen bool
nsDBs map[uint64]*DB
z *zero
}

// DB is an instance of modusdb.
// Namespace is an instance of modusdb.
// For now, we only support one instance of modusdb per process.
type DB struct {
mutex sync.RWMutex
ns uint64
engine *Engine
type Namespace struct {
mutex sync.RWMutex
id uint64
db *DB
}

// New returns a new modusdb instance.
func New(conf Config) (*Engine, error) {
func New(conf Config) (*DB, error) {
// Ensure that we do not create another instance of modusdb in the same process
if !singeton.CompareAndSwap(false, true) {
return nil, ErrSingletonOnly
Expand All @@ -63,7 +61,6 @@ func New(conf Config) (*Engine, error) {
worker.Config.PostingDir = path.Join(conf.dataDir, "p")
worker.Config.WALDir = path.Join(conf.dataDir, "w")
x.WorkerConfig.TmpDir = path.Join(conf.dataDir, "t")
x.WorkerConfig.AclEnabled = true

// TODO: optimize these and more options
x.WorkerConfig.Badger = badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults)
Expand All @@ -78,41 +75,85 @@ func New(conf Config) (*Engine, error) {
schema.Init(worker.State.Pstore)
posting.Init(worker.State.Pstore, 0) // TODO: set cache size

engine := &Engine{
nsDBs: make(map[uint64]*DB),
engine := &DB{
isOpen: true,
}
db := &DB{ns: 0, engine: engine}

if err := engine.reset(); err != nil {
return nil, fmt.Errorf("error resetting engine: %w", err)
}

engine.nsDBs[0] = db

x.UpdateHealthStatus(true)
return engine, nil
}

func (e *Engine) GetDB(ns uint64) (*DB, error) {
func (e *DB) createNamespaceNoLock(ns uint64) error {
startTs, err := e.z.nextTs()
if err != nil {
return err
}

if err := worker.ApplyInitialSchema(ns, startTs); err != nil {
return fmt.Errorf("error applying initial schema: %w", err)
}

for _, pred := range schema.State().Predicates() {
worker.InitTablet(pred)
}

return nil
}

func (e *DB) GetNamespace(ns uint64) (*Namespace, error) {
e.mutex.RLock()
defer e.mutex.RUnlock()

if !e.isOpen {
return nil, ErrDBClosed
}

if db, ok := e.nsDBs[ns]; ok {
db, err := e.getNamespaceNoLock(ns)
if err == nil {
return db, nil
}

db := &DB{ns: ns, engine: e}
e.nsDBs[ns] = db
if err != ErrNsDoesNotExist {
return nil, err
}

err = e.createNamespaceNoLock(ns)
if err != nil {
return nil, err
}

return e.getNamespaceNoLock(ns)
}

func (e *DB) GetNamespaces() []uint64 {
e.mutex.RLock()
defer e.mutex.RUnlock()

if !e.isOpen {
return nil
}

ns := make([]uint64, 0, len(schema.State().Namespaces()))
for k := range schema.State().Namespaces() {
ns = append(ns, k)
}
return ns
}

func (e *DB) getNamespaceNoLock(ns uint64) (*Namespace, error) {
if _, ok := schema.State().Namespaces()[ns]; !ok {
return nil, ErrNsDoesNotExist
}

db := &Namespace{id: ns, db: e}
return db, nil
}

// Close closes the modusdb instance.
func (e *Engine) Close() {
func (e *DB) Close() {
e.mutex.Lock()
defer e.mutex.Unlock()

Expand All @@ -128,17 +169,10 @@ func (e *Engine) Close() {
x.UpdateHealthStatus(false)
posting.Cleanup()
worker.State.Dispose()

for ns, db := range e.nsDBs {
db.mutex.Lock()
defer db.mutex.Unlock()
db = nil
delete(e.nsDBs, ns)
}
}

// DropAll drops all the data and schema in the modusdb instance.
func (e *Engine) DropAll(ctx context.Context) error {
func (e *DB) DropAll(ctx context.Context) error {
e.mutex.Lock()
defer e.mutex.Unlock()

Expand All @@ -162,18 +196,18 @@ func (e *Engine) DropAll(ctx context.Context) error {
}

// DropData drops all the data in the modusdb instance.
func (db *DB) DropData(ctx context.Context) error {
db.mutex.Lock()
defer db.mutex.Unlock()
func (n *Namespace) DropData(ctx context.Context) error {
n.mutex.Lock()
defer n.mutex.Unlock()

if !db.engine.isOpen {
if !n.db.isOpen {
return ErrDBClosed
}

p := &pb.Proposal{Mutations: &pb.Mutations{
GroupId: 1,
DropOp: pb.Mutations_DATA,
DropValue: strconv.FormatUint(db.ns, 10),
DropValue: strconv.FormatUint(n.id, 10),
}}

if err := worker.ApplyMutations(ctx, p); err != nil {
Expand All @@ -185,23 +219,23 @@ func (db *DB) DropData(ctx context.Context) error {
return nil
}

func (db *DB) AlterSchema(ctx context.Context, sch string) error {
db.mutex.Lock()
defer db.mutex.Unlock()
func (n *Namespace) AlterSchema(ctx context.Context, sch string) error {
n.mutex.Lock()
defer n.mutex.Unlock()

if !db.engine.isOpen {
if !n.db.isOpen {
return ErrDBClosed
}

sc, err := schema.ParseWithNamespace(sch, db.ns)
sc, err := schema.ParseWithNamespace(sch, n.id)
if err != nil {
return fmt.Errorf("error parsing schema: %w", err)
}
for _, pred := range sc.Preds {
worker.InitTablet(pred.Predicate)
}

startTs, err := db.engine.z.nextTs()
startTs, err := n.db.z.nextTs()
if err != nil {
return err
}
Expand All @@ -218,7 +252,7 @@ func (db *DB) AlterSchema(ctx context.Context, sch string) error {
return nil
}

func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) {
func (n *Namespace) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) {
if len(ms) == 0 {
return nil, nil
}
Expand All @@ -237,7 +271,7 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64
}
if len(newUids) > 0 {
num := &pb.Num{Val: uint64(len(newUids)), Type: pb.Num_UID}
res, err := db.engine.z.nextUIDs(num)
res, err := n.db.z.nextUIDs(num)
if err != nil {
return nil, err
}
Expand All @@ -253,20 +287,20 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64
if err != nil {
return nil, err
}
ctx = x.AttachNamespace(ctx, db.ns)
ctx = x.AttachNamespace(ctx, n.id)

db.mutex.Lock()
defer db.mutex.Unlock()
n.mutex.Lock()
defer n.mutex.Unlock()

if !db.engine.isOpen {
if !n.db.isOpen {
return nil, ErrDBClosed
}

startTs, err := db.engine.z.nextTs()
startTs, err := n.db.z.nextTs()
if err != nil {
return nil, err
}
commitTs, err := db.engine.z.nextTs()
commitTs, err := n.db.z.nextTs()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -296,68 +330,35 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64
}

// Query performs query or mutation or upsert on the given modusdb instance.
func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()
func (n *Namespace) Query(ctx context.Context, query string) (*api.Response, error) {
n.mutex.RLock()
defer n.mutex.RUnlock()

if !db.engine.isOpen {
if !n.db.isOpen {
return nil, ErrDBClosed
}

ctx = x.AttachNamespace(ctx, db.ns)

h := sha256.New()
h.Write([]byte(fmt.Sprintf("%#x%#x%#x", db.ns, db.engine.z.readTs(), []byte(worker.Config.AclSecretKeyBytes))))
ctx = x.AttachNamespace(ctx, n.id)

return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{
ReadOnly: true,
Query: query,
StartTs: db.engine.z.readTs(),
Hash: hex.EncodeToString(h.Sum(nil)),
return (&edgraph.Server{}).DoQuery(ctx, &edgraph.Request{
Req: &api.Request{
ReadOnly: true,
Query: query,
StartTs: n.db.z.readTs(),
// Hash: hex.EncodeToString(h.Sum(nil)),
},
DoAuth: edgraph.NoAuthorize,
})
}

func (e *Engine) NewNamespace(ctx context.Context, ns uint64) error {
e.mutex.Lock()
defer e.mutex.Unlock()

if !e.isOpen {
return ErrDBClosed
}

startTs, err := e.z.nextTs()
if err != nil {
return err
}

for _, su := range schema.InitialSchema(ns) {
if err := updateSchema(su, ns); err != nil {
return err
}
}

initialTypes := schema.InitialTypes(ns)
for _, t := range initialTypes {
if _, ok := schema.State().GetType(t.TypeName); !ok {
continue
}

if err := updateType(t.TypeName, *t, startTs); err != nil {
return err
}
}

return nil
}

func (e *Engine) reset() error {
func (e *DB) reset() error {
z, restart, err := newZero()
if err != nil {
return fmt.Errorf("error initializing zero: %w", err)
}

if !restart {
if err := worker.ApplyInitialSchema(); err != nil {
if err := worker.ApplyInitialSchema(0, 1); err != nil {
return fmt.Errorf("error applying initial schema: %w", err)
}
}
Expand All @@ -372,40 +373,3 @@ func (e *Engine) reset() error {
e.z = z
return nil
}

// updateSchema commits the schema to disk in blocking way, should be ok because this happens
// only during schema mutations or we see a new predicate.
func updateSchema(s *pb.SchemaUpdate, ts uint64) error {
schema.State().Set(s.Predicate, s)
schema.State().DeleteMutSchema(s.Predicate)
txn := worker.State.Pstore.NewTransactionAt(ts, true)
defer txn.Discard()
data, err := s.Marshal()
x.Check(err)
e := &badger.Entry{
Key: x.SchemaKey(s.Predicate),
Value: data,
UserMeta: posting.BitSchemaPosting,
}
if err = txn.SetEntry(e.WithDiscard()); err != nil {
return err
}
return txn.CommitAt(ts, nil)
}

func updateType(typeName string, t pb.TypeUpdate, ts uint64) error {
schema.State().SetType(typeName, &t)
txn := worker.State.Pstore.NewTransactionAt(ts, true)
defer txn.Discard()
data, err := t.Marshal()
x.Check(err)
e := &badger.Entry{
Key: x.TypeKey(typeName),
Value: data,
UserMeta: posting.BitSchemaPosting,
}
if err := txn.SetEntry(e.WithDiscard()); err != nil {
return err
}
return txn.CommitAt(ts, nil)
}
Loading

0 comments on commit 40731a3

Please sign in to comment.