Skip to content

Commit

Permalink
finished scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Al-Saffar committed Nov 29, 2023
1 parent 80b0031 commit 66da564
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 21 deletions.
5 changes: 4 additions & 1 deletion myresources/crocodile/environment.py
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,10 @@ def append_permanently(path: str, scope: Literal["User", "system"] = "User"):
command = fr'[Environment]::SetEnvironmentVariable("Path", $env:PATH + ";{path}", "{scope}")'
result = backup + command
return result # if run is False else tm.run(result, shell="powershell").print()
else: tb.P.home().joinpath(".bashrc").append_text(f"export PATH='{path}:$PATH'")
else:
file = tb.P.home().joinpath(".bashrc")
txt = file.read_text()
file.write_text(txt + f"\nexport PATH='{path}:$PATH'", encoding="utf-8")

@staticmethod
def set_permanetly(path: str, scope: Literal["User", "system"] = "User"):
Expand Down
2 changes: 1 addition & 1 deletion myresources/crocodile/file_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ def yaml(path: PLike, r: bool = False) -> Any: # return could be list or dict e
_ = r
return mydict
def ini(path: PLike):
if not Path(path).exists(): raise FileNotFoundError(f"File not found: {path}")
if not Path(path).exists() or Path(path).is_dir(): raise FileNotFoundError(f"File not found: {path}")
import configparser; res = configparser.ConfigParser(); res.read(filenames=[str(path)]); return res
def toml(path: PLike): return install_n_import("tomli").loads(P(path).read_text())
def npy(path: PLike, **kwargs: Any):
Expand Down
37 changes: 18 additions & 19 deletions myresources/crocodile/meta.py
Original file line number Diff line number Diff line change
Expand Up @@ -176,11 +176,8 @@ def run_script(self, script: str, shell: SHELLS = "default", verbose: bool = Fal
full_command: Union[list[str], str] = [start_cmd, str(script_file)] # shell=True will cause this to be a string anyway (with space separation)
else:
start_cmd = "bash"
full_command = f"{start_cmd} {script_file}"
# full_command = [start_cmd, str(script_file)]
else:
# full_command = [shell, str(tmp_file)]
full_command = f"{shell} {script_file}"
full_command = f"{start_cmd} {script_file}" # full_command = [start_cmd, str(script_file)]
else: full_command = f"{shell} {script_file}" # full_command = [shell, str(tmp_file)]
if verbose:
from machineconfig.utils.utils import print_code
print_code(code=script, lexer="shell", desc="Script to be executed:")
Expand All @@ -190,17 +187,6 @@ def run_script(self, script: str, shell: SHELLS = "default", verbose: bool = Fal
resp = subprocess.run(full_command, stderr=self.stderr, stdin=self.stdin, stdout=self.stdout, text=True, shell=True, check=False)
else: resp = subprocess.run(full_command, stderr=self.stderr, stdin=self.stdin, stdout=self.stdout, text=True, shell=True, check=False)
return Response.from_completed_process(resp)
@staticmethod
def is_user_admin() -> bool: # adopted from: https://stackoverflow.com/questions/19672352/how-to-run-script-with-elevated-privilege-on-windows"""
if __import__('os').name == 'nt':
try: return __import__("ctypes").windll.shell32.IsUserAnAdmin()
except Exception: import traceback; traceback.print_exc(); print("Admin check failed, assuming not an admin."); return False
else: return __import__('os').getuid() == 0 # Check for root on Posix
@staticmethod
def run_as_admin(file: PLike, params: Any, wait: bool = False):
proce_info = install_n_import("win32com", fromlist=["shell.shell.ShellExecuteEx"]).shell.shell.ShellExecuteEx(lpVerb='runas', lpFile=file, lpParameters=params)
if wait: time.sleep(1)
return proce_info
def run_async(self, *cmds: str, new_window: bool = True, shell: Optional[str] = None, terminal: Optional[str] = None): # Runs SYSTEM commands like subprocess.Popen
"""Opens a new terminal, and let it run asynchronously. Maintaining an ongoing conversation with another process is very hard. It is adviseable to run all
commands in one go without interaction with an ongoing channel. Use this only for the purpose of producing a different window and humanly interact with it. Reference: https://stackoverflow.com/questions/54060274/dynamic-communication-between-main-and-subprocess-in-python & https://www.youtube.com/watch?v=IynV6Y80vws and https://www.oreilly.com/library/view/windows-powershell-cookbook/9781449359195/ch01.html"""
Expand All @@ -223,7 +209,20 @@ def run_py(self, script: str, wdir: OPLike = None, interactive: bool = True, ipy
if shell is None and self.machine == "Windows": shell = "pwsh"
window = "start" if new_window and self.machine == "Windows" else ""
os.system(f"{window} {terminal} {shell} {shell_script}")
pickle_to_new_session = staticmethod(lambda obj, cmd="": Terminal().run_py(f"""path = tb.P(r'{Save.pickle(obj=obj, path=P.tmpfile(tstamp=False, suffix=".pkl"), verbose=False)}')\n obj = path.readit()\npath.delete(sure=True, verbose=False)\n {cmd}"""))
@staticmethod
def is_user_admin() -> bool: # adopted from: https://stackoverflow.com/questions/19672352/how-to-run-script-with-elevated-privilege-on-windows"""
if __import__('os').name == 'nt':
try: return __import__("ctypes").windll.shell32.IsUserAnAdmin()
except Exception: import traceback; traceback.print_exc(); print("Admin check failed, assuming not an admin."); return False
else: return __import__('os').getuid() == 0 # Check for root on Posix
@staticmethod
def run_as_admin(file: PLike, params: Any, wait: bool = False):
proce_info = install_n_import("win32com", fromlist=["shell.shell.ShellExecuteEx"]).shell.shell.ShellExecuteEx(lpVerb='runas', lpFile=file, lpParameters=params)
if wait: time.sleep(1)
return proce_info
@staticmethod
def pickle_to_new_session(obj: Any, cmd: str = ""):
return Terminal().run_py(f"""path = tb.P(r'{Save.pickle(obj=obj, path=P.tmpfile(tstamp=False, suffix=".pkl"), verbose=False)}')\n obj = path.readit()\npath.delete(sure=True, verbose=False)\n {cmd}""")
@staticmethod
def import_to_new_session(func: Union[None, Callable[[Any], Any]] = None, cmd: str = "", header: bool = True, interactive: bool = True, ipython: bool = True, run: bool = False, **kwargs: Any):
load_kwargs_string = f"""kwargs = tb.P(r'{Save.pickle(obj=kwargs, path=P.tmpfile(tstamp=False, suffix=".pkl"), verbose=False)}').readit()\nkwargs.print()\n""" if kwargs else "\n"
Expand Down Expand Up @@ -257,7 +256,7 @@ def __init__(self, host: Optional[str] = None, username: Optional[str] = None, h
self.port: int = port
self.proxycommand: Optional[str] = None
import platform
import paramiko
import paramiko # type: ignore
# username, hostname = __import__("getpass").getuser(), platform.node()
if isinstance(host, str):
try:
Expand Down Expand Up @@ -441,7 +440,7 @@ def run(self, max_cycles: Optional[int] = None, until: str = "2050-01-01"):
try: self.routine(self)
except Exception as ex: self.exception_handler(ex, "routine", self) # 2- Perform logic
time_left = int(self.wait - (datetime.now() - time1).total_seconds()) # 4- Conclude Message
self.cycle += 1; self.logger.warning(f"Finishing Cycle {str(self.cycle - 1).zfill(5)} in {str(datetime.now() - time1).split('.')[0]}. Sleeping for {self.wait}s ({time_left}s left)\n" + "-" * 100)
self.cycle += 1; self.logger.warning(f"Finishing Cycle {str(self.cycle - 1).zfill(5)} in {str(datetime.now() - time1).split('.', maxsplit=1)[0]}. Sleeping for {self.wait}s ({time_left}s left)\n" + "-" * 100)
try: time.sleep(time_left if time_left > 0 else 0.1) # # 5- Sleep. consider replacing by Asyncio.sleep
except KeyboardInterrupt as ex: self.exception_handler(ex, "sleep", self); return # that's probably the only kind of exception that can rise during sleep.
self.record_session_end(reason=f"Reached maximum number of cycles ({self.max_cycles})" if self.cycle >= self.max_cycles else f"Reached due stop time ({until})")
Expand Down

0 comments on commit 66da564

Please sign in to comment.