Skip to content

Commit

Permalink
add namespacing to modusdb
Browse files Browse the repository at this point in the history
  • Loading branch information
jairad26 committed Nov 12, 2024
1 parent 90ef967 commit 7a831b0
Show file tree
Hide file tree
Showing 5 changed files with 144 additions and 17 deletions.
15 changes: 12 additions & 3 deletions db.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package modusdb

import (
"context"
"crypto/sha256"
"encoding/hex"
"errors"
"fmt"
"path"
Expand Down Expand Up @@ -54,6 +56,7 @@ func New(conf Config) (*DB, error) {
worker.Config.PostingDir = path.Join(conf.dataDir, "p")
worker.Config.WALDir = path.Join(conf.dataDir, "w")
x.WorkerConfig.TmpDir = path.Join(conf.dataDir, "t")
x.WorkerConfig.AclEnabled = true

// TODO: optimize these and more options
x.WorkerConfig.Badger = badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults)
Expand Down Expand Up @@ -175,7 +178,7 @@ func (db *DB) AlterSchema(ctx context.Context, sch string, ns uint64) error {
return nil
}

func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) {
func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation, ns uint64) (map[string]uint64, error) {
if len(ms) == 0 {
return nil, nil
}
Expand Down Expand Up @@ -210,7 +213,7 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64
if err != nil {
return nil, err
}
ctx = x.AttachNamespace(ctx, 0)
ctx = x.AttachNamespace(ctx, ns)

db.mutex.Lock()
defer db.mutex.Unlock()
Expand Down Expand Up @@ -253,18 +256,24 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64
}

// Query performs query or mutation or upsert on the given modusdb instance.
func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) {
func (db *DB) Query(ctx context.Context, query string, ns uint64) (*api.Response, error) {
db.mutex.RLock()
defer db.mutex.RUnlock()

if !db.isOpen {
return nil, ErrDBClosed
}

ctx = x.AttachNamespace(ctx, ns)

h := sha256.New()
h.Write([]byte(fmt.Sprintf("%#x%#x%#x", ns, db.z.readTs(), []byte(worker.Config.AclSecretKeyBytes))))

return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{
ReadOnly: true,
Query: query,
StartTs: db.z.readTs(),
Hash: hex.EncodeToString(h.Sum(nil)),
})
}

Expand Down
130 changes: 124 additions & 6 deletions db_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,22 +34,22 @@ func TestRestart(t *testing.T) {
},
},
},
})
}, 0)
require.NoError(t, err)

query := `{
me(func: has(name)) {
name
}
}`
qresp, err := db.Query(context.Background(), query)
qresp, err := db.Query(context.Background(), query, 0)
require.NoError(t, err)
require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson()))

db.Close()
db, err = modusdb.New(modusdb.NewDefaultConfig(dataDir))
require.NoError(t, err)
qresp, err = db.Query(context.Background(), query)
qresp, err = db.Query(context.Background(), query, 0)
require.NoError(t, err)
require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson()))

Expand All @@ -70,7 +70,7 @@ func TestSchemaQuery(t *testing.T) {
dob: datetime .
`, 0))

resp, err := db.Query(context.Background(), `schema(pred: [name, age]) {type}`)
resp, err := db.Query(context.Background(), `schema(pred: [name, age]) {type}`, 0)
require.NoError(t, err)

require.JSONEq(t,
Expand Down Expand Up @@ -102,7 +102,7 @@ func TestBasicVector(t *testing.T) {
Val: &api.Value_Vfloat32Val{Vfloat32Val: vectBytes},
},
}},
}})
}}, 0)
require.NoError(t, err)

uid := uids["_:vector"]
Expand All @@ -114,9 +114,127 @@ func TestBasicVector(t *testing.T) {
q (func: uid(%v)) {
project_description_v
}
}`, uid))
}`, uid), 0)
require.NoError(t, err)
require.Equal(t,
`{"q":[{"project_description_v":[5.1E+00,5.1E+00,1.1E+00]}]}`,
string(resp.GetJson()))
}

func TestNonGalaxyNamespace(t *testing.T) {
db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir()))
require.NoError(t, err)
defer db.Close()

require.NoError(t, db.DropAll(context.Background()))
require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1))

_, err = db.Mutate(context.Background(), []*api.Mutation{
{
Set: []*api.NQuad{
{
Namespace: 1,
Subject: "_:aman",
Predicate: "name",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}},
},
},
},
}, 1)
require.NoError(t, err)

query := `{
me(func: has(name)) {
name
}
}`
qresp, err := db.Query(context.Background(), query, 1)
require.NoError(t, err)
require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson()))

}

func TestMultipleNamespaces(t *testing.T) {
db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir()))
require.NoError(t, err)
defer db.Close()

require.NoError(t, db.DropAll(context.Background()))
require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 0))
require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1))

_, err = db.Mutate(context.Background(), []*api.Mutation{
{
Set: []*api.NQuad{
{
Namespace: 0,
Subject: "_:aman",
Predicate: "name",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}},
},
},
},
}, 0)
require.NoError(t, err)

_, err = db.Mutate(context.Background(), []*api.Mutation{
{
Set: []*api.NQuad{
{
Namespace: 1,
Subject: "_:aman",
Predicate: "name",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "B"}},
},
},
},
}, 1)
require.NoError(t, err)

query := `{
me(func: has(name)) {
name
}
}`
qresp, err := db.Query(context.Background(), query, 0)
require.NoError(t, err)
require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson()))

qresp, err = db.Query(context.Background(), query, 1)
require.NoError(t, err)
require.JSONEq(t, `{"me":[{"name":"B"}]}`, string(qresp.GetJson()))
}

func TestQueryFromWrongNamespace(t *testing.T) {
db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir()))
require.NoError(t, err)
defer db.Close()

require.NoError(t, db.DropAll(context.Background()))
require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 0))
require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1))

_, err = db.Mutate(context.Background(), []*api.Mutation{
{
Set: []*api.NQuad{
{
Namespace: 0,
Subject: "_:aman",
Predicate: "name",
ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}},
},
},
},
}, 0)
require.NoError(t, err)

query := `{
me(func: has(name)) {
name
}
}`

qresp, err := db.Query(context.Background(), query, 1)
require.NoError(t, err)
require.JSONEq(t, `{"me":[]}`, string(qresp.GetJson()))
}
2 changes: 1 addition & 1 deletion live.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error {
if !ok {
return nil
}
uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs})
uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs}, 0)
if err != nil {
return fmt.Errorf("error applying mutations: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestLiveLoaderSmall(t *testing.T) {
]
}`

resp, err := db.Query(context.Background(), query)
resp, err := db.Query(context.Background(), query, 0)
require.NoError(t, err)
require.JSONEq(t, expected, string(resp.Json))
}
Expand All @@ -98,7 +98,7 @@ func TestLiveLoader1Million(t *testing.T) {

for _, tt := range common.OneMillionTCs {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
resp, err := db.Query(ctx, tt.Query)
resp, err := db.Query(ctx, tt.Query, 0)
cancel()

if ctx.Err() == context.DeadlineExceeded {
Expand Down
10 changes: 5 additions & 5 deletions vector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func TestVectorDelete(t *testing.T) {
require.NoError(t, err)
//nolint:gosec
rdf, vectors := dgraphapi.GenerateRandomVectors(int(assignIDs.StartId)-10, int(assignIDs.EndId)-10, 10, "vtest")
_, err = db.Mutate(context.Background(), []*api.Mutation{{SetNquads: []byte(rdf)}})
_, err = db.Mutate(context.Background(), []*api.Mutation{{SetNquads: []byte(rdf)}}, 0)
require.NoError(t, err)

// check the count of the vectors inserted
Expand All @@ -42,7 +42,7 @@ func TestVectorDelete(t *testing.T) {
count(uid)
}
}`
resp, err := db.Query(context.Background(), q1)
resp, err := db.Query(context.Background(), q1, 0)
require.NoError(t, err)
require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%d}]}`, numVectors), string(resp.Json))

Expand All @@ -61,7 +61,7 @@ func TestVectorDelete(t *testing.T) {
deleteTriple := func(idx int) string {
_, err := db.Mutate(context.Background(), []*api.Mutation{{
DelNquads: []byte(triples[idx]),
}})
}}, 0)
require.NoError(t, err)

uid := strings.Split(triples[idx], " ")[0]
Expand All @@ -71,7 +71,7 @@ func TestVectorDelete(t *testing.T) {
}
}`, uid[1:len(uid)-1])

res, err := db.Query(context.Background(), q2)
res, err := db.Query(context.Background(), q2, 0)
require.NoError(t, err)
require.JSONEq(t, `{"vector":[]}`, string(res.Json))
return triples[idx]
Expand All @@ -97,7 +97,7 @@ func TestVectorDelete(t *testing.T) {
}

func queryVectors(t *testing.T, db *modusdb.DB, query string) [][]float32 {
resp, err := db.Query(context.Background(), query)
resp, err := db.Query(context.Background(), query, 0)
require.NoError(t, err)

var data struct {
Expand Down

0 comments on commit 7a831b0

Please sign in to comment.