Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OCT-2296: Improve BE stability #611

Merged
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 3 additions & 6 deletions backend/v2/allocations/services.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import asyncio
from dataclasses import dataclass

from app import exceptions
Expand Down Expand Up @@ -126,11 +125,9 @@ async def simulate_leverage(
Calculate leverage of the allocation made by the user.
"""

all_projects, matched_rewards, existing_allocations = await asyncio.gather(
projects_contracts.get_project_addresses(epoch_number),
matched_rewards_estimator.get(),
get_allocations_with_user_uqs(session, epoch_number),
)
all_projects = await projects_contracts.get_project_addresses(epoch_number)
matched_rewards = await matched_rewards_estimator.get()
existing_allocations = await get_allocations_with_user_uqs(session, epoch_number)

return cqf_simulate_leverage(
existing_allocations=existing_allocations,
Expand Down
56 changes: 18 additions & 38 deletions backend/v2/allocations/socket.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import asyncio
import logging
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Tuple

import socketio
from app.exceptions import OctantException
from sqlalchemy.ext.asyncio import AsyncSession

from app.exceptions import OctantException
from v2.allocations.dependencies import get_allocator, get_signature_verifier
from v2.allocations.repositories import get_donations_by_project
from v2.allocations.schemas import UserAllocationRequest, UserAllocationRequestV1
Expand Down Expand Up @@ -62,42 +62,34 @@ async def create_dependencies_on_connect() -> AsyncGenerator[
projects_contracts = get_projects_contracts(w3, get_projects_settings())
epochs_subgraph = get_epochs_subgraph(get_epochs_subgraph_settings())

# For safety, we create separate sessions for each dependency
# (to avoid any potential issues with session sharing in async task context)

sessionmaker = get_sessionmaker(get_database_settings())

async with (
sessionmaker() as s1,
sessionmaker() as s2,
sessionmaker() as s3,
sessionmaker() as s4,
):
async with sessionmaker() as session:
try:
threshold_getter = get_projects_allocation_threshold_getter(
epoch_number,
s1,
session,
projects_contracts,
get_projects_allocation_threshold_settings(),
)
estimated_matched_rewards = await get_matched_rewards_estimator(
epoch_number,
s2,
session,
epochs_subgraph,
get_matched_rewards_estimator_settings(),
)
estimated_project_rewards = await get_project_rewards_estimator(
epoch_number,
s3,
session,
projects_contracts,
estimated_matched_rewards,
)

# Yield the dependencies to the on_connect handler
yield (s4, threshold_getter, estimated_project_rewards)
yield (session, threshold_getter, estimated_project_rewards)

except Exception as e:
await cleanup_sessions(s1, s2, s3, s4)
await safe_session_cleanup(session)
raise e


Expand Down Expand Up @@ -129,49 +121,41 @@ async def create_dependencies_on_allocate() -> AsyncGenerator[
# (to avoid any potential issues with session sharing in async task context)
sessionmaker = get_sessionmaker(get_database_settings())

async with (
sessionmaker() as s1,
sessionmaker() as s2,
sessionmaker() as s3,
sessionmaker() as s4,
sessionmaker() as s5,
sessionmaker() as s6,
sessionmaker() as s7,
):
async with sessionmaker() as session:
try:
threshold_getter = get_projects_allocation_threshold_getter(
epoch_number,
s1,
session,
projects_contracts,
get_projects_allocation_threshold_settings(),
)
estimated_matched_rewards = await get_matched_rewards_estimator(
epoch_number,
s2,
session,
epochs_subgraph,
get_matched_rewards_estimator_settings(),
)
estimated_project_rewards = await get_project_rewards_estimator(
epoch_number,
s3,
session,
projects_contracts,
estimated_matched_rewards,
)

signature_verifier = get_signature_verifier(
s4,
session,
epochs_subgraph,
projects_contracts,
get_chain_settings(),
)

uq_score_getter = get_uq_score_getter(
s5, get_uq_score_settings(), get_chain_settings()
session, get_uq_score_settings(), get_chain_settings()
)

allocations = await get_allocator(
epoch_number,
s6,
session,
signature_verifier,
uq_score_getter,
projects_contracts,
Expand All @@ -180,14 +164,14 @@ async def create_dependencies_on_allocate() -> AsyncGenerator[

# Yield the dependencies to the on_allocate handler
yield (
s7,
session,
allocations,
threshold_getter,
estimated_project_rewards,
)

except Exception as e:
await cleanup_sessions(s1, s2, s3, s4, s5, s6, s7)
await safe_session_cleanup(session)
raise e


Expand Down Expand Up @@ -358,7 +342,7 @@ def from_dict(data: str) -> UserAllocationRequest:
return request


async def safe_session_cleanup(session):
async def safe_session_cleanup(session: AsyncSession):
try:
await session.rollback()
except Exception:
Expand All @@ -370,7 +354,3 @@ async def safe_session_cleanup(session):
except Exception:
# Log the close error, but don't raise it
logging.exception("Error during session close")


async def cleanup_sessions(*sessions):
await asyncio.gather(*(safe_session_cleanup(s) for s in sessions))
39 changes: 18 additions & 21 deletions backend/v2/allocations/validators.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
from dataclasses import dataclass

from sqlalchemy.ext.asyncio import AsyncSession
from web3 import AsyncWeb3

from app import exceptions
from app.modules.common.crypto.signature import EncodingStandardFor, encode_for_signing
from sqlalchemy.ext.asyncio import AsyncSession
from v2.allocations.repositories import get_last_allocation_request_nonce
from v2.allocations.schemas import UserAllocationRequest
from v2.core.types import Address
Expand All @@ -14,7 +15,6 @@
get_budget_by_user_address_and_epoch,
user_is_patron_with_budget,
)
from web3 import AsyncWeb3


@dataclass
Expand All @@ -25,20 +25,19 @@ class SignatureVerifier:
chain_id: int

async def verify(self, epoch_number: int, request: UserAllocationRequest) -> None:
await asyncio.gather(
verify_logic(
session=self.session,
epoch_subgraph=self.epochs_subgraph,
projects_contracts=self.projects_contracts,
epoch_number=epoch_number,
payload=request,
),
verify_signature(
w3=self.projects_contracts.w3,
chain_id=self.chain_id,
user_address=request.user_address,
payload=request,
),
await verify_logic(
session=self.session,
epoch_subgraph=self.epochs_subgraph,
projects_contracts=self.projects_contracts,
epoch_number=epoch_number,
payload=request,
)

await verify_signature(
w3=self.projects_contracts.w3,
chain_id=self.chain_id,
user_address=request.user_address,
payload=request,
)


Expand Down Expand Up @@ -69,10 +68,8 @@ async def _check_database():
)
await _user_has_budget(session, payload, epoch_number)

await asyncio.gather(
_check_database(),
_provided_projects_are_correct(projects_contracts, epoch_number, payload),
)
await _check_database()
await _provided_projects_are_correct(projects_contracts, epoch_number, payload)


async def _provided_nonce_matches_expected(
Expand Down
48 changes: 9 additions & 39 deletions backend/v2/core/dependencies.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,18 +31,18 @@ def get_w3(
return w3


Web3 = Annotated[AsyncWeb3, Depends(get_w3)]


class DatabaseSettings(OctantSettings):
"""
Values below are the defaults for the database with max_connetions = 100 and backend pods = 3.
"""

db_uri: str = Field(..., alias="db_uri")

pg_pool_size: int = Field(10, alias="sqlalchemy_connection_pool_size")
pg_max_overflow: int = Field(0, alias="sqlalchemy_connection_pool_max_overflow")
pg_pool_size: int = Field(20, alias="sqlalchemy_connection_pool_size")
pg_max_overflow: int = Field(13, alias="sqlalchemy_connection_pool_max_overflow")
pg_pool_timeout: int = 60
pg_pool_recycle: int = 30 * 60 # 30 minutes
pg_pool_pre_ping: bool = True
# TODO other settings of the database

@property
def sqlalchemy_database_uri(self) -> str:
Expand Down Expand Up @@ -87,36 +87,9 @@ def get_sessionmaker(
return sessionmaker


# @asynccontextmanager
async def get_db_session(
sessionmaker: Annotated[async_sessionmaker[AsyncSession], Depends(get_sessionmaker)]
) -> AsyncGenerator[AsyncSession, None]:
# Create an async SQLAlchemy engine

# logging.error("Creating database engine")

# engine = create_async_engine(
# settings.sqlalchemy_database_uri,
# echo=False, # Disable SQL query logging (for performance)
# pool_size=20, # Initial pool size (default is 5)
# max_overflow=10, # Extra connections if pool is exhausted
# pool_timeout=30, # Timeout before giving up on a connection
# pool_recycle=3600, # Recycle connections after 1 hour (for long-lived connections)
# pool_pre_ping=True, # Check if the connection is alive before using it
# future=True, # Use the future-facing SQLAlchemy 2.0 style
# # connect_args={"options": "-c timezone=utc"} # Ensures timezone is UTC
# )

# # Create a sessionmaker with AsyncSession class
# async_session = async_sessionmaker(
# autocommit=False, autoflush=False, bind=engine, class_=AsyncSession
# )

# logging.error("Opening session", async_session)

# scoped_session = async_scoped_session(sessionmaker, scopefunc=current_task)

# Create a new session
async with sessionmaker() as session:
try:
yield session
Expand All @@ -128,9 +101,6 @@ async def get_db_session(
await session.close()


GetSession = Annotated[AsyncSession, Depends(get_db_session, use_cache=False)]


class ChainSettings(OctantSettings):
chain_id: int = Field(
default=11155111,
Expand All @@ -142,9 +112,6 @@ def get_chain_settings() -> ChainSettings:
return ChainSettings()


GetChainSettings = Annotated[ChainSettings, Depends(get_chain_settings)]


class SocketioSettings(OctantSettings):
host: str = Field(..., alias="SOCKETIO_REDIS_HOST")
port: int = Field(..., alias="SOCKETIO_REDIS_PORT")
Expand All @@ -161,3 +128,6 @@ def get_socketio_settings() -> SocketioSettings:


GetSocketioSettings = Annotated[SocketioSettings, Depends(get_socketio_settings)]
GetChainSettings = Annotated[ChainSettings, Depends(get_chain_settings)]
Web3 = Annotated[AsyncWeb3, Depends(get_w3)]
GetSession = Annotated[AsyncSession, Depends(get_db_session)]
15 changes: 9 additions & 6 deletions backend/v2/project_rewards/services.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from dataclasses import dataclass

from sqlalchemy.ext.asyncio import AsyncSession

from v2.allocations.repositories import get_allocations_with_user_uqs
from v2.matched_rewards.services import MatchedRewardsEstimator
from v2.project_rewards.capped_quadriatic import (
Expand All @@ -23,13 +23,16 @@ class ProjectRewardsEstimator:

async def get(self) -> CappedQuadriaticFunding:
# Gather all the necessary data for the calculation
all_projects, matched_rewards, allocations = await asyncio.gather(
self.projects_contracts.get_project_addresses(self.epoch_number),
self.matched_rewards_estimator.get(),
get_allocations_with_user_uqs(self.session, self.epoch_number),
all_projects = (
await self.projects_contracts.get_project_addresses(self.epoch_number),
)
matched_rewards = (await self.matched_rewards_estimator.get(),)

allocations = await get_allocations_with_user_uqs(
self.session, self.epoch_number
)

# Calculate using the Capped Quadriatic Funding formula
# Calculate using the Capped Quadratic Funding formula
return capped_quadriatic_funding(
project_addresses=all_projects,
allocations=allocations,
Expand Down
8 changes: 4 additions & 4 deletions backend/v2/projects/services.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import asyncio
from dataclasses import dataclass

from sqlalchemy.ext.asyncio import AsyncSession

from v2.allocations.repositories import sum_allocations_by_epoch
from v2.projects.contracts import ProjectsContracts

Expand Down Expand Up @@ -35,10 +35,10 @@ async def get_projects_allocation_threshold(
) -> int:
# PROJECTS_COUNT_MULTIPLIER = 1 # TODO: from settings?

total_allocated, project_addresses = await asyncio.gather(
sum_allocations_by_epoch(session, epoch_number),
projects.get_project_addresses(epoch_number),
total_allocated, project_addresses = await sum_allocations_by_epoch(
session, epoch_number
)
project_addresses = (await projects.get_project_addresses(epoch_number),)

return _calculate_threshold(
total_allocated, len(project_addresses), project_count_multiplier
Expand Down
Loading