From 7a831b0150509b0f0562c70379f8a3b411882df2 Mon Sep 17 00:00:00 2001 From: Jai Radhakrishnan <55522316+jairad26@users.noreply.github.com> Date: Tue, 12 Nov 2024 07:10:49 -0800 Subject: [PATCH] add namespacing to modusdb --- db.go | 15 ++++-- db_test.go | 130 ++++++++++++++++++++++++++++++++++++++++++++++--- live.go | 2 +- live_test.go | 4 +- vector_test.go | 10 ++-- 5 files changed, 144 insertions(+), 17 deletions(-) diff --git a/db.go b/db.go index d219bbe..d53c32c 100644 --- a/db.go +++ b/db.go @@ -2,6 +2,8 @@ package modusdb import ( "context" + "crypto/sha256" + "encoding/hex" "errors" "fmt" "path" @@ -54,6 +56,7 @@ func New(conf Config) (*DB, error) { worker.Config.PostingDir = path.Join(conf.dataDir, "p") worker.Config.WALDir = path.Join(conf.dataDir, "w") x.WorkerConfig.TmpDir = path.Join(conf.dataDir, "t") + x.WorkerConfig.AclEnabled = true // TODO: optimize these and more options x.WorkerConfig.Badger = badger.DefaultOptions("").FromSuperFlag(worker.BadgerDefaults) @@ -175,7 +178,7 @@ func (db *DB) AlterSchema(ctx context.Context, sch string, ns uint64) error { return nil } -func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64, error) { +func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation, ns uint64) (map[string]uint64, error) { if len(ms) == 0 { return nil, nil } @@ -210,7 +213,7 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64 if err != nil { return nil, err } - ctx = x.AttachNamespace(ctx, 0) + ctx = x.AttachNamespace(ctx, ns) db.mutex.Lock() defer db.mutex.Unlock() @@ -253,7 +256,7 @@ func (db *DB) Mutate(ctx context.Context, ms []*api.Mutation) (map[string]uint64 } // Query performs query or mutation or upsert on the given modusdb instance. -func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) { +func (db *DB) Query(ctx context.Context, query string, ns uint64) (*api.Response, error) { db.mutex.RLock() defer db.mutex.RUnlock() @@ -261,10 +264,16 @@ func (db *DB) Query(ctx context.Context, query string) (*api.Response, error) { return nil, ErrDBClosed } + ctx = x.AttachNamespace(ctx, ns) + + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%#x%#x%#x", ns, db.z.readTs(), []byte(worker.Config.AclSecretKeyBytes)))) + return (&edgraph.Server{}).QueryNoGrpc(ctx, &api.Request{ ReadOnly: true, Query: query, StartTs: db.z.readTs(), + Hash: hex.EncodeToString(h.Sum(nil)), }) } diff --git a/db_test.go b/db_test.go index a86bb4d..5c9794c 100644 --- a/db_test.go +++ b/db_test.go @@ -34,7 +34,7 @@ func TestRestart(t *testing.T) { }, }, }, - }) + }, 0) require.NoError(t, err) query := `{ @@ -42,14 +42,14 @@ func TestRestart(t *testing.T) { name } }` - qresp, err := db.Query(context.Background(), query) + qresp, err := db.Query(context.Background(), query, 0) require.NoError(t, err) require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) db.Close() db, err = modusdb.New(modusdb.NewDefaultConfig(dataDir)) require.NoError(t, err) - qresp, err = db.Query(context.Background(), query) + qresp, err = db.Query(context.Background(), query, 0) require.NoError(t, err) require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) @@ -70,7 +70,7 @@ func TestSchemaQuery(t *testing.T) { dob: datetime . `, 0)) - resp, err := db.Query(context.Background(), `schema(pred: [name, age]) {type}`) + resp, err := db.Query(context.Background(), `schema(pred: [name, age]) {type}`, 0) require.NoError(t, err) require.JSONEq(t, @@ -102,7 +102,7 @@ func TestBasicVector(t *testing.T) { Val: &api.Value_Vfloat32Val{Vfloat32Val: vectBytes}, }, }}, - }}) + }}, 0) require.NoError(t, err) uid := uids["_:vector"] @@ -114,9 +114,127 @@ func TestBasicVector(t *testing.T) { q (func: uid(%v)) { project_description_v } - }`, uid)) + }`, uid), 0) require.NoError(t, err) require.Equal(t, `{"q":[{"project_description_v":[5.1E+00,5.1E+00,1.1E+00]}]}`, string(resp.GetJson())) } + +func TestNonGalaxyNamespace(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1)) + + _, err = db.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Namespace: 1, + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }, 1) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + qresp, err := db.Query(context.Background(), query, 1) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) + +} + +func TestMultipleNamespaces(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 0)) + require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1)) + + _, err = db.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Namespace: 0, + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }, 0) + require.NoError(t, err) + + _, err = db.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Namespace: 1, + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "B"}}, + }, + }, + }, + }, 1) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + qresp, err := db.Query(context.Background(), query, 0) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"A"}]}`, string(qresp.GetJson())) + + qresp, err = db.Query(context.Background(), query, 1) + require.NoError(t, err) + require.JSONEq(t, `{"me":[{"name":"B"}]}`, string(qresp.GetJson())) +} + +func TestQueryFromWrongNamespace(t *testing.T) { + db, err := modusdb.New(modusdb.NewDefaultConfig(t.TempDir())) + require.NoError(t, err) + defer db.Close() + + require.NoError(t, db.DropAll(context.Background())) + require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 0)) + require.NoError(t, db.AlterSchema(context.Background(), "name: string @index(exact) .", 1)) + + _, err = db.Mutate(context.Background(), []*api.Mutation{ + { + Set: []*api.NQuad{ + { + Namespace: 0, + Subject: "_:aman", + Predicate: "name", + ObjectValue: &api.Value{Val: &api.Value_StrVal{StrVal: "A"}}, + }, + }, + }, + }, 0) + require.NoError(t, err) + + query := `{ + me(func: has(name)) { + name + } + }` + + qresp, err := db.Query(context.Background(), query, 1) + require.NoError(t, err) + require.JSONEq(t, `{"me":[]}`, string(qresp.GetJson())) +} diff --git a/live.go b/live.go index a369f08..ebd88e1 100644 --- a/live.go +++ b/live.go @@ -85,7 +85,7 @@ func (db *DB) LoadData(inCtx context.Context, dataDir string) error { if !ok { return nil } - uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs}) + uids, err := db.Mutate(rootCtx, []*api.Mutation{nqs}, 0) if err != nil { return fmt.Errorf("error applying mutations: %w", err) } diff --git a/live_test.go b/live_test.go index 825f4b4..82d6a10 100644 --- a/live_test.go +++ b/live_test.go @@ -77,7 +77,7 @@ func TestLiveLoaderSmall(t *testing.T) { ] }` - resp, err := db.Query(context.Background(), query) + resp, err := db.Query(context.Background(), query, 0) require.NoError(t, err) require.JSONEq(t, expected, string(resp.Json)) } @@ -98,7 +98,7 @@ func TestLiveLoader1Million(t *testing.T) { for _, tt := range common.OneMillionTCs { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - resp, err := db.Query(ctx, tt.Query) + resp, err := db.Query(ctx, tt.Query, 0) cancel() if ctx.Err() == context.DeadlineExceeded { diff --git a/vector_test.go b/vector_test.go index f27e1b6..e69c545 100644 --- a/vector_test.go +++ b/vector_test.go @@ -33,7 +33,7 @@ func TestVectorDelete(t *testing.T) { require.NoError(t, err) //nolint:gosec rdf, vectors := dgraphapi.GenerateRandomVectors(int(assignIDs.StartId)-10, int(assignIDs.EndId)-10, 10, "vtest") - _, err = db.Mutate(context.Background(), []*api.Mutation{{SetNquads: []byte(rdf)}}) + _, err = db.Mutate(context.Background(), []*api.Mutation{{SetNquads: []byte(rdf)}}, 0) require.NoError(t, err) // check the count of the vectors inserted @@ -42,7 +42,7 @@ func TestVectorDelete(t *testing.T) { count(uid) } }` - resp, err := db.Query(context.Background(), q1) + resp, err := db.Query(context.Background(), q1, 0) require.NoError(t, err) require.JSONEq(t, fmt.Sprintf(`{"vector":[{"count":%d}]}`, numVectors), string(resp.Json)) @@ -61,7 +61,7 @@ func TestVectorDelete(t *testing.T) { deleteTriple := func(idx int) string { _, err := db.Mutate(context.Background(), []*api.Mutation{{ DelNquads: []byte(triples[idx]), - }}) + }}, 0) require.NoError(t, err) uid := strings.Split(triples[idx], " ")[0] @@ -71,7 +71,7 @@ func TestVectorDelete(t *testing.T) { } }`, uid[1:len(uid)-1]) - res, err := db.Query(context.Background(), q2) + res, err := db.Query(context.Background(), q2, 0) require.NoError(t, err) require.JSONEq(t, `{"vector":[]}`, string(res.Json)) return triples[idx] @@ -97,7 +97,7 @@ func TestVectorDelete(t *testing.T) { } func queryVectors(t *testing.T, db *modusdb.DB, query string) [][]float32 { - resp, err := db.Query(context.Background(), query) + resp, err := db.Query(context.Background(), query, 0) require.NoError(t, err) var data struct {