From 870a0e3ea0eacad469a2adc46489895b605f6b22 Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Thu, 2 Jan 2025 12:29:17 +0100 Subject: [PATCH 1/7] tweak database params and unify session for project_rewards --- backend/v2/core/dependencies.py | 47 +++++--------------------- backend/v2/project_rewards/services.py | 10 +++--- 2 files changed, 14 insertions(+), 43 deletions(-) diff --git a/backend/v2/core/dependencies.py b/backend/v2/core/dependencies.py index d45a2cbc9e..20c45032f7 100644 --- a/backend/v2/core/dependencies.py +++ b/backend/v2/core/dependencies.py @@ -31,18 +31,17 @@ def get_w3( return w3 -Web3 = Annotated[AsyncWeb3, Depends(get_w3)] - - class DatabaseSettings(OctantSettings): + """ + Values below are the defaults for the database with max_connetions = 100 and backend pods = 3. + """ db_uri: str = Field(..., alias="db_uri") - pg_pool_size: int = Field(10, alias="sqlalchemy_connection_pool_size") - pg_max_overflow: int = Field(0, alias="sqlalchemy_connection_pool_max_overflow") + pg_pool_size: int = Field(20, alias="sqlalchemy_connection_pool_size") + pg_max_overflow: int = Field(13, alias="sqlalchemy_connection_pool_max_overflow") pg_pool_timeout: int = 60 pg_pool_recycle: int = 30 * 60 # 30 minutes pg_pool_pre_ping: bool = True - # TODO other settings of the database @property def sqlalchemy_database_uri(self) -> str: @@ -87,36 +86,9 @@ def get_sessionmaker( return sessionmaker -# @asynccontextmanager async def get_db_session( sessionmaker: Annotated[async_sessionmaker[AsyncSession], Depends(get_sessionmaker)] ) -> AsyncGenerator[AsyncSession, None]: - # Create an async SQLAlchemy engine - - # logging.error("Creating database engine") - - # engine = create_async_engine( - # settings.sqlalchemy_database_uri, - # echo=False, # Disable SQL query logging (for performance) - # pool_size=20, # Initial pool size (default is 5) - # max_overflow=10, # Extra connections if pool is exhausted - # pool_timeout=30, # Timeout before giving up on a connection - # pool_recycle=3600, # Recycle connections after 1 hour (for long-lived connections) - # pool_pre_ping=True, # Check if the connection is alive before using it - # future=True, # Use the future-facing SQLAlchemy 2.0 style - # # connect_args={"options": "-c timezone=utc"} # Ensures timezone is UTC - # ) - - # # Create a sessionmaker with AsyncSession class - # async_session = async_sessionmaker( - # autocommit=False, autoflush=False, bind=engine, class_=AsyncSession - # ) - - # logging.error("Opening session", async_session) - - # scoped_session = async_scoped_session(sessionmaker, scopefunc=current_task) - - # Create a new session async with sessionmaker() as session: try: yield session @@ -128,9 +100,6 @@ async def get_db_session( await session.close() -GetSession = Annotated[AsyncSession, Depends(get_db_session, use_cache=False)] - - class ChainSettings(OctantSettings): chain_id: int = Field( default=11155111, @@ -142,9 +111,6 @@ def get_chain_settings() -> ChainSettings: return ChainSettings() -GetChainSettings = Annotated[ChainSettings, Depends(get_chain_settings)] - - class SocketioSettings(OctantSettings): host: str = Field(..., alias="SOCKETIO_REDIS_HOST") port: int = Field(..., alias="SOCKETIO_REDIS_PORT") @@ -161,3 +127,6 @@ def get_socketio_settings() -> SocketioSettings: GetSocketioSettings = Annotated[SocketioSettings, Depends(get_socketio_settings)] +GetChainSettings = Annotated[ChainSettings, Depends(get_chain_settings)] +Web3 = Annotated[AsyncWeb3, Depends(get_w3)] +GetSession = Annotated[AsyncSession, Depends(get_db_session)] diff --git a/backend/v2/project_rewards/services.py b/backend/v2/project_rewards/services.py index 76e6b43734..da6802d0c7 100644 --- a/backend/v2/project_rewards/services.py +++ b/backend/v2/project_rewards/services.py @@ -23,10 +23,12 @@ class ProjectRewardsEstimator: async def get(self) -> CappedQuadriaticFunding: # Gather all the necessary data for the calculation - all_projects, matched_rewards, allocations = await asyncio.gather( - self.projects_contracts.get_project_addresses(self.epoch_number), - self.matched_rewards_estimator.get(), - get_allocations_with_user_uqs(self.session, self.epoch_number), + all_projects = await self.projects_contracts.get_project_addresses( + self.epoch_number + ) + matched_rewards = await self.matched_rewards_estimator.get() + allocations = await get_allocations_with_user_uqs( + self.session, self.epoch_number ) # Calculate using the Capped Quadriatic Funding formula From 9b601a3b46530412edf261586740251b4b327277 Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Thu, 2 Jan 2025 12:32:51 +0100 Subject: [PATCH 2/7] format --- backend/v2/core/dependencies.py | 1 + backend/v2/project_rewards/services.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/backend/v2/core/dependencies.py b/backend/v2/core/dependencies.py index 20c45032f7..ea9dbc514e 100644 --- a/backend/v2/core/dependencies.py +++ b/backend/v2/core/dependencies.py @@ -35,6 +35,7 @@ class DatabaseSettings(OctantSettings): """ Values below are the defaults for the database with max_connetions = 100 and backend pods = 3. """ + db_uri: str = Field(..., alias="db_uri") pg_pool_size: int = Field(20, alias="sqlalchemy_connection_pool_size") diff --git a/backend/v2/project_rewards/services.py b/backend/v2/project_rewards/services.py index da6802d0c7..a3d593f8e6 100644 --- a/backend/v2/project_rewards/services.py +++ b/backend/v2/project_rewards/services.py @@ -1,7 +1,7 @@ -import asyncio from dataclasses import dataclass from sqlalchemy.ext.asyncio import AsyncSession + from v2.allocations.repositories import get_allocations_with_user_uqs from v2.matched_rewards.services import MatchedRewardsEstimator from v2.project_rewards.capped_quadriatic import ( From 1fcea9ae501d088a082117e4bc0d983c49612a46 Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Thu, 2 Jan 2025 13:27:59 +0100 Subject: [PATCH 3/7] remove parallel sessions in sockets, align improvements --- backend/v2/allocations/services.py | 9 ++--- backend/v2/allocations/socket.py | 50 +++++++++----------------- backend/v2/allocations/validators.py | 6 ++-- backend/v2/project_rewards/services.py | 10 +++--- 4 files changed, 28 insertions(+), 47 deletions(-) diff --git a/backend/v2/allocations/services.py b/backend/v2/allocations/services.py index a130c32b33..a5b15dc9ef 100644 --- a/backend/v2/allocations/services.py +++ b/backend/v2/allocations/services.py @@ -1,4 +1,3 @@ -import asyncio from dataclasses import dataclass from app import exceptions @@ -126,11 +125,9 @@ async def simulate_leverage( Calculate leverage of the allocation made by the user. """ - all_projects, matched_rewards, existing_allocations = await asyncio.gather( - projects_contracts.get_project_addresses(epoch_number), - matched_rewards_estimator.get(), - get_allocations_with_user_uqs(session, epoch_number), - ) + all_projects = await projects_contracts.get_project_addresses(epoch_number) + matched_rewards = await matched_rewards_estimator.get() + existing_allocations = await get_allocations_with_user_uqs(session, epoch_number) return cqf_simulate_leverage( existing_allocations=existing_allocations, diff --git a/backend/v2/allocations/socket.py b/backend/v2/allocations/socket.py index c491405801..3e115aa570 100644 --- a/backend/v2/allocations/socket.py +++ b/backend/v2/allocations/socket.py @@ -62,42 +62,34 @@ async def create_dependencies_on_connect() -> AsyncGenerator[ projects_contracts = get_projects_contracts(w3, get_projects_settings()) epochs_subgraph = get_epochs_subgraph(get_epochs_subgraph_settings()) - # For safety, we create separate sessions for each dependency - # (to avoid any potential issues with session sharing in async task context) - sessionmaker = get_sessionmaker(get_database_settings()) - async with ( - sessionmaker() as s1, - sessionmaker() as s2, - sessionmaker() as s3, - sessionmaker() as s4, - ): + async with sessionmaker() as session: try: threshold_getter = get_projects_allocation_threshold_getter( epoch_number, - s1, + session, projects_contracts, get_projects_allocation_threshold_settings(), ) estimated_matched_rewards = await get_matched_rewards_estimator( epoch_number, - s2, + session, epochs_subgraph, get_matched_rewards_estimator_settings(), ) estimated_project_rewards = await get_project_rewards_estimator( epoch_number, - s3, + session, projects_contracts, estimated_matched_rewards, ) # Yield the dependencies to the on_connect handler - yield (s4, threshold_getter, estimated_project_rewards) + yield (session, threshold_getter, estimated_project_rewards) except Exception as e: - await cleanup_sessions(s1, s2, s3, s4) + await cleanup_sessions(session) raise e @@ -129,49 +121,41 @@ async def create_dependencies_on_allocate() -> AsyncGenerator[ # (to avoid any potential issues with session sharing in async task context) sessionmaker = get_sessionmaker(get_database_settings()) - async with ( - sessionmaker() as s1, - sessionmaker() as s2, - sessionmaker() as s3, - sessionmaker() as s4, - sessionmaker() as s5, - sessionmaker() as s6, - sessionmaker() as s7, - ): + async with sessionmaker() as session: try: threshold_getter = get_projects_allocation_threshold_getter( epoch_number, - s1, + session, projects_contracts, get_projects_allocation_threshold_settings(), ) estimated_matched_rewards = await get_matched_rewards_estimator( epoch_number, - s2, + session, epochs_subgraph, get_matched_rewards_estimator_settings(), ) estimated_project_rewards = await get_project_rewards_estimator( epoch_number, - s3, + session, projects_contracts, estimated_matched_rewards, ) signature_verifier = get_signature_verifier( - s4, + session, epochs_subgraph, projects_contracts, get_chain_settings(), ) uq_score_getter = get_uq_score_getter( - s5, get_uq_score_settings(), get_chain_settings() + session, get_uq_score_settings(), get_chain_settings() ) allocations = await get_allocator( epoch_number, - s6, + session, signature_verifier, uq_score_getter, projects_contracts, @@ -180,14 +164,14 @@ async def create_dependencies_on_allocate() -> AsyncGenerator[ # Yield the dependencies to the on_allocate handler yield ( - s7, + session, allocations, threshold_getter, estimated_project_rewards, ) except Exception as e: - await cleanup_sessions(s1, s2, s3, s4, s5, s6, s7) + await cleanup_sessions(session) raise e @@ -358,7 +342,7 @@ def from_dict(data: str) -> UserAllocationRequest: return request -async def safe_session_cleanup(session): +async def safe_session_cleanup(session: AsyncSession): try: await session.rollback() except Exception: @@ -372,5 +356,5 @@ async def safe_session_cleanup(session): logging.exception("Error during session close") -async def cleanup_sessions(*sessions): +async def cleanup_sessions(*sessions: AsyncSession): await asyncio.gather(*(safe_session_cleanup(s) for s in sessions)) diff --git a/backend/v2/allocations/validators.py b/backend/v2/allocations/validators.py index 3311b4c555..18289f90d1 100644 --- a/backend/v2/allocations/validators.py +++ b/backend/v2/allocations/validators.py @@ -69,10 +69,8 @@ async def _check_database(): ) await _user_has_budget(session, payload, epoch_number) - await asyncio.gather( - _check_database(), - _provided_projects_are_correct(projects_contracts, epoch_number, payload), - ) + await _check_database() + await _provided_projects_are_correct(projects_contracts, epoch_number, payload) async def _provided_nonce_matches_expected( diff --git a/backend/v2/project_rewards/services.py b/backend/v2/project_rewards/services.py index a3d593f8e6..22d9b3f041 100644 --- a/backend/v2/project_rewards/services.py +++ b/backend/v2/project_rewards/services.py @@ -1,3 +1,4 @@ +import asyncio from dataclasses import dataclass from sqlalchemy.ext.asyncio import AsyncSession @@ -23,15 +24,16 @@ class ProjectRewardsEstimator: async def get(self) -> CappedQuadriaticFunding: # Gather all the necessary data for the calculation - all_projects = await self.projects_contracts.get_project_addresses( - self.epoch_number + all_projects, matched_rewards = asyncio.gather( + self.projects_contracts.get_project_addresses(self.epoch_number), + self.matched_rewards_estimator.get(), ) - matched_rewards = await self.matched_rewards_estimator.get() + allocations = await get_allocations_with_user_uqs( self.session, self.epoch_number ) - # Calculate using the Capped Quadriatic Funding formula + # Calculate using the Capped Quadratic Funding formula return capped_quadriatic_funding( project_addresses=all_projects, allocations=allocations, From 05051e6fc12d6c05039cbb112704945c0a2372bc Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Fri, 3 Jan 2025 00:55:39 +0100 Subject: [PATCH 4/7] get rid of all gathers --- backend/v2/allocations/socket.py | 12 ++++------ backend/v2/allocations/validators.py | 33 +++++++++++++------------- backend/v2/project_rewards/services.py | 7 +++--- backend/v2/projects/services.py | 8 +++---- 4 files changed, 27 insertions(+), 33 deletions(-) diff --git a/backend/v2/allocations/socket.py b/backend/v2/allocations/socket.py index 3e115aa570..d4faa520f9 100644 --- a/backend/v2/allocations/socket.py +++ b/backend/v2/allocations/socket.py @@ -1,11 +1,11 @@ -import asyncio import logging from contextlib import asynccontextmanager from typing import AsyncGenerator, Tuple import socketio -from app.exceptions import OctantException from sqlalchemy.ext.asyncio import AsyncSession + +from app.exceptions import OctantException from v2.allocations.dependencies import get_allocator, get_signature_verifier from v2.allocations.repositories import get_donations_by_project from v2.allocations.schemas import UserAllocationRequest, UserAllocationRequestV1 @@ -89,7 +89,7 @@ async def create_dependencies_on_connect() -> AsyncGenerator[ yield (session, threshold_getter, estimated_project_rewards) except Exception as e: - await cleanup_sessions(session) + await safe_session_cleanup(session) raise e @@ -171,7 +171,7 @@ async def create_dependencies_on_allocate() -> AsyncGenerator[ ) except Exception as e: - await cleanup_sessions(session) + await safe_session_cleanup(session) raise e @@ -354,7 +354,3 @@ async def safe_session_cleanup(session: AsyncSession): except Exception: # Log the close error, but don't raise it logging.exception("Error during session close") - - -async def cleanup_sessions(*sessions: AsyncSession): - await asyncio.gather(*(safe_session_cleanup(s) for s in sessions)) diff --git a/backend/v2/allocations/validators.py b/backend/v2/allocations/validators.py index 18289f90d1..4289e20125 100644 --- a/backend/v2/allocations/validators.py +++ b/backend/v2/allocations/validators.py @@ -1,9 +1,10 @@ -import asyncio from dataclasses import dataclass +from sqlalchemy.ext.asyncio import AsyncSession +from web3 import AsyncWeb3 + from app import exceptions from app.modules.common.crypto.signature import EncodingStandardFor, encode_for_signing -from sqlalchemy.ext.asyncio import AsyncSession from v2.allocations.repositories import get_last_allocation_request_nonce from v2.allocations.schemas import UserAllocationRequest from v2.core.types import Address @@ -14,7 +15,6 @@ get_budget_by_user_address_and_epoch, user_is_patron_with_budget, ) -from web3 import AsyncWeb3 @dataclass @@ -25,20 +25,19 @@ class SignatureVerifier: chain_id: int async def verify(self, epoch_number: int, request: UserAllocationRequest) -> None: - await asyncio.gather( - verify_logic( - session=self.session, - epoch_subgraph=self.epochs_subgraph, - projects_contracts=self.projects_contracts, - epoch_number=epoch_number, - payload=request, - ), - verify_signature( - w3=self.projects_contracts.w3, - chain_id=self.chain_id, - user_address=request.user_address, - payload=request, - ), + await verify_logic( + session=self.session, + epoch_subgraph=self.epochs_subgraph, + projects_contracts=self.projects_contracts, + epoch_number=epoch_number, + payload=request, + ) + + await verify_signature( + w3=self.projects_contracts.w3, + chain_id=self.chain_id, + user_address=request.user_address, + payload=request, ) diff --git a/backend/v2/project_rewards/services.py b/backend/v2/project_rewards/services.py index 22d9b3f041..fd6899ad01 100644 --- a/backend/v2/project_rewards/services.py +++ b/backend/v2/project_rewards/services.py @@ -1,4 +1,3 @@ -import asyncio from dataclasses import dataclass from sqlalchemy.ext.asyncio import AsyncSession @@ -24,10 +23,10 @@ class ProjectRewardsEstimator: async def get(self) -> CappedQuadriaticFunding: # Gather all the necessary data for the calculation - all_projects, matched_rewards = asyncio.gather( - self.projects_contracts.get_project_addresses(self.epoch_number), - self.matched_rewards_estimator.get(), + all_projects = ( + await self.projects_contracts.get_project_addresses(self.epoch_number), ) + matched_rewards = (await self.matched_rewards_estimator.get(),) allocations = await get_allocations_with_user_uqs( self.session, self.epoch_number diff --git a/backend/v2/projects/services.py b/backend/v2/projects/services.py index 7421f2c30f..4b8fa12abf 100644 --- a/backend/v2/projects/services.py +++ b/backend/v2/projects/services.py @@ -1,7 +1,7 @@ -import asyncio from dataclasses import dataclass from sqlalchemy.ext.asyncio import AsyncSession + from v2.allocations.repositories import sum_allocations_by_epoch from v2.projects.contracts import ProjectsContracts @@ -35,10 +35,10 @@ async def get_projects_allocation_threshold( ) -> int: # PROJECTS_COUNT_MULTIPLIER = 1 # TODO: from settings? - total_allocated, project_addresses = await asyncio.gather( - sum_allocations_by_epoch(session, epoch_number), - projects.get_project_addresses(epoch_number), + total_allocated, project_addresses = await sum_allocations_by_epoch( + session, epoch_number ) + project_addresses = (await projects.get_project_addresses(epoch_number),) return _calculate_threshold( total_allocated, len(project_addresses), project_count_multiplier From ef01465f7fa055e93165d7daa1380b1c75ecf000 Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Fri, 3 Jan 2025 09:27:42 +0100 Subject: [PATCH 5/7] format --- backend/v2/project_rewards/services.py | 7 ++++--- backend/v2/projects/services.py | 6 ++---- 2 files changed, 6 insertions(+), 7 deletions(-) diff --git a/backend/v2/project_rewards/services.py b/backend/v2/project_rewards/services.py index fd6899ad01..3ef386d1fd 100644 --- a/backend/v2/project_rewards/services.py +++ b/backend/v2/project_rewards/services.py @@ -23,10 +23,11 @@ class ProjectRewardsEstimator: async def get(self) -> CappedQuadriaticFunding: # Gather all the necessary data for the calculation - all_projects = ( - await self.projects_contracts.get_project_addresses(self.epoch_number), + all_projects = await self.projects_contracts.get_project_addresses( + self.epoch_number ) - matched_rewards = (await self.matched_rewards_estimator.get(),) + + matched_rewards = await self.matched_rewards_estimator.get() allocations = await get_allocations_with_user_uqs( self.session, self.epoch_number diff --git a/backend/v2/projects/services.py b/backend/v2/projects/services.py index 4b8fa12abf..2f1f39da2b 100644 --- a/backend/v2/projects/services.py +++ b/backend/v2/projects/services.py @@ -35,10 +35,8 @@ async def get_projects_allocation_threshold( ) -> int: # PROJECTS_COUNT_MULTIPLIER = 1 # TODO: from settings? - total_allocated, project_addresses = await sum_allocations_by_epoch( - session, epoch_number - ) - project_addresses = (await projects.get_project_addresses(epoch_number),) + total_allocated = await sum_allocations_by_epoch(session, epoch_number) + project_addresses = await projects.get_project_addresses(epoch_number) return _calculate_threshold( total_allocated, len(project_addresses), project_count_multiplier From d554e52730b4f92b62d223295b9b3017e49f5158 Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Tue, 7 Jan 2025 21:13:28 +0100 Subject: [PATCH 6/7] OCT-2296: Tweak defaults of PG_POOL for max_connections 300 --- backend/v2/core/dependencies.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/backend/v2/core/dependencies.py b/backend/v2/core/dependencies.py index ea9dbc514e..73f02bd2a1 100644 --- a/backend/v2/core/dependencies.py +++ b/backend/v2/core/dependencies.py @@ -33,13 +33,13 @@ def get_w3( class DatabaseSettings(OctantSettings): """ - Values below are the defaults for the database with max_connetions = 100 and backend pods = 3. + Values below are the defaults for the database with max_connetions = 300 and backend pods = 3. """ db_uri: str = Field(..., alias="db_uri") - pg_pool_size: int = Field(20, alias="sqlalchemy_connection_pool_size") - pg_max_overflow: int = Field(13, alias="sqlalchemy_connection_pool_max_overflow") + pg_pool_size: int = Field(67, alias="sqlalchemy_connection_pool_size") + pg_max_overflow: int = Field(33, alias="sqlalchemy_connection_pool_max_overflow") pg_pool_timeout: int = 60 pg_pool_recycle: int = 30 * 60 # 30 minutes pg_pool_pre_ping: bool = True From 8b2229de29e52a5f92d215f793bd69f717e2f41e Mon Sep 17 00:00:00 2001 From: kgarbacinski Date: Sun, 12 Jan 2025 20:10:52 +0100 Subject: [PATCH 7/7] update docstr --- backend/v2/core/dependencies.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/v2/core/dependencies.py b/backend/v2/core/dependencies.py index 73f02bd2a1..4c238dcf0f 100644 --- a/backend/v2/core/dependencies.py +++ b/backend/v2/core/dependencies.py @@ -33,7 +33,7 @@ def get_w3( class DatabaseSettings(OctantSettings): """ - Values below are the defaults for the database with max_connetions = 300 and backend pods = 3. + Values below are the defaults for the pod which can serve up to 100 connections. """ db_uri: str = Field(..., alias="db_uri")