Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add modusdb model tracing for local dev #697

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
## UNRELEASED

- fix: resolve warning in `deserializeRawMap` [#692](https://github.com/hypermodeinc/modus/pull/692)
- feat: add modusdb model tracing for local dev
[#697](https://github.com/hypermodeinc/modus/pull/697)
- fix: add json serialization support for neo4j sdk types
[#699](https://github.com/hypermodeinc/modus/pull/699)
- chore: update api-explorer to react 19 [#700](https://github.com/hypermodeinc/modus/pull/700)
Expand Down
2 changes: 2 additions & 0 deletions runtime/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,5 @@ runtime

node_modules
dist

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't be needed.

data
2 changes: 1 addition & 1 deletion runtime/config/commandline.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func parseCommandLineFlags() {
var showVersion bool
const versionUsage = "Show the Runtime version number and exit."
flag.BoolVar(&showVersion, "version", false, versionUsage)
flag.BoolVar(&showVersion, "v", false, versionUsage+" (shorthand)")
// flag.BoolVar(&showVersion, "v", false, versionUsage+" (shorthand)")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this commented out?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Discussed. I will address separately to remove global flagset usage.


flag.Parse()

Expand Down
275 changes: 0 additions & 275 deletions runtime/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,33 +16,17 @@ import (
"sync"
"time"

"github.com/hypermodeinc/modus/lib/manifest"
"github.com/hypermodeinc/modus/runtime/config"
"github.com/hypermodeinc/modus/runtime/logger"
"github.com/hypermodeinc/modus/runtime/metrics"
"github.com/hypermodeinc/modus/runtime/plugins"
"github.com/hypermodeinc/modus/runtime/secrets"
"github.com/hypermodeinc/modus/runtime/utils"

"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
)

// globalRuntimePostgresWriter is the package-level writer instance. Its pool
// starts out nil; the buffer channel is created with capacity chanSize.
var globalRuntimePostgresWriter = &runtimePostgresWriter{
	dbpool: nil,
	buffer: make(chan inferenceHistory, chanSize),
	quit:   make(chan struct{}),
	done:   make(chan struct{}),
}

// errDbNotConfigured is the sentinel error for a missing database configuration.
var errDbNotConfigured = errors.New("database not configured")

const (
	// batchSize is the number of buffered records that make up one full batch.
	batchSize = 100
	// chanSize is the capacity of the writer's buffer channel.
	chanSize = 10000
)

// Names of the database tables used by this package.
const (
	pluginsTable           = "plugins"
	inferencesTable        = "inferences"
	collectionTextsTable   = "collection_texts"
	collectionVectorsTable = "collection_vectors"
)

Expand All @@ -56,125 +40,6 @@ type runtimePostgresWriter struct {
once sync.Once
}

// inferenceHistory is one model invocation record queued for writing to the
// inferences table.
type inferenceHistory struct {
	// model is the invoked model; its hash is what gets stored.
	model *manifest.ModelInfo
	// input and output are the invocation payloads. Byte slices and strings
	// are stored as-is; any other value is serialized to JSON on write.
	input, output any
	// start and end bound the invocation; the stored duration is end - start.
	start, end time.Time
	// pluginId and function are left nil when the originating context does
	// not carry them.
	pluginId, function *string
}

// GetPool returns the writer's connection pool, creating it on the first call.
//
// The connection string is read from the MODUS_DB secret, falling back to the
// older HYPERMODE_METADATA_DB secret name. When neither secret exists, no pool
// is created and errDbNotConfigured is returned.
//
// Initialization is attempted only once. The initialization error is local to
// the call that ran it, so later calls after a failed attempt find no pool and
// report errDbNotConfigured rather than the original error.
func (w *runtimePostgresWriter) GetPool(ctx context.Context) (*pgxpool.Pool, error) {
	var initErr error

	w.once.Do(func() {
		var secretName string
		switch {
		case secrets.HasSecret("MODUS_DB"):
			secretName = "MODUS_DB"
		case secrets.HasSecret("HYPERMODE_METADATA_DB"):
			// fallback to old secret name
			// TODO: remove this after the transition is complete
			secretName = "HYPERMODE_METADATA_DB"
		default:
			// No database secret available; leave the pool unset.
			return
		}

		connStr, err := secrets.GetSecretValue(secretName)
		if err != nil {
			initErr = err
			return
		}

		pool, err := pgxpool.New(ctx, connStr)
		if err != nil {
			initErr = err
			return
		}
		w.dbpool = pool
	})

	switch {
	case w.dbpool != nil:
		return w.dbpool, nil
	case initErr != nil:
		return nil, initErr
	default:
		return nil, errDbNotConfigured
	}
}

// Write queues data on the writer's buffer without blocking. If the buffer
// cannot accept the record right now, the record is discarded and the
// dropped-inferences metric is incremented.
func (w *runtimePostgresWriter) Write(data inferenceHistory) {
	select {
	case w.buffer <- data:
		return
	default:
	}

	// The send could not proceed immediately; count the drop.
	metrics.DroppedInferencesNum.Inc()
}

// worker is the batching loop for the writer. It collects records received
// from w.buffer into a fixed-size array of batchSize entries and passes them to
// WriteInferenceHistoryToDB when the array is full, when the interval timer
// fires, or when a receive on w.quit succeeds. In the quit case it flushes the
// partial batch, closes w.done, and returns.
//
// NOTE(review): records still queued in w.buffer when quit is received are not
// read by this loop - confirm that dropping them on shutdown is intended.
func (w *runtimePostgresWriter) worker(ctx context.Context) {
// batchIndex is the number of valid entries currently held in batch.
var batchIndex int
var batch [batchSize]inferenceHistory
timer := time.NewTimer(inferenceRefresherInterval)
defer timer.Stop()

for {
select {
case data := <-w.buffer:
batch[batchIndex] = data
batchIndex++
// Flush as soon as the batch is full rather than waiting for the timer.
if batchIndex == batchSize {
WriteInferenceHistoryToDB(ctx, batch[:batchSize])
batchIndex = 0

// Restart the interval after a full-batch flush. If Stop reports that the
// timer is no longer active, receive from timer.C before calling Reset so
// that a stale tick is not left behind to trigger an immediate flush.
if !timer.Stop() {
<-timer.C
}
timer.Reset(inferenceRefresherInterval)
}
case <-timer.C:
// Interval elapsed: flush whatever has accumulated (possibly nothing).
WriteInferenceHistoryToDB(ctx, batch[:batchIndex])
batchIndex = 0
timer.Reset(inferenceRefresherInterval)
case <-w.quit:
// Final flush of the partial batch, then signal completion via w.done.
WriteInferenceHistoryToDB(ctx, batch[:batchIndex])
close(w.done)
return
}
}
}

// getJson returns the JSON form of the record's input and output payloads.
// If either payload cannot be converted, both results are nil and the error
// is returned.
func (h *inferenceHistory) getJson() (input []byte, output []byte, err error) {
	if input, err = getInferenceDataJson(h.input); err != nil {
		return nil, nil, err
	}
	if output, err = getInferenceDataJson(h.output); err != nil {
		return nil, nil, err
	}
	return input, output, nil
}

// getInferenceDataJson produces the JSON bytes to store for an inference
// payload.
//
// A []byte or string value is taken to be JSON that was serialized earlier and
// is passed through untouched. Any formatting it carries is irrelevant because
// the destination is a JSONB column in Postgres, which doesn't preserve
// formatting. Every other value is serialized with utils.JsonSerialize.
func getInferenceDataJson(val any) ([]byte, error) {
	if raw, ok := val.([]byte); ok {
		return raw, nil
	}
	if text, ok := val.(string); ok {
		return []byte(text), nil
	}

	// Not pre-serialized: encode it ourselves.
	serialized, err := utils.JsonSerialize(val)
	if err != nil {
		return nil, err
	}
	return serialized, nil
}

func Stop(ctx context.Context) {
pool, _ := globalRuntimePostgresWriter.GetPool(ctx)
if pool == nil {
Expand All @@ -186,62 +51,6 @@ func Stop(ctx context.Context) {
pool.Close()
}

// WritePluginInfo records the plugin in the plugins table inside a
// transaction, keyed by its build ID.
//
// If a row with the same build ID already exists, nothing is inserted and
// plugin.Id is overwritten with the stored ID. The same happens when the
// insert affects no rows because of a build_id conflict. Any error is logged
// through logDbWarningOrError and is not returned to the caller.
func WritePluginInfo(ctx context.Context, plugin *plugins.Plugin) {
	register := func(tx pgx.Tx) error {
		buildId := plugin.Metadata.BuildId

		// Already registered: adopt the stored ID and stop.
		existing, err := getPluginId(ctx, tx, buildId)
		if err != nil {
			return err
		}
		if existing != "" {
			plugin.Id = existing
			return nil
		}

		// Not found, so insert. The conflict clause still guards against
		// another instance of the service inserting the same build.
		insert := fmt.Sprintf(`INSERT INTO %s
(id, name, version, language, sdk_version, build_id, build_time, git_repo, git_commit)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT (build_id) DO NOTHING`,
			pluginsTable)

		result, err := tx.Exec(ctx, insert,
			plugin.Id,
			plugin.Metadata.Name(),
			utils.NilIfEmpty(plugin.Metadata.Version()),
			plugin.Language.Name(),
			plugin.Metadata.SdkVersion(),
			buildId,
			plugin.Metadata.BuildTime,
			utils.NilIfEmpty(plugin.Metadata.GitRepo),
			utils.NilIfEmpty(plugin.Metadata.GitCommit),
		)
		if err != nil {
			return err
		}
		if result.RowsAffected() != 0 {
			return nil
		}

		// Edge case: the row exists now but was not inserted by us, so another
		// instance must have written it. Use the ID that instance stored.
		winner, err := getPluginId(ctx, tx, buildId)
		if err != nil {
			return err
		}
		plugin.Id = winner
		return nil
	}

	if err := WithTx(ctx, register); err != nil {
		logDbWarningOrError(ctx, err, "Plugin info not written to database.")
	}
}

func logDbWarningOrError(ctx context.Context, err error, msg string) {
if _, ok := err.(*pgconn.ConnectError); ok {
logger.Warn(ctx).Err(err).Msgf("Database connection error. %s", msg)
Expand All @@ -254,43 +63,6 @@ func logDbWarningOrError(ctx context.Context, err error, msg string) {
}
}

// getPluginId looks up the ID of the plugin row with the given build ID,
// using the supplied transaction.
//
// It returns the ID when a row exists and an empty string with a nil error
// when no row matches. An error is returned if the query cannot be issued, if
// reading the result set fails, or if the ID cannot be scanned; in every error
// case the returned ID is empty.
func getPluginId(ctx context.Context, tx pgx.Tx, buildId string) (string, error) {
	query := fmt.Sprintf("SELECT id FROM %s WHERE build_id = $1", pluginsTable)
	rows, err := tx.Query(ctx, query, buildId)
	if err != nil {
		return "", err
	}
	defer rows.Close()

	if !rows.Next() {
		// Next reports false both for an empty result and for a failed read.
		// Err tells the two apart, so a failure is not mistaken for "not found".
		return "", rows.Err()
	}

	var id string
	if err := rows.Scan(&id); err != nil {
		return "", err
	}
	return id, nil
}

// WriteInferenceHistory queues a record of one model invocation on the global
// writer. The plugin ID and function name are attached when the context
// provides them, and are left nil otherwise.
func WriteInferenceHistory(ctx context.Context, model *manifest.ModelInfo, input, output any, start, end time.Time) {
	record := inferenceHistory{
		model:  model,
		input:  input,
		output: output,
		start:  start,
		end:    end,
	}

	if plugin, ok := plugins.GetPluginFromContext(ctx); ok {
		// Points at the plugin's Id field rather than copying its value.
		record.pluginId = &plugin.Id
	}

	if name, ok := ctx.Value(utils.FunctionNameContextKey).(string); ok {
		record.function = &name
	}

	globalRuntimePostgresWriter.Write(record)
}

func GetUniqueNamespaces(ctx context.Context, collectionName string) ([]string, error) {
var namespaces []string
err := WithTx(ctx, func(tx pgx.Tx) error {
Expand Down Expand Up @@ -618,53 +390,6 @@ func QueryCollectionVectorsFromCheckpoint(ctx context.Context, collectionName, s
return textIds, vectorIds, keys, vectors, nil
}

// WriteInferenceHistoryToDB inserts the given records into the inferences
// table as one pgx batch inside a transaction. An empty batch is a no-op.
//
// Each record gets a freshly generated UUIDv7 as its ID, and its duration is
// stored in milliseconds as end minus start. Errors are logged through
// logDbWarningOrError and are not returned to the caller.
//
// NOTE(review): a payload that fails JSON conversion makes the callback return
// an error before anything is sent, so the whole batch is abandoned -
// presumably WithTx rolls back on error; confirm against its definition.
func WriteInferenceHistoryToDB(ctx context.Context, batch []inferenceHistory) {
	if len(batch) == 0 {
		return
	}

	// The statement text is the same for every record, so build it once
	// instead of once per loop iteration.
	query := fmt.Sprintf(`INSERT INTO %s
(id, model_hash, input, output, started_at, duration_ms, plugin_id, function)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
`, inferencesTable)

	err := WithTx(ctx, func(tx pgx.Tx) error {
		b := &pgx.Batch{}
		// Iterate by index to avoid copying each record.
		for i := range batch {
			data := &batch[i]
			input, output, err := data.getJson()
			if err != nil {
				return err
			}
			b.Queue(query,
				utils.GenerateUUIDv7(),
				data.model.Hash(),
				input,
				output,
				data.start,
				data.end.Sub(data.start).Milliseconds(),
				data.pluginId,
				data.function,
			)
		}

		br := tx.SendBatch(ctx, b)
		defer br.Close()

		// Read one result per queued statement so every failure is surfaced.
		for range batch {
			if _, err := br.Exec(); err != nil {
				return err
			}
		}

		return nil
	})

	if err != nil {
		logDbWarningOrError(ctx, err, "Inference history not written to database.")
	}
}

func Initialize(ctx context.Context) {
// this will initialize the pool and start the worker
_, err := globalRuntimePostgresWriter.GetPool(ctx)
Expand Down
Loading
Loading