From e3518ed8007717fb56e5b85f2154421d40cd971f Mon Sep 17 00:00:00 2001 From: soul-codes <40335030+soul-codes@users.noreply.github.com> Date: Sat, 21 Dec 2024 22:49:09 +0000 Subject: [PATCH] =?UTF-8?q?refactor:=20=F0=9F=92=A1=20separate=20app=20log?= =?UTF-8?q?ic=20from=20terminal=20logic=20(#55)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/__init__.py | 7 + app/analysis_context.py | 163 ++++++++++++++++ app/analysis_output_context.py | 70 +++++++ app/analysis_webserver_context.py | 78 ++++++++ app/app.py | 30 +++ app/app_context.py | 18 ++ app/project_context.py | 99 ++++++++++ app/settings_context.py | 20 ++ app/utils.py | 6 + .../web_static/dashboard_base.css | 0 {components => app}/web_templates/index.html | 0 components/__init__.py | 1 + components/analysis_main.py | 38 ++-- components/context.py | 10 + components/export_outputs.py | 174 ++++-------------- components/main_menu.py | 25 ++- components/new_analysis.py | 101 +++------- components/new_project.py | 30 ++- components/project_main.py | 24 +-- components/select_analysis.py | 24 +-- components/select_project.py | 54 ++++-- components/utils.py | 63 ------- mangotango.py | 10 +- storage/__init__.py | 32 ++-- 24 files changed, 701 insertions(+), 376 deletions(-) create mode 100644 app/__init__.py create mode 100644 app/analysis_context.py create mode 100644 app/analysis_output_context.py create mode 100644 app/analysis_webserver_context.py create mode 100644 app/app.py create mode 100644 app/app_context.py create mode 100644 app/project_context.py create mode 100644 app/settings_context.py create mode 100644 app/utils.py rename {components => app}/web_static/dashboard_base.css (100%) rename {components => app}/web_templates/index.html (100%) create mode 100644 components/context.py delete mode 100644 components/utils.py diff --git a/app/__init__.py b/app/__init__.py new file mode 100644 index 0000000..9709e5b --- /dev/null +++ b/app/__init__.py @@ -0,0 +1,7 @@ +from .analysis_context import AnalysisContext +from .analysis_output_context import AnalysisOutputContext +from .analysis_webserver_context import AnalysisWebServerContext +from .app import App +from .app_context import AppContext +from .project_context import ProjectContext +from .settings_context import SettingsContext diff --git a/app/analysis_context.py b/app/analysis_context.py new file mode 100644 index 0000000..ae0bb8b --- /dev/null +++ b/app/analysis_context.py @@ -0,0 +1,163 @@ +from functools import cached_property +from tempfile import TemporaryDirectory +from typing import Literal + +from pydantic import BaseModel + +from analyzer_interface import AnalyzerDeclaration, SecondaryAnalyzerDeclaration +from context import ( + InputColumnProvider, + PrimaryAnalyzerContext, + SecondaryAnalyzerContext, +) +from storage import AnalysisModel + +from .app_context import AppContext +from .project_context import ProjectContext + + +class AnalysisRunProgressEvent(BaseModel): + analyzer: AnalyzerDeclaration | SecondaryAnalyzerDeclaration + event: Literal["start", "finish"] + + +class AnalysisContext(BaseModel): + app_context: AppContext + project_context: ProjectContext + model: AnalysisModel + is_deleted: bool = False + + @property + def display_name(self): + return self.model.display_name + + @property + def id(self): + return self.model.id + + @property + def analyzer_id(self): + return self.model.primary_analyzer_id + + @property + def analyzer_spec(self): + analyzer = 
self.app_context.suite.get_primary_analyzer(self.analyzer_id) + assert analyzer, f"Analyzer `{self.analyzer_id}` not found" + return analyzer + + @property + def column_mapping(self): + return self.model.column_mapping + + @property + def create_time(self): + return self.model.create_time() + + @property + def is_draft(self): + return self.model.is_draft + + @cached_property + def web_presenters(self): + return self.app_context.suite.find_web_presenters(self.analyzer_spec) + + def web_server(self): + from .analysis_webserver_context import AnalysisWebServerContext + + return AnalysisWebServerContext( + app_context=self.app_context, analysis_context=self + ) + + def rename(self, new_name: str): + self.model.display_name = new_name + self.app_context.storage.save_analysis(self.model) + + def delete(self): + self.is_deleted = True + self.app_context.storage.delete_analysis(self.model) + + def run(self): + assert not self.is_deleted, "Analysis is deleted" + secondary_analyzers = ( + self.app_context.suite.find_toposorted_secondary_analyzers( + self.analyzer_spec + ) + ) + + with TemporaryDirectory() as temp_dir: + yield AnalysisRunProgressEvent(analyzer=self.analyzer_spec, event="start") + user_columns_by_name = { + user_column.name: user_column + for user_column in self.project_context.columns + } + analyzer_context = PrimaryAnalyzerContext( + analysis=self.model, + analyzer=self.analyzer_spec, + store=self.app_context.storage, + temp_dir=temp_dir, + input_columns={ + analyzer_column_name: InputColumnProvider( + user_column_name=user_column_name, + semantic=user_columns_by_name[user_column_name].semantic, + ) + for analyzer_column_name, user_column_name in self.column_mapping.items() + }, + ) + analyzer_context.prepare() + self.analyzer_spec.entry_point(analyzer_context) + yield AnalysisRunProgressEvent(analyzer=self.analyzer_spec, event="finish") + + for secondary in secondary_analyzers: + yield AnalysisRunProgressEvent(analyzer=secondary, event="start") + with TemporaryDirectory() as temp_dir: + analyzer_context = SecondaryAnalyzerContext( + analysis=self.model, + secondary_analyzer=secondary, + temp_dir=temp_dir, + store=self.app_context.storage, + ) + analyzer_context.prepare() + secondary.entry_point(analyzer_context) + yield AnalysisRunProgressEvent(analyzer=secondary, event="finish") + + self.model.is_draft = False + self.app_context.storage.save_analysis(self.model) + + @property + def export_root_path(self): + return self.app_context.storage._get_project_exports_root_path(self.model) + + def get_all_exportable_outputs(self): + from .analysis_output_context import AnalysisOutputContext + + return [ + *( + AnalysisOutputContext( + app_context=self.app_context, + analysis_context=self, + secondary_spec=None, + output_spec=output, + ) + for output in self.analyzer_spec.outputs + if not output.internal + ), + *( + AnalysisOutputContext( + app_context=self.app_context, + analysis_context=self, + secondary_spec=secondary, + output_spec=output, + ) + for secondary_id in self.app_context.storage.list_secondary_analyses( + self.model + ) + if ( + secondary := self.app_context.suite.get_secondary_analyzer_by_id( + self.analyzer_id, secondary_id + ) + ) + is not None + for output in secondary.outputs + if not output.internal + ), + ] diff --git a/app/analysis_output_context.py b/app/analysis_output_context.py new file mode 100644 index 0000000..e6ed0ae --- /dev/null +++ b/app/analysis_output_context.py @@ -0,0 +1,70 @@ +from functools import cached_property +from typing import Literal, Optional 
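A note on the run() generator defined in analysis_context.py above: instead of printing progress itself, it yields AnalysisRunProgressEvent values so that the terminal layer (or any other front end) decides how progress is rendered. A minimal consumer, sketched here for illustration only (run_with_logging is not part of this patch; analysis is assumed to be an AnalysisContext that has not been deleted):

    def run_with_logging(analysis):
        # Drives the primary analyzer and its toposorted secondary
        # analyzers to completion, logging each start/finish event.
        for event in analysis.run():
            verb = "Starting" if event.event == "start" else "Finished"
            print(f"{verb}: {event.analyzer.name}")

components/new_analysis.py below consumes the same event stream, refreshing its terminal scope on each "start" event.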
+ +from pydantic import BaseModel + +from analyzer_interface import AnalyzerOutput, SecondaryAnalyzerInterface +from storage import SupportedOutputExtension + +from .analysis_context import AnalysisContext +from .app_context import AppContext +from .utils import parquet_row_count + + +class AnalysisOutputContext(BaseModel): + app_context: AppContext + analysis_context: AnalysisContext + secondary_spec: Optional[SecondaryAnalyzerInterface] + output_spec: AnalyzerOutput + + @property + def descriptive_qualified_name(self): + return f"{self.output_spec.name} ({self.secondary_spec.name if self.secondary_spec else 'Base'})" + + def export( + self, + *, + format: SupportedOutputExtension, + chunk_size_override: Optional[int | Literal[False]] = None, + ): + export_chunk_size = ( + self.app_context.settings.export_chunk_size + if chunk_size_override is None + else chunk_size_override + ) or None + if self.secondary_spec is None: + return self.app_context.storage.export_project_primary_output( + self.analysis_context.model, + self.output_spec.id, + extension=format, + spec=self.output_spec, + export_chunk_size=export_chunk_size, + ) + else: + return self.app_context.storage.export_project_secondary_output( + self.analysis_context.model, + self.secondary_spec.id, + self.output_spec.id, + extension=format, + spec=self.output_spec, + export_chunk_size=export_chunk_size, + ) + + @cached_property + def num_rows( + self, + ): + if self.secondary_spec is None: + return parquet_row_count( + self.app_context.storage.get_primary_output_parquet_path( + self.analysis_context.model, self.output_spec.id + ) + ) + else: + return parquet_row_count( + self.app_context.storage.get_secondary_output_parquet_path( + self.analysis_context.model, + self.secondary_spec.id, + self.output_spec.id, + ) + ) diff --git a/app/analysis_webserver_context.py b/app/analysis_webserver_context.py new file mode 100644 index 0000000..cbbb5c4 --- /dev/null +++ b/app/analysis_webserver_context.py @@ -0,0 +1,78 @@ +import logging +import os.path +from pathlib import Path +from tempfile import TemporaryDirectory + +from dash import Dash +from flask import Flask, render_template +from pydantic import BaseModel +from waitress import serve + +from context import WebPresenterContext + +from .analysis_context import AnalysisContext +from .app_context import AppContext + + +class AnalysisWebServerContext(BaseModel): + app_context: AppContext + analysis_context: AnalysisContext + + def start(self): + containing_dir = str(Path(__file__).resolve().parent) + static_folder = os.path.join(containing_dir, "web_static") + template_folder = os.path.join(containing_dir, "web_templates") + + web_presenters = self.analysis_context.web_presenters + web_server = Flask( + __name__, + template_folder=template_folder, + static_folder=static_folder, + static_url_path="/static", + ) + web_server.logger.disabled = True + temp_dirs: list[TemporaryDirectory] = [] + + for presenter in web_presenters: + dash_app = Dash( + presenter.server_name, + server=web_server, + url_base_pathname=f"/{presenter.id}/", + external_stylesheets=["/static/dashboard_base.css"], + ) + temp_dir = TemporaryDirectory() + presenter_context = WebPresenterContext( + analysis=self.analysis_context.model, + web_presenter=presenter, + store=self.app_context.storage, + temp_dir=temp_dir.name, + dash_app=dash_app, + ) + temp_dirs.append(temp_dir) + presenter.factory(presenter_context) + + project_name = self.analysis_context.project_context.display_name + analyzer_name = 
self.analysis_context.display_name + + @web_server.route("/") + def index(): + return render_template( + "index.html", + panels=[(presenter.id, presenter.name) for presenter in web_presenters], + project_name=project_name, + analyzer_name=analyzer_name, + ) + + server_log = logging.getLogger("waitress") + original_log_level = server_log.level + original_disabled = server_log.disabled + server_log.setLevel(logging.ERROR) + server_log.disabled = True + + try: + serve(web_server, host="127.0.0.1", port=8050) + finally: + server_log.setLevel(original_log_level) + server_log.disabled = original_disabled + for temp_dir in temp_dirs: + temp_dir.cleanup() diff --git a/app/app.py b/app/app.py new file mode 100644 index 0000000..52e588f --- /dev/null +++ b/app/app.py @@ -0,0 +1,30 @@ +from tempfile import NamedTemporaryFile + +from pydantic import BaseModel + +from importing import ImporterSession + +from .app_context import AppContext +from .project_context import ProjectContext + + +class App(BaseModel): + context: AppContext + + def list_projects(self): + return [ + ProjectContext(model=project, app_context=self.context) + for project in self.context.storage.list_projects() + ] + + def create_project(self, name: str, importer_session: ImporterSession): + with NamedTemporaryFile(delete=False) as temp_file: + importer_session.import_as_parquet(temp_file.name) + project_model = self.context.storage.init_project( + display_name=name, input_temp_file=temp_file.name + ) + return ProjectContext(model=project_model, app_context=self.context) + + @property + def file_selector_state(self): + return self.context.storage.file_selector_state diff --git a/app/app_context.py b/app/app_context.py new file mode 100644 index 0000000..39dd4d6 --- /dev/null +++ b/app/app_context.py @@ -0,0 +1,18 @@ +from functools import cached_property + +from pydantic import BaseModel, ConfigDict + +from analyzer_interface.suite import AnalyzerSuite +from storage import Storage + + +class AppContext(BaseModel): + storage: Storage + suite: AnalyzerSuite + model_config = ConfigDict(arbitrary_types_allowed=True) + + @cached_property + def settings(self): + from .settings_context import SettingsContext + + return SettingsContext(app_context=self) diff --git a/app/project_context.py b/app/project_context.py new file mode 100644 index 0000000..5521b67 --- /dev/null +++ b/app/project_context.py @@ -0,0 +1,99 @@ +from functools import cached_property + +import polars as pl +from pydantic import BaseModel + +from analyzer_interface import UserInputColumn as BaseUserInputColumn +from preprocessing.series_semantic import SeriesSemantic, infer_series_semantic +from storage import AnalysisModel, ProjectModel + +from .app_context import AppContext + + +class ProjectContext(BaseModel): + model: ProjectModel + app_context: AppContext + is_deleted: bool = False + + @property + def display_name(self): + return self.model.display_name + + @property + def id(self): + return self.model.id + + def rename(self, new_name: str): + self.app_context.storage.rename_project(self.id, new_name) + self.model.display_name = new_name + + def delete(self): + self.app_context.storage.delete_project(self.id) + self.is_deleted = True + + def create_analysis(self, primary_analyzer_id: str, column_mapping: dict[str, str]): + assert not self.is_deleted, "Project is deleted" + + analyzer = self.app_context.suite.get_primary_analyzer(primary_analyzer_id) + assert analyzer, f"Analyzer `{primary_analyzer_id}` not found" + + analysis_model = 
self.app_context.storage.init_analysis( + self.id, analyzer.name, primary_analyzer_id, column_mapping + ) + return self._create_analysis_context(analysis_model) + + def list_analyses(self): + return [ + self._create_analysis_context(analysis_model) + for analysis_model in self.app_context.storage.list_project_analyses( + self.id + ) + ] + + def _create_analysis_context(self, analysis_model: AnalysisModel): + from .analysis_context import AnalysisContext + + return AnalysisContext( + model=analysis_model, project_context=self, app_context=self.app_context + ) + + @cached_property + def preview_data(self): + return self.app_context.storage.load_project_input(self.id, n_records=100) + + @cached_property + def data_row_count(self): + return self.app_context.storage.get_project_input_stats(self.id).num_rows + + @cached_property + def columns(self): + return _get_columns_with_semantic(self.preview_data) + + +def _get_columns_with_semantic(df: pl.DataFrame): + return [ + UserInputColumn( + name=col, data_type=semantic.data_type, semantic=semantic, data=df[col] + ) + for col in df.columns + if (semantic := infer_series_semantic(df[col])) is not None + ] + + +class UserInputColumn(BaseUserInputColumn): + semantic: SeriesSemantic + data: pl.Series + + def head(self, n: int = 10): + return UserInputColumn( + name=self.name, + data_type=self.data_type, + semantic=self.semantic, + data=self.data.head(n), + ) + + def apply_semantic_transform(self): + return self.semantic.try_convert(self.data) + + class Config: + arbitrary_types_allowed = True diff --git a/app/settings_context.py b/app/settings_context.py new file mode 100644 index 0000000..d7529e1 --- /dev/null +++ b/app/settings_context.py @@ -0,0 +1,20 @@ +from typing import Literal + +from pydantic import BaseModel + +from storage import SettingsModel + +from .app_context import AppContext + + +class SettingsContext(BaseModel): + app_context: AppContext + + @property + def export_chunk_size(self): + return self.app_context.storage.get_settings().export_chunk_size + + def set_export_chunk_size(self, value: int | Literal[False]): + self.app_context.storage.save_settings( + **SettingsModel(export_chunk_size=value).model_dump() + ) diff --git a/app/utils.py b/app/utils.py new file mode 100644 index 0000000..b10206f --- /dev/null +++ b/app/utils.py @@ -0,0 +1,6 @@ +import pyarrow.parquet as pq + + +def parquet_row_count(filename: str): + with pq.ParquetFile(filename) as pf: + return pf.metadata.num_rows diff --git a/components/web_static/dashboard_base.css b/app/web_static/dashboard_base.css similarity index 100% rename from components/web_static/dashboard_base.css rename to app/web_static/dashboard_base.css diff --git a/components/web_templates/index.html b/app/web_templates/index.html similarity index 100% rename from components/web_templates/index.html rename to app/web_templates/index.html diff --git a/components/__init__.py b/components/__init__.py index 0fa0069..93f9694 100644 --- a/components/__init__.py +++ b/components/__init__.py @@ -2,5 +2,6 @@ The application's terminal components that will be accessed by the entry module. 
""" +from .context import ViewContext from .main_menu import main_menu from .splash import splash diff --git a/components/analysis_main.py b/components/analysis_main.py index a614edf..5370108 100644 --- a/components/analysis_main.py +++ b/components/analysis_main.py @@ -1,26 +1,19 @@ from colorama import Fore -from analyzer_interface.suite import AnalyzerSuite -from storage import AnalysisModel, Storage +from app import AnalysisContext from terminal_tools import draw_box, open_directory_explorer, prompts, wait_for_key -from terminal_tools.inception import TerminalContext -from .analysis_web_server import analysis_web_server +from .context import ViewContext from .export_outputs import export_outputs -def analysis_main( - context: TerminalContext, - storage: Storage, - suite: AnalyzerSuite, - analysis: AnalysisModel, -): +def analysis_main(context: ViewContext, analysis: AnalysisContext): + terminal = context.terminal while True: - analyzer = suite.get_primary_analyzer(analysis.primary_analyzer_id) - has_web_server = suite.find_web_presenters(analyzer) + has_web_server = len(analysis.web_presenters) > 0 is_draft = analysis.is_draft - with context.nest( + with terminal.nest( draw_box(f"Analysis: {analysis.display_name}", padding_lines=0) ): if is_draft: @@ -53,16 +46,24 @@ def analysis_main( if action == "open_output_dir": print("Starting file explorer") - open_directory_explorer(storage._get_project_exports_root_path(analysis)) + open_directory_explorer(analysis.export_root_path) wait_for_key(True) continue if action == "export_output": - export_outputs(context, storage, suite, analysis) + export_outputs(context, analysis) continue if action == "web_server": - analysis_web_server(context, storage, suite, analysis) + server = analysis.web_server() + print("Web server will run at http://localhost:8050/") + print("Stop it with Ctrl+C") + try: + server.start() + except KeyboardInterrupt: + pass + print("Web server stopped") + wait_for_key(True) continue if action == "rename": @@ -72,8 +73,7 @@ def analysis_main( wait_for_key(True) continue - analysis.display_name = new_name - storage.save_analysis(analysis) + analysis.rename(new_name) print("Analysis renamed") wait_for_key(True) continue @@ -99,7 +99,7 @@ def analysis_main( wait_for_key(True) continue - storage.delete_analysis(analysis) + analysis.delete() print("🔥 Analysis deleted.") wait_for_key(True) return diff --git a/components/context.py b/components/context.py new file mode 100644 index 0000000..0365d80 --- /dev/null +++ b/components/context.py @@ -0,0 +1,10 @@ +from pydantic import BaseModel, ConfigDict + +from app import App +from terminal_tools.inception import TerminalContext + + +class ViewContext(BaseModel): + terminal: TerminalContext + app: App + model_config = ConfigDict(arbitrary_types_allowed=True) diff --git a/components/export_outputs.py b/components/export_outputs.py index 38332bb..16d4770 100644 --- a/components/export_outputs.py +++ b/components/export_outputs.py @@ -1,56 +1,44 @@ import os -from typing import Optional -import polars as pl -from pydantic import BaseModel - -from analyzer_interface import ( - AnalyzerInterface, - AnalyzerOutput, - SecondaryAnalyzerInterface, -) -from analyzer_interface.suite import AnalyzerSuite -from storage import AnalysisModel, Storage, SupportedOutputExtension +from app import AnalysisContext, AnalysisOutputContext +from storage import SupportedOutputExtension from terminal_tools import ( ProgressReporter, open_directory_explorer, prompts, wait_for_key, ) -from 
terminal_tools.inception import TerminalContext from terminal_tools.progress import ProgressReporter +from .context import ViewContext -def export_outputs( - context: TerminalContext, - storage: Storage, - suite: AnalyzerSuite, - analysis: AnalysisModel, - *, - all=False, -): - analyzer = suite.get_primary_analyzer(analysis.primary_analyzer_id) - with context.nest("[Export Output]\n\n") as scope: + +def export_outputs(context: ViewContext, analysis: AnalysisContext): + terminal = context.terminal + with terminal.nest("[Export Output]\n\n") as scope: outputs = sorted( - get_all_exportable_outputs(storage, suite, analysis), + analysis.get_all_exportable_outputs(), key=lambda output: ( - "0" if output.secondary is None else "1_" + output.secondary.name, - output.output.name, + ( + "0" + if output.secondary_spec is None + else "1_" + output.secondary_spec.name + ), + output.descriptive_qualified_name, ), ) - if all: - selected_outputs = outputs - else: - output_options = [(output.name, output) for output in outputs] - if not output_options: - print("There are no outputs for this analysis") - wait_for_key(True) - return + output_options = [ + (output.descriptive_qualified_name, output) for output in outputs + ] + if not output_options: + print("There are no outputs for this analysis") + wait_for_key(True) + return - selected_outputs: list[Output] = prompts.checkbox( - "Choose output(s) to export", choices=output_options - ) + selected_outputs: list[AnalysisOutputContext] = prompts.checkbox( + "Choose output(s) to export", choices=output_options + ) if not selected_outputs: print("Export cancelled") @@ -65,22 +53,19 @@ def export_outputs( return scope.refresh() - export_outputs_sequence(storage, analysis, selected_outputs, format) + export_outputs_sequence(context, analysis, selected_outputs, format) def export_outputs_sequence( - storage: Storage, - analysis: AnalysisModel, - selected_outputs: list["Output"], + context: ViewContext, + analysis: AnalysisContext, + selected_outputs: list[AnalysisOutputContext], format: SupportedOutputExtension, ): - has_large_dfs = any( - output.height(analysis, storage) > 50_000 for output in selected_outputs - ) + has_large_dfs = any(output.num_rows > 50_000 for output in selected_outputs) - export_chunk_size = None + settings = context.app.context.settings if has_large_dfs: - settings = storage.get_settings() if settings.export_chunk_size is None: print(f"Some of your exports will have more than 50,000 rows.") print(f"Let's take a moment to consider how you would like to proceed.") @@ -106,26 +91,19 @@ def export_outputs_sequence( ) if export_chunk_size is None: continue - - storage.save_settings(export_chunk_size=export_chunk_size) + settings.set_export_chunk_size(export_chunk_size) break if chunk_action == "whole": - storage.save_settings(export_chunk_size=False) + settings.set_export_chunk_size(False) break - else: - export_chunk_size = settings.export_chunk_size or None - print("Beginning export...") for selected_output in selected_outputs: - with ProgressReporter(f"Exporting {selected_output.name}") as progress: - export_progress = selected_output.export( - analysis, - storage, - format=format, - export_chunk_size=export_chunk_size, - ) + with ProgressReporter( + f"Exporting {selected_output.descriptive_qualified_name}" + ) as progress: + export_progress = selected_output.export(format=format) try: while True: progress.update(next(export_progress)) @@ -138,7 +116,7 @@ def export_outputs_sequence( if prompts.confirm( "Would you like to open the containing 
directory?", default=True ): - open_directory_explorer(storage._get_project_exports_root_path(analysis)) + open_directory_explorer(analysis.export_root_path) print("Directory opened") else: print("All done!") @@ -156,81 +134,3 @@ def export_format_prompt(): ("(Back)", None), ], ) - - -def get_all_exportable_outputs( - storage: Storage, suite: AnalyzerSuite, analysis: AnalysisModel -): - analyzer = suite.get_primary_analyzer(analysis.primary_analyzer_id) - return [ - *( - Output(output=output, secondary=None) - for output in analyzer.outputs - if not output.internal - ), - *( - Output(output=output, secondary=secondary) - for secondary_id in storage.list_secondary_analyses(analysis) - if ( - secondary := suite.get_secondary_analyzer_by_id( - analysis.primary_analyzer_id, secondary_id - ) - ) - is not None - for output in secondary.outputs - if not output.internal - ), - ] - - -class Output(BaseModel): - output: AnalyzerOutput - secondary: Optional[SecondaryAnalyzerInterface] - - @property - def name(self): - return ( - f"{self.output.name} ({self.secondary.name if self.secondary else 'Base'})" - ) - - def export( - self, - analysis: AnalysisModel, - storage: Storage, - *, - format: SupportedOutputExtension, - export_chunk_size: Optional[int] = None, - ): - if self.secondary is None: - return storage.export_project_primary_output( - analysis, - self.output.id, - extension=format, - spec=self.output, - export_chunk_size=export_chunk_size, - ) - else: - return storage.export_project_secondary_output( - analysis, - self.secondary.id, - self.output.id, - extension=format, - spec=self.output, - export_chunk_size=export_chunk_size, - ) - - def height(self, analysis: AnalysisModel, storage: Storage): - if self.secondary is None: - return self.df_height( - storage.get_primary_output_parquet_path(analysis, self.output.id) - ) - else: - return self.df_height( - storage.get_secondary_output_parquet_path( - analysis, self.secondary.id, self.output.id - ) - ) - - @staticmethod - def df_height(path: str) -> int: - return pl.scan_parquet(path).select(pl.len()).collect().item() diff --git a/components/main_menu.py b/components/main_menu.py index 7cc0950..af2aa43 100644 --- a/components/main_menu.py +++ b/components/main_menu.py @@ -1,21 +1,20 @@ from sys import exit -from analyzer_interface.suite import AnalyzerSuite -from storage import Storage from terminal_tools import draw_box, prompts -from terminal_tools.inception import TerminalContext from .analysis_main import analysis_main +from .context import ViewContext from .new_analysis import new_analysis from .new_project import new_project from .project_main import project_main from .select_project import select_project -def main_menu(context: TerminalContext, storage: Storage, suite: AnalyzerSuite): +def main_menu(context: ViewContext): + terminal = context.terminal while True: exit_instruction = "⟪ Hit Ctrl+C at any time to exit a menu ⟫" - with context.nest(draw_box("CIB Mango Tree") + "\n" + exit_instruction + "\n"): + with terminal.nest(draw_box("CIB Mango Tree") + "\n" + exit_instruction + "\n"): action = prompts.list_input( "What would you like to do?", choices=[ @@ -30,26 +29,26 @@ def main_menu(context: TerminalContext, storage: Storage, suite: AnalyzerSuite): exit(0) if action == "new_project": - with context.nest( + with terminal.nest( draw_box("CIB Mango Tree: New Dataset") + "\n" + exit_instruction + "\n" ): - project = new_project(context, storage) + project = new_project(context) if project is not None: - analysis = new_analysis(context, 
storage, suite, project) + analysis = new_analysis(context, project) if analysis is not None: - analysis_main(context, storage, suite, analysis) - project_main(context, storage, suite, project) + analysis_main(context, analysis) + project_main(context, project) continue if action == "load_project": - with context.nest( + with terminal.nest( draw_box("CIB Mango Tree: Load Dataset") + "\n" + exit_instruction + "\n" ): - project = select_project(context, storage) + project = select_project(context) if project is not None: - project_main(context, storage, suite, project) + project_main(context, project) continue diff --git a/components/new_analysis.py b/components/new_analysis.py index 97929b9..4c45baf 100644 --- a/components/new_analysis.py +++ b/components/new_analysis.py @@ -1,4 +1,3 @@ -import tempfile from traceback import format_exc from typing import Optional @@ -11,35 +10,27 @@ column_automap, get_data_type_compatibility_score, ) -from analyzer_interface.suite import AnalyzerSuite -from context import ( - InputColumnProvider, - PrimaryAnalyzerContext, - SecondaryAnalyzerContext, -) -from storage import Project, Storage +from app import ProjectContext from terminal_tools import draw_box, print_ascii_table, prompts, wait_for_key -from terminal_tools.inception import TerminalContext -from .export_outputs import ( - export_format_prompt, - export_outputs_sequence, - get_all_exportable_outputs, -) -from .utils import get_user_columns +from .context import ViewContext +from .export_outputs import export_format_prompt, export_outputs_sequence def new_analysis( - context: TerminalContext, storage: Storage, suite: AnalyzerSuite, project: Project + context: ViewContext, + project: ProjectContext, ): - with context.nest(draw_box("Choose a test", padding_lines=0)): + terminal = context.terminal + analyzers = context.app.context.suite.primary_anlyzers + with terminal.nest(draw_box("Choose a test", padding_lines=0)): analyzer: Optional[AnalyzerInterface] = prompts.list_input( "Which test?", choices=[ ("(Back)", None), *( (f"{analyzer.name} ({analyzer.short_description})", analyzer) - for analyzer in suite.primary_anlyzers + for analyzer in analyzers ), ], ) @@ -47,8 +38,8 @@ def new_analysis( if analyzer is None: return - with context.nest(draw_box(analyzer.name, padding_lines=0)): - with context.nest("◆◆ About this test ◆◆"): + with terminal.nest(draw_box(analyzer.name, padding_lines=0)): + with terminal.nest("◆◆ About this test ◆◆"): print("") print(analyzer.long_description or analyzer.short_description) @@ -64,8 +55,7 @@ def new_analysis( print(input_column.description or "") print("") - sample_project_df = storage.load_project_input(project.id, n_records=100) - user_columns = get_user_columns(sample_project_df) + user_columns = project.columns user_columns_by_name = { user_column.name: user_column for user_column in user_columns } @@ -98,7 +88,7 @@ def new_analysis( final_column_mapping = draft_column_mapping while True: - with context.nest("Column mapping") as column_mapping_scope: + with terminal.nest("Column mapping") as column_mapping_scope: print_ascii_table( rows=[ [ @@ -198,52 +188,18 @@ def new_analysis( selected_user_column.name ) - analysis = storage.init_analysis( - project.id, - analyzer.name, - analyzer.id, - final_column_mapping, - ) + analysis = project.create_analysis(analyzer.id, final_column_mapping) - with context.nest("Analysis") as run_scope: + with terminal.nest("Analysis") as run_scope: is_export_started = False try: - print("Starting base analysis for the test...") - 
with tempfile.TemporaryDirectory() as temp_dir: - analyzer_context = PrimaryAnalyzerContext( - analysis=analysis, - analyzer=analyzer, - store=storage, - temp_dir=temp_dir, - input_columns={ - analyzer_column_name: InputColumnProvider( - user_column_name=user_column_name, - semantic=user_columns_by_name[ - user_column_name - ].semantic, - ) - for analyzer_column_name, user_column_name in final_column_mapping.items() - }, - ) - analyzer_context.prepare() - analyzer.entry_point(analyzer_context) - - for secondary in suite.find_toposorted_secondary_analyzers(analyzer): - run_scope.refresh() - print("Running post-analysis: ", secondary.name) - - with tempfile.TemporaryDirectory() as temp_dir: - secondary_context = SecondaryAnalyzerContext( - analysis=analysis, - secondary_analyzer=secondary, - temp_dir=temp_dir, - store=storage, - ) - secondary_context.prepare() - secondary.entry_point(secondary_context) - - analysis.is_draft = False - storage.save_analysis(analysis) + for event in analysis.run(): + if event.event == "start": + run_scope.refresh() + if event.analyzer.kind == "primary": + print("Starting base analysis for the test...") + else: + print("Running post-analysis: ", event.analyzer.name) run_scope.refresh() print("The test is complete.") @@ -253,14 +209,15 @@ def new_analysis( print( "You can rename it now if you wish. Or just hit enter to continue." ) - analysis.display_name = ( + new_name = ( prompts.text("Analysis name", default=analyzer.name) or "" - ).strip() or analysis.display_name - storage.save_analysis(analysis) + ).strip() + if new_name: + analysis.rename(new_name) print("") - outputs = get_all_exportable_outputs(storage, suite, analysis) + outputs = analysis.get_all_exportable_outputs() print("You now have the option to export the following outputs:") for output in outputs: print("- " + output.name) @@ -274,7 +231,7 @@ def new_analysis( wait_for_key(True) else: is_export_started = True - export_outputs_sequence(storage, analysis, outputs, export_format) + export_outputs_sequence(context, analysis, outputs, export_format) return analysis @@ -314,4 +271,4 @@ def new_analysis( finally: if analysis.is_draft: - storage.delete_analysis(analysis) + analysis.delete() diff --git a/components/new_project.py b/components/new_project.py index c8711cc..05573cd 100644 --- a/components/new_project.py +++ b/components/new_project.py @@ -1,25 +1,28 @@ import os -import tempfile from traceback import format_exc from typing import Optional from importing import Importer, ImporterSession, importers -from storage import Project, Storage from terminal_tools import draw_box, prompts, wait_for_key -from terminal_tools.inception import Scope, TerminalContext +from terminal_tools.inception import Scope +from .context import ViewContext -def new_project(context: TerminalContext, storage: Storage): - with context.nest(draw_box("1. Data Source", padding_lines=0)): + +def new_project(context: ViewContext): + app = context.app + terminal = context.terminal + + with terminal.nest(draw_box("1. Data Source", padding_lines=0)): print("Select a file for your dataset") selected_file = prompts.file_selector( - "Select a file", state=storage.file_selector_state + "Select a file", state=app.file_selector_state ) if selected_file is None: print("Canceled") return wait_for_key(True) - with context.nest(draw_box("2. Import Options", padding_lines=0)) as scope: + with terminal.nest(draw_box("2. 
Import Options", padding_lines=0)) as scope: importer: Optional[ImporterSession] = importer_flow( selected_file, importers, scope ) @@ -27,25 +30,20 @@ def new_project(context: TerminalContext, storage: Storage): print("Canceled") return wait_for_key(True) - with context.nest(draw_box("3. Naming", padding_lines=0)): + with terminal.nest(draw_box("3. Naming", padding_lines=0)): print( "Rename the dataset if you wish. This is how the dataset will appear when you try to load it again." ) suggested_project_name = os.path.splitext(os.path.basename(selected_file))[0] project_name = prompts.text("Name", default=suggested_project_name) - with context.nest(draw_box("4. Import", padding_lines=0)): + with terminal.nest(draw_box("4. Import", padding_lines=0)): print("Please wait as the dataset is imported...") - with tempfile.NamedTemporaryFile(delete=False) as temp_file: - importer.import_as_parquet(temp_file.name) - - project = storage.init_project( - display_name=project_name, input_temp_file=temp_file.name - ) + project = app.create_project(name=project_name, importer_session=importer) print("Dataset successfully imported!") wait_for_key(True) - return Project(id=project.id, display_name=project.display_name) + return project def importer_flow( diff --git a/components/project_main.py b/components/project_main.py index e31afaa..32999e7 100644 --- a/components/project_main.py +++ b/components/project_main.py @@ -1,20 +1,21 @@ from colorama import Fore -from analyzer_interface.suite import AnalyzerSuite -from storage import Project, Storage +from app import ProjectContext from terminal_tools import draw_box, prompts, wait_for_key -from terminal_tools.inception import TerminalContext from .analysis_main import analysis_main +from .context import ViewContext from .new_analysis import new_analysis from .select_analysis import select_analysis def project_main( - context: TerminalContext, storage: Storage, suite: AnalyzerSuite, project: Project + context: ViewContext, + project: ProjectContext, ): + terminal = context.terminal while True: - with context.nest( + with terminal.nest( draw_box(f"CIB Mango Tree/Dataset: {project.display_name}", padding_lines=0) ): action = prompts.list_input( @@ -32,15 +33,15 @@ def project_main( return if action == "new_analysis": - analysis = new_analysis(context, storage, suite, project) + analysis = new_analysis(context, project) if analysis is not None: - analysis_main(context, storage, suite, analysis) + analysis_main(context, analysis) continue if action == "select_analysis": - analysis = select_analysis(context, storage, suite, project) + analysis = select_analysis(project) if analysis is not None: - analysis_main(context, storage, suite, analysis) + analysis_main(context, analysis) continue if action == "delete_project": @@ -66,7 +67,7 @@ def project_main( wait_for_key(True) continue - storage.delete_project(project.id) + project.delete() print("🔥 Dataset deleted.") wait_for_key(True) return @@ -78,8 +79,7 @@ def project_main( wait_for_key(True) continue - storage.rename_project(project.id, new_name) - project.display_name = new_name + project.rename(new_name) print("🔥 Dataset renamed.") wait_for_key(True) continue diff --git a/components/select_analysis.py b/components/select_analysis.py index 17e3d1b..28c5b32 100644 --- a/components/select_analysis.py +++ b/components/select_analysis.py @@ -1,15 +1,11 @@ from datetime import datetime from typing import Optional -from analyzer_interface.suite import AnalyzerSuite -from storage import AnalysisModel, Project, Storage 
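The signature changes across these components all follow the same pattern: each view now takes a single ViewContext in place of the old (TerminalContext, Storage, AnalyzerSuite) parameter triple, and reaches the domain layer only through the App facade. A hypothetical view under the new shape (example_view is illustrative, not part of this patch):

    from components.context import ViewContext

    def example_view(context: ViewContext):
        # The terminal is reached via context.terminal; projects come
        # from the App facade rather than from Storage directly.
        with context.terminal.nest("Example"):
            for project in context.app.list_projects():
                print(project.display_name)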
+from app import AnalysisContext, ProjectContext from terminal_tools import prompts, wait_for_key -from terminal_tools.inception import TerminalContext -def select_analysis( - context: TerminalContext, storage: Storage, suite: AnalyzerSuite, project: Project -) -> Optional[AnalysisModel]: +def select_analysis(proj: ProjectContext) -> Optional[AnalysisContext]: now = datetime.now() analysis_options = sorted( [ @@ -17,8 +13,7 @@ def select_analysis( analysis_label(analysis, now), analysis, ) - for analysis in storage.list_project_analyses(project.id) - if suite.get_primary_analyzer(analysis.primary_analyzer_id) is not None + for analysis in proj.list_analyses() ], key=lambda option: option[0], ) @@ -27,7 +22,7 @@ def select_analysis( wait_for_key(True) return None - option: Optional[AnalysisModel] = prompts.list_input( + option: Optional[AnalysisContext] = prompts.list_input( "Choose a previously run test to view", choices=[ ("(Back)", None), @@ -37,20 +32,15 @@ def select_analysis( return option -def analysis_label(analysis: AnalysisModel, now: datetime) -> str: - create_time = analysis.create_time() +def analysis_label(analysis: AnalysisContext, now: datetime) -> str: timestamp_suffix = ( - " (" + present_timestamp(create_time, now) + ")" - if create_time is not None + " (" + present_timestamp(analysis.create_time, now) + ")" + if analysis.create_time is not None else "" ) return f"{analysis.display_name}{timestamp_suffix}" -def present_timestamp(timestamp: datetime, now: datetime): - return timestamp.strftime("%Y-%m-%d %H:%M:%S") - - def present_timestamp(d: datetime, now: datetime): diff = now - d s = diff.seconds diff --git a/components/select_project.py b/components/select_project.py index 9b8832f..936f2b4 100644 --- a/components/select_project.py +++ b/components/select_project.py @@ -1,20 +1,24 @@ -from storage import Project, Storage -from terminal_tools import draw_box, prompts, wait_for_key -from terminal_tools.inception import TerminalContext +from typing import Optional -from .utils import input_preview +from app import ProjectContext +from terminal_tools import draw_box, print_ascii_table, prompts, wait_for_key +from .context import ViewContext + + +def select_project(ctx: ViewContext): + terminal = ctx.terminal + app = ctx.app -def select_project(context: TerminalContext, storage: Storage): while True: - with context.nest(draw_box("Choose a project", padding_lines=0)): - projects = storage.list_projects() + with terminal.nest(draw_box("Choose a project", padding_lines=0)): + projects = app.list_projects() if not projects: print("There are no previously created projects.") wait_for_key(True) return None - project = prompts.list_input( + project: Optional[ProjectContext] = prompts.list_input( "Which project?", choices=[(project.display_name, project) for project in projects], ) @@ -22,12 +26,36 @@ def select_project(context: TerminalContext, storage: Storage): if project is None: return None - with context.nest( + with terminal.nest( draw_box(f"Project: {project.display_name}", padding_lines=0) ): - df = storage.load_project_input(project.id, n_records=100) - table_stats = storage.get_project_input_stats(project.id) - input_preview(df, table_stats) + df = project.preview_data + print_ascii_table( + [ + [preview_value(cell) for cell in row] + for row in df.head(10).iter_rows() + ], + header=df.columns, + ) + print(f"(Total {project.data_row_count} rows)") + print("Inferred column semantics:") + print_ascii_table( + rows=[ + [col.name, col.semantic.semantic_name] for col in 
project.columns + ], + header=["Column", "Semantic"], + ) + confirm_load = prompts.confirm("Load this project?", default=True) if confirm_load: - return Project(id=project.id, display_name=project.display_name) + return project + + +def preview_value(value): + if isinstance(value, str): + if len(value) > 20: + return value[:20] + "..." + return value + if value is None: + return "(N/A)" + return value diff --git a/components/utils.py b/components/utils.py deleted file mode 100644 index f780e52..0000000 --- a/components/utils.py +++ /dev/null @@ -1,63 +0,0 @@ -from typing import Optional - -import polars as pl - -from analyzer_interface import UserInputColumn as BaseUserInputColumn -from preprocessing.series_semantic import SeriesSemantic, infer_series_semantic -from storage import TableStats -from terminal_tools import print_ascii_table - - -class UserInputColumn(BaseUserInputColumn): - semantic: SeriesSemantic - data: pl.Series - - def head(self, n: int = 10): - return UserInputColumn( - name=self.name, - data_type=self.data_type, - semantic=self.semantic, - data=self.data.head(n), - ) - - def apply_semantic_transform(self): - return self.semantic.try_convert(self.data) - - class Config: - arbitrary_types_allowed = True - - -def input_preview(df: pl.DataFrame, stats: Optional[TableStats] = None): - user_columns = get_user_columns(df) - print_ascii_table( - [[preview_value(cell) for cell in row] for row in df.head(10).iter_rows()], - header=df.columns, - ) - if stats is not None and stats.num_rows > df.height: - print(f"(Total {stats.num_rows} rows)") - - print("Inferred column semantics:") - print_ascii_table( - rows=[[col.name, col.semantic.semantic_name] for col in user_columns], - header=["Column", "Semantic"], - ) - - -def preview_value(value): - if isinstance(value, str): - if len(value) > 20: - return value[:20] + "..." 
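The preview_value helper being deleted here moves verbatim to the bottom of select_project.py above; its contract, written out as assertions for reference (assuming the relocated function is in scope):

    assert preview_value("short") == "short"            # short strings pass through
    assert preview_value("x" * 25) == "x" * 20 + "..."  # long strings truncated at 20 chars
    assert preview_value(None) == "(N/A)"               # missing values get a placeholder
    assert preview_value(42) == 42                      # non-strings are returned unchanged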
- return value - if value is None: - return "(N/A)" - return value - - -def get_user_columns(df: pl.DataFrame): - return [ - UserInputColumn( - name=col, data_type=semantic.data_type, semantic=semantic, data=df[col] - ) - for col in df.columns - if (semantic := infer_series_semantic(df[col])) is not None - ] diff --git a/mangotango.py b/mangotango.py index 9286714..b0dc519 100644 --- a/mangotango.py +++ b/mangotango.py @@ -1,7 +1,8 @@ from multiprocessing import freeze_support from analyzers import suite -from components import main_menu, splash +from app import App, AppContext +from components import ViewContext, main_menu, splash from storage import Storage from terminal_tools import enable_windows_ansi_support from terminal_tools.inception import TerminalContext @@ -12,4 +13,9 @@ storage = Storage(app_name="MangoTango", app_author="Civic Tech DC") splash() - main_menu(TerminalContext(), storage, suite) + main_menu( + ViewContext( + terminal=TerminalContext(), + app=App(context=AppContext(storage=storage, suite=suite)), + ) + ) diff --git a/storage/__init__.py b/storage/__init__.py index 1e8aad5..4c3e855 100644 --- a/storage/__init__.py +++ b/storage/__init__.py @@ -17,13 +17,13 @@ from .file_selector import FileSelectorStateManager -class Project(BaseModel): +class ProjectModel(BaseModel): class_: Literal["project"] = "project" id: str display_name: str -class Settings(BaseModel): +class SettingsModel(BaseModel): class_: Literal["settings"] = "settings" export_chunk_size: Optional[int | Literal[False]] = None @@ -42,7 +42,7 @@ class AnalysisModel(BaseModel): path: str column_mapping: Optional[dict[str, str]] = None create_timestamp: Optional[float] = None - is_draft: Optional[bool] = False + is_draft: bool = False def create_time(self): return ( @@ -72,7 +72,7 @@ def __init__(self, *, app_name: str, app_author: str): def init_project(self, *, display_name: str, input_temp_file: str): with self._lock_database(): project_id = self._find_unique_project_id(display_name) - project = Project(id=project_id, display_name=display_name) + project = ProjectModel(id=project_id, display_name=display_name) self.db.insert(project.model_dump()) project_dir = self._get_project_path(project_id) @@ -85,7 +85,7 @@ def list_projects(self): q = Query() projects = self.db.search(q["class_"] == "project") return sorted( - (Project(**project) for project in projects), + (ProjectModel(**project) for project in projects), key=lambda project: project.display_name, ) @@ -93,7 +93,7 @@ def get_project(self, project_id: str): q = Query() project = self.db.search((q["class_"] == "project") & (q["id"] == project_id)) if project: - return Project(**project[0]) + return ProjectModel(**project[0]) return None def delete_project(self, project_id: str): @@ -265,13 +265,19 @@ def _export_output( spec: AnalyzerOutput, export_chunk_size: Optional[int] = None, ): - if not export_chunk_size: + with pq.ParquetFile(input_path) as reader: + num_chunks = ( + math.ceil(reader.metadata.num_rows / export_chunk_size) + if export_chunk_size + else 1 + ) + + if num_chunks == 1: df = pl.scan_parquet(input_path) self._save_output(output_path, spec.transform_output(df), extension) return f"{output_path}.{extension}" with pq.ParquetFile(input_path) as reader: - num_chunks = math.ceil(reader.metadata.num_rows / export_chunk_size) get_batches = ( df for batch in reader.iter_batches() @@ -471,17 +477,19 @@ def _get_settings(self): q = Query() settings = self.db.search(q["class_"] == "settings") if settings: - return Settings(**settings[0]) - 
return Settings() + return SettingsModel(**settings[0]) + return SettingsModel() def save_settings(self, **kwargs): with self._lock_database(): q = Query() settings = self._get_settings() - new_settings = Settings( + new_settings = SettingsModel( **{ **settings.model_dump(), - **kwargs, + **{ + key: value for key, value in kwargs.items() if value is not None + }, } ) self.db.upsert(new_settings.model_dump(), q["class_"] == "settings")
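One behavioral detail in the save_settings change above deserves a note: keyword arguments whose value is None are now filtered out before merging, so passing export_chunk_size=None no longer erases a previously stored setting, while False (meaning "export as a whole file") still overwrites it. A sketch of the round-trip, assuming a Storage instance named storage:

    storage.save_settings(export_chunk_size=50_000)
    storage.save_settings(export_chunk_size=None)   # filtered out: stored value kept
    assert storage.get_settings().export_chunk_size == 50_000

    storage.save_settings(export_chunk_size=False)  # False survives the filter
    assert storage.get_settings().export_chunk_size is False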