UI: Showing Ingestion status #893

Open · wants to merge 9 commits into base: main
3 changes: 3 additions & 0 deletions fern/mdx/deploy/local.mdx
@@ -251,6 +251,9 @@ NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY=
# Optional for recording human feedback with Langfuse
NEXT_PUBLIC_LANGFUSE_PUBLIC_KEY=
NEXT_PUBLIC_LANGFUSE_BASE_URL=

# Optional for using SuperRag in SAML (https://github.com/superagent-ai/super-rag)
NEXT_PUBLIC_SUPERRAG_API_URL=
```

### Run app
3 changes: 0 additions & 3 deletions libs/.docker/.env.example
@@ -123,6 +123,3 @@ NEXT_PUBLIC_POSTHOG_KEY=

# Optional for sending events to Loops
LOOPS_API_KEY=

# Optional for SuperRag (https://github.com/superagent-ai/super-rag)
SUPERRAG_API_URL= # e.g. http://localhost:1234/api/v1
1 change: 1 addition & 0 deletions libs/.docker/ui/docker-compose.ui.yml
@@ -11,6 +11,7 @@ services:
NEXT_PUBLIC_SUPABASE_URL: ${NEXT_PUBLIC_SUPABASE_URL}
NEXT_PUBLIC_SUPABASE_ANON_KEY: ${NEXT_PUBLIC_SUPABASE_ANON_KEY}
NEXT_PUBLIC_SUPERAGENT_API_URL: ${NEXT_PUBLIC_SUPERAGENT_API_URL}
NEXT_PUBLIC_SUPERRAG_API_URL: ${NEXT_PUBLIC_SUPERRAG_API_URL}
NEXT_PUBLIC_SUPABASE_STORAGE_NAME: ${NEXT_PUBLIC_SUPABASE_STORAGE_NAME}
NEXT_PUBLIC_SEGMENT_WRITE_KEY: ${NEXT_PUBLIC_SEGMENT_WRITE_KEY}
NEXT_PUBLIC_APIDECK_API_KEY: ${NEXT_PUBLIC_APIDECK_API_KEY}
Changes to another file (file name not shown)
@@ -143,7 +143,9 @@ async def add_datasource(self, assistant: dict, data: dict):
data["index_name"] = await self._get_unique_index_name(data, assistant)

await self._add_superrag_tool(assistant, data)
await self.superrag_service.aingest(data=data)
ingest_data = self.superrag_service.ingest(data=data)

return {"superrag_task": ingest_data.get("task")}

async def delete_datasource(self, assistant: dict, datasource: dict):
tool = await self.agent_manager.get_tool(
@@ -156,7 +158,7 @@ async def delete_datasource(self, assistant: dict, datasource: dict):
tool_metadata = json.loads(tool.metadata)

await self._delete_tool(assistant, datasource)
await self.superrag_service.adelete(
self.superrag_service.delete(
{
**datasource,
"index_name": tool_metadata.get("index_name"),
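
The core behavioral change here is that `add_datasource` now returns the task identifier from the SuperRag ingest response instead of awaiting a fire-and-forget async call. A minimal, self-contained sketch of that flow, with a hypothetical stub standing in for the real HTTP-backed `SuperRagService` (the exact response shape is an assumption; the handler only relies on a `task` key being present):

```python
# Sketch only: the stub below stands in for the real SuperRagService,
# which POSTs to <SUPERRAG_API_URL>/ingest and returns the parsed JSON body.
class StubSuperRagService:
    def ingest(self, data: dict) -> dict:
        # Assumed response shape; the caller only reads the "task" key.
        return {"task": "task_abc123", "status": "queued"}


def add_datasource_sketch(service: StubSuperRagService, data: dict) -> dict:
    # Mirrors the new return contract: surface the ingestion task id to callers.
    ingest_data = service.ingest(data=data)
    return {"superrag_task": ingest_data.get("task")}


print(add_datasource_sketch(StubSuperRagService(), {"index_name": "my-index"}))
# {'superrag_task': 'task_abc123'}
```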
Changes to another file (file name not shown)
@@ -18,8 +18,6 @@ async def process_assistant(
new_assistant_obj: dict,
workflow_step_order: int | None = None,
):
new_agent = None

old_assistant_type: str = get_first_non_null_key(old_assistant_obj)
new_assistant_type: str = get_first_non_null_key(new_assistant_obj)

@@ -83,6 +81,9 @@
self.api_user, self.api_manager
).get_superrag_processor(new_assistant)

result = {
"superrag_tasks": [],
}
if old_assistant_type and new_assistant_type:
if old_assistant_type != new_assistant_type:
# order matters here as we need process
@@ -101,7 +102,10 @@
# all tools and data should be re-created
await new_tool_processor.process([], new_tools)
await new_data_processor.process({}, new_data)
await new_superrag_processor.process([], new_superrags)
superrag_result = await new_superrag_processor.process(
[], new_superrags
)
result["superrag_tasks"].extend(superrag_result)

else:
changes = compare_dicts(old_assistant, new_assistant)
@@ -113,7 +117,10 @@

await new_tool_processor.process(old_tools, new_tools)
await new_data_processor.process(old_data, new_data)
await new_superrag_processor.process(old_superrags, new_superrags)
superrag_result = await new_superrag_processor.process(
old_superrags, new_superrags
)
result["superrag_tasks"].extend(superrag_result)

elif old_assistant_type and not new_assistant_type:
await old_tool_processor.process(old_tools, [])
@@ -124,16 +131,17 @@
assistant=old_assistant,
)
elif new_assistant_type and not old_assistant_type:
new_agent = await self.api_manager.agent_manager.add_assistant(
await self.api_manager.agent_manager.add_assistant(
new_assistant,
workflow_step_order,
)

await new_tool_processor.process([], new_tools)
await new_data_processor.process({}, new_data)
await new_superrag_processor.process([], new_superrags)
superrag_result = await new_superrag_processor.process([], new_superrags)
result["superrag_tasks"].extend(superrag_result)

return new_agent
return result

async def process_assistants(self, old_config, new_config):
validator = SAMLValidator(new_config, self.api_user)
@@ -142,12 +150,21 @@
old_assistants = old_config.get("workflows", [])
new_assistants = new_config.get("workflows", [])
workflow_step_order = 0

results = {
"superrag_tasks": [],
}
for old_assistant_obj, new_assistant_obj in zip_longest(
old_assistants, new_assistants, fillvalue={}
):
await self.process_assistant(
res = await self.process_assistant(
old_assistant_obj,
new_assistant_obj,
workflow_step_order,
)
workflow_step_order += 1

if res:
results["superrag_tasks"].extend(res.get("superrag_tasks", []))

return results
Changes to another file (file name not shown)
@@ -28,6 +28,7 @@ async def process(self, old_data, new_data):
datasource_manager = ApiDatasourceSuperRagManager(
self.api_user, self.api_manager.agent_manager
)
superrag_tasks = []
for old_obj, new_obj in zip_longest(old_data, new_data, fillvalue={}):
old_node_name = get_first_non_null_key(old_obj)
new_node_name = get_first_non_null_key(new_obj)
@@ -57,21 +58,25 @@
self.assistant,
old_datasource,
)
await datasource_manager.add_datasource(
add_datasource_res = await datasource_manager.add_datasource(
self.assistant,
new_datasource,
)
superrag_tasks.append(add_datasource_res.get("superrag_task"))

elif old_datasource_name and not new_datasource_name:
await datasource_manager.delete_datasource(
self.assistant,
old_datasource,
)
elif new_datasource_name and not old_datasource_name:
await datasource_manager.add_datasource(
add_datasource_res = await datasource_manager.add_datasource(
self.assistant,
new_datasource,
)
superrag_tasks.append(add_datasource_res.get("superrag_task"))

return superrag_tasks


class SuperagentDataProcessor(BaseProcessor):
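
The datasource processor pairs old and new SAML nodes positionally and collects one task id per ingestion it triggers. A simplified, self-contained sketch of that pairing pattern (`first_key` below is a stand-in for the repo's `get_first_non_null_key`, the printed actions are placeholders for the real manager calls, and the same-name/changed-contents case handled above is omitted for brevity):

```python
from itertools import zip_longest


def first_key(obj: dict):
    # Stand-in for get_first_non_null_key: name of the first non-null entry, or None.
    return next((k for k, v in obj.items() if v is not None), None)


def process_sketch(old_data: list, new_data: list) -> list:
    """Pair old/new datasource nodes and collect a task id per triggered ingestion."""
    superrag_tasks = []
    for old_obj, new_obj in zip_longest(old_data, new_data, fillvalue={}):
        old_name, new_name = first_key(old_obj), first_key(new_obj)
        if old_name and new_name and old_name != new_name:
            print(f"delete {old_name!r}, then ingest {new_name!r}")
            superrag_tasks.append(f"task-for-{new_name}")  # placeholder task id
        elif old_name and not new_name:
            print(f"delete {old_name!r}")
        elif new_name and not old_name:
            print(f"ingest {new_name!r}")
            superrag_tasks.append(f"task-for-{new_name}")  # placeholder task id
    return superrag_tasks


print(process_sketch([{"old_docs": {"use_for": "a"}}], [{"new_docs": {"use_for": "b"}}]))
# delete 'old_docs', then ingest 'new_docs'
# ['task-for-new_docs']
```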
6 changes: 3 additions & 3 deletions libs/superagent/app/api/workflow_configs/validator.py
@@ -35,7 +35,7 @@ def validate_assistant_names(self):

if assistant_name in assistant_names:
raise RepeatedNameError(
f"Assistant name '{assistant_name}' is repeated in the SAML,"
f"Assistant name '{assistant_name}' is repeated in the SAML, "
f"please use unique names for each assistant."
)
assistant_names.append(assistant_name)
@@ -56,7 +56,7 @@ def validate_tool_names(self):

if tool_name in tool_names:
raise RepeatedNameError(
f"Tool name '{tool_name}' is repeated in the SAML,"
f"Tool name '{tool_name}' is repeated in the SAML, "
f"please use unique names for each tool."
)
tool_names.append(tool_name)
@@ -77,7 +77,7 @@ def validate_superrag_names(self):

if superrag_name in superrag_names:
raise RepeatedNameError(
f"Superrag name '{superrag_name}' is repeated in the SAML,"
f"Superrag name '{superrag_name}' is repeated in the SAML, "
f"please use unique names for each superrag."
)
superrag_names.append(superrag_name)
10 changes: 8 additions & 2 deletions libs/superagent/app/api/workflow_configs/workflow_configs.py
@@ -77,7 +77,7 @@ async def add_config(
processor = AgentProcessor(api_user, api_manager)

try:
await processor.process_assistants(old_config, new_config)
results = await processor.process_assistants(old_config, new_config)
except (
MissingVectorDatabaseProvider,
UnkownFileType,
@@ -102,7 +102,13 @@
}
)

return {"success": True, "data": config}
return {
"success": True,
"data": {
"config": config,
"superrag_tasks": results.get("superrag_tasks", []),
},
}
except Exception as e:
logger.exception(e)
return JSONResponse(
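
With this change the workflow-config endpoint returns the collected ingestion task ids next to the saved config, which is what the UI can use to show ingestion status. A small sketch of how a caller might read them (the payload is illustrative; only the `success`, `data`, `config`, and `superrag_tasks` keys come from the handler above):

```python
# Illustrative response matching the keys returned by add_config above;
# the task id values themselves are placeholders.
response = {
    "success": True,
    "data": {
        "config": {"workflows": []},
        "superrag_tasks": ["task_abc123", "task_def456"],
    },
}

if response["success"]:
    for task_id in response["data"].get("superrag_tasks", []):
        # A UI would track these ids to display per-datasource ingestion status.
        print(f"ingestion task queued: {task_id}")
```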
16 changes: 4 additions & 12 deletions libs/superagent/services/superrag.py
@@ -1,6 +1,5 @@
from typing import Optional

import aiohttp
import requests
from decouple import config

@@ -12,25 +11,18 @@ def __init__(self, url: Optional[str] = None):
if not self.url:
raise ValueError("SUPERRAG_API_URL is not set")

async def _arequest(self, method, endpoint, data):
async with aiohttp.ClientSession() as session:
async with session.request(
method, f"{self.url}/{endpoint}", json=data
) as response:
return await response.json()

def _request(self, method, endpoint, data):
return requests.request(method, f"{self.url}/{endpoint}", json=data).json()

async def aingest(self, data):
return await self._arequest(
def ingest(self, data):
return self._request(
"POST",
"ingest",
data,
)

async def adelete(self, data):
return await self._arequest("DELETE", "delete", data)
def delete(self, data):
return self._request("DELETE", "delete", data)

def query(self, data):
return self._request(
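
A usage sketch of the service after this change, which drops the aiohttp coroutines in favour of plain blocking `requests` calls (the import path and the payload fields are assumptions; only the base URL fallback, the `ingest`/`delete` wrappers, and the `task` field read by the callers above come from the code):

```python
# Assumed import path, based on the file living at libs/superagent/services/superrag.py.
from services.superrag import SuperRagService

# Explicit URL here; with no argument the service falls back to the
# SUPERRAG_API_URL setting and raises ValueError when it is unset.
superrag = SuperRagService(url="http://localhost:1234/api/v1")

# Synchronous call: blocks until the SuperRag API responds, then returns parsed JSON.
ingest_response = superrag.ingest(data={"index_name": "my-index"})  # illustrative payload
print(ingest_response.get("task"))  # task id surfaced to the UI for ingestion status

# Deletion goes through the same synchronous wrapper.
superrag.delete({"index_name": "my-index"})
```

Because the wrappers are now synchronous, the async handlers above call them without `await`, and each call blocks until the HTTP round trip completes.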
5 changes: 4 additions & 1 deletion libs/ui/.env.example
@@ -18,4 +18,7 @@ NEXT_PUBLIC_STRIPE_PUBLISHABLE_KEY=

# Optional for Langfuse
NEXT_PUBLIC_LANGFUSE_PUBLIC_KEY=
NEXT_PUBLIC_LANGFUSE_BASE_URL=
NEXT_PUBLIC_LANGFUSE_BASE_URL=

# Optional for SuperRag
NEXT_PUBLIC_SUPERRAG_API_URL=http://localhost:1234/api/v1