
Reasonably clean SUTL pipeline
pothiers committed Dec 9, 2024
1 parent 77ec3e9 commit 5277a2e
Showing 5 changed files with 104 additions and 138 deletions.
44 changes: 4 additions & 40 deletions notebooks_tsqr/NightLog.ipynb
@@ -46,8 +46,8 @@
"\n",
"# day_obs values: TODAY, v, YYYY-MM-DD\n",
"# Report on observing nights that start upto but not included this day.\n",
"#!day_obs = '2024-09-25' # Value to use for local testing (Summit)\n",
"day_obs = \"2024-12-02\" # TODO Change to 'YESTERDAY' and 'TODAY' to test with default before push\n",
"# day_obs = '2024-09-25' # 2024-12-05 Value to use for local testing (Summit)\n",
"day_obs = \"2024-12-05\" # TODO Change to 'YESTERDAY' and 'TODAY' to test with default before push\n",
"\n",
"# Total number of days of data to display (ending on day_obs)\n",
"number_of_days = \"1\" # TODO Change to '1' to test with default before push\n",
@@ -488,7 +488,7 @@
"metadata": {},
"source": [
"## Merged time-log with compaction <font style=\"background-color:yellow; color:black; font-size:20px\">DRAFT</font>\n",
"This merges the time-line of records from several sources (currently: `NightReport`, `ExposureLog`, `NarrativeLog`). The resulting DataFrame is Compacted and Reduced into time bins of a specified duration.\n",
"This merges the time-line of records from several sources (currently: `NightReport`, `ExposureLog`, `NarrativeLog`, `Almanac`). The resulting DataFrame is Compacted and Reduced into time bins of a specified duration.\n",
"\n",
"#### <font style=\"background-color:#ffff99\">Possible Changes</font>\n",
"- [ ] Column specific width (and formatting in general)\n",
@@ -508,43 +508,7 @@
"metadata": {},
"outputs": [],
"source": [
"import lsst.ts.logging_and_reporting.time_logs as tl\n",
"import lsst.ts.logging_and_reporting.views as views\n",
"import lsst.ts.logging_and_reporting.templates as templates\n",
"from importlib import reload\n",
"\n",
"reload(templates)\n",
"reload(views)\n",
"reload(tl)\n",
"\n",
"full_df = tl.merge_sources(allsrc)\n",
"md(f\"### Full DataFrame: {full_df.shape[0]} rows, {full_df.shape[1]} columns\")\n",
"md(f'Columns: {\", \".join(full_df.columns.to_list())}')\n",
"display(full_df)\n",
"\n",
"delta = \"4h\"\n",
"compact_df = tl.compact(full_df, allow_data_loss=False, delta=delta)\n",
"md(\n",
" f\"### Compacted DataFrame {delta=}: {compact_df.shape[0]} rows, {compact_df.shape[1]} columns\"\n",
")\n",
"md(f'Columns: {\", \".join(compact_df.columns.to_list())}')\n",
"# display(HTML(compact_df.to_html()))\n",
"display(compact_df)\n",
"\n",
"reduced_df = tl.reduce_period(compact_df)\n",
"md(f\"### Reduced DataFrame: {reduced_df.shape[0]} rows, {reduced_df.shape[1]} columns\")\n",
"md(f'Columns: {\", \".join(reduced_df.columns.to_list())}')\n",
"display(reduced_df)\n",
"\n",
"# display(df['message_text_NAR'].str.join('<br>').style)\n",
"\n",
"# df.style.set_properties(subset=['Period'], **{'vertical-align': 'text-top','width': '300px','text-align': 'center',})\n",
"# df.style.set_properties(subset=pd.IndexSlice[:, ['']], **{'text-align': 'left'})\n",
"# display(df.style.set_properties(**{'vertical-align': 'text-top', 'text-align': 'left','white-space': 'pre-wrap',}))\n",
"\n",
"# df.style.pipe(tl.sutl_style) # Not bad\n",
"display(HTML(views.render_reduced_df(reduced_df)))\n",
"#!print(tl.render_df(reduced_df))"
"display(HTML(tl.sutl(allsrc, delta=\"3h\")))"
]
},
{
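The notebook change above replaces the hand-written merge/compact/reduce/render cell with a single `tl.sutl()` call. A minimal sketch of the new call next to the old sequence it replaces, assuming `allsrc` is the `AllSources` instance built earlier in the notebook:

import lsst.ts.logging_and_reporting.time_logs as tl
import lsst.ts.logging_and_reporting.views as views
from IPython.display import HTML, display

# New one-call pipeline from this commit.
html = tl.sutl(allsrc, delta="3h")
display(HTML(html))

# Roughly what the deleted cell did step by step:
full_df = tl.merge_sources(allsrc)  # one row per source record
compact_df = tl.compact(full_df, allow_data_loss=False, delta="3h")  # bin into 3h periods
reduced_df = tl.reduce_period(compact_df)  # aggregate each period
display(HTML(views.render_reduced_df(reduced_df)))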
46 changes: 33 additions & 13 deletions python/lsst/ts/logging_and_reporting/all_sources.py
@@ -105,7 +105,7 @@ def __str__(self):

def __repr__(self):
return (
f"AllSources(server_usr={self.server_url!r}, "
f"AllSources(server_url={self.server_url!r}, "
f"min_dayobs={self.min_dayobs!r}, "
f"max_dayobs={self.max_dayobs!r})"
)
@@ -116,13 +116,11 @@ def __repr__(self):
# pandas.timedelta_range
#
# alm_df, nig_df, exp_df, nar_df = allsrc.get_sources_time_logs()
def get_sources_time_logs(self):
def get_sources_time_logs(self, verbose=False):
"""A time_log is a DF ordered and indexed with DatetimeIndex."""

# Convert datefld column to Timestamp in "Timestamp" column
def recs2df(recs, datefld):
if len(recs) > 0:
print(f"DBG recs2df: {datefld=} {recs[0][datefld]=}")
time_idx_name = "Time"
# YYYY-MM-DD HH:MM
times = pd.to_datetime([r[datefld] for r in recs], utc=True)
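Only the first lines of `recs2df` survive the collapsed hunk above. A minimal sketch of the pattern it implements (list of record dicts in, `DatetimeIndex`-indexed DataFrame out); the demo record is a made-up almanac row:

import pandas as pd

def recs2df_sketch(recs, datefld, time_idx_name="Time"):
    # Build a UTC DatetimeIndex from one date field of each record.
    times = pd.to_datetime([r[datefld] for r in recs], utc=True)
    df = pd.DataFrame(recs, index=pd.DatetimeIndex(times, name=time_idx_name))
    return df.sort_index()  # a time_log is ordered by its DatetimeIndex

demo = recs2df_sketch([{"UTC": "2024-12-05T02:00:00", "event": "moon rise"}], "UTC")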
@@ -135,24 +133,45 @@ def recs2df(recs, datefld):

# Almanac
alm_df = recs2df(self.alm_src.as_records(), "UTC")
if verbose:
print(f"DBG get_sources_time_logs: {alm_df.shape=}")

# Night Report
nig_df = recs2df(self.nig_src.records, self.nig_src.log_dt_field)
if verbose:
print(f"DBG get_sources_time_logs: {nig_df.shape=}")

# NarrativeLog
nar_df = recs2df(self.nar_src.records, self.nar_src.log_dt_field)
if verbose:
print(f"DBG get_sources_time_logs: {nar_df.shape=}")

# ExposureLog
exp_df = recs2df(self.exp_src.records, self.exp_src.log_dt_field)
if verbose:
print(f"DBG get_sources_time_logs: {exp_df.shape=}")

# NarrativeLog
nar_df = recs2df(self.nar_src.records, self.nar_src.log_dt_field)
recs = itertools.chain.from_iterable(self.exp_src.exposures.values())
exp_detail_df = recs2df(recs, "timespan_begin")
if verbose:
print(f"DBG get_sources_time_logs: {exp_detail_df.shape=}")

# Best time resolution is "day_obs"! Better when available via TAP
# self.cdb_src
return (
alm_df,
nig_df,
exp_df,
nar_df,
)
# The best time resolution is currently "day_obs"! (not good enough)
# This should be better when it's available via TAP.

# Source Records
srecs = dict(ALM=alm_df)  # srecs[src_name] = time-log DataFrame
if not nig_df.empty:
srecs["NIG"] = nig_df
if not nar_df.empty:
srecs["NAR"] = nar_df
if not exp_df.empty:
srecs["EXP"] = exp_df
if not exp_detail_df.empty:
# for all instruments that have exposures
srecs["EXPDET"] = exp_detail_df
return srecs
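`get_sources_time_logs()` now returns a dict of per-source time logs instead of a fixed 4-tuple, so callers no longer special-case empty sources. A small sketch of consuming it, assuming an existing `allsrc`:

srecs = allsrc.get_sources_time_logs()
for src_name, src_df in srecs.items():
    # Keys: ALM always; NIG, NAR, EXP, EXPDET only when non-empty.
    print(f"{src_name}: {src_df.shape[0]} rows, index dtype {src_df.index.dtype}")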

@property
def dayobs_range(self):
@@ -263,6 +282,7 @@ async def night_tally_observation_gaps(self, verbose=False):
# edf.get_targets() => "slewTime" # (d,g,h)
return instrument_tally

# see source_record_counts()
def records_per_source(self):
sources = [
# self.alm_src,
43 changes: 21 additions & 22 deletions python/lsst/ts/logging_and_reporting/templates.py
@@ -1,4 +1,22 @@
junk = """
templates = dict(
decanted_html="""
{{ df.to_html() }}
<ul>
{% for key, val in sparse_dict.items() %}
<li>
<b>{{ key }}:</b>
<pre style="font-size: 0.5em; color: slateblue;">
{{ val }}
</pre>
</li>
{% endfor %}
</ul>
""",
)

# ###########################################################################
head = """
<!DOCTYPE html>
<html>
<head>
@@ -10,28 +28,9 @@
</style>
</head>
<body>
"""

...
foot = """
</body>
</html>
"""

templates = dict(
decanted_html="""
<p>
table_columns={{ df.columns }}.
sparse_dict={{ sparse_dict.keys() }}
</p>
<p>DF as html</p>
{{ df.to_html() }}
<ul>
{% for key, val in sparse_dict.items() %}
<li><b>{{ key }}:</b> <pre>{{ val }}</pre></li>
{% endfor %}
</ul>
""",
)
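The reworked `decanted_html` template interleaves a rendered DataFrame with the decanted (sparse) values. The commit does not show the render call site, so this is a hedged sketch using Jinja2; the sample `df` and `sparse_dict` contents are placeholders:

import pandas as pd
from jinja2 import Template

from lsst.ts.logging_and_reporting.templates import templates

df = pd.DataFrame({"NAR_message_text": ["dome closed"]})
sparse_dict = {"EXP_instrument": "LSSTComCam"}  # hypothetical decanted column

# Render the template text defined above.
html = Template(templates["decanted_html"]).render(df=df, sparse_dict=sparse_dict)
print(html)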
100 changes: 41 additions & 59 deletions python/lsst/ts/logging_and_reporting/time_logs.py
@@ -136,7 +136,6 @@ def merge_to_timelog(prefix, tl_df, right_df, right_dfield="Timestamp"):
"""
right_df = right_df.sort_index()
rdf = prefix_columns(right_df, prefix)
print(f"DBG merge_to_timelog: {rdf.index.dtype=}")

# The left_on and right_on columns are expected to contain datetime column
# in *_Time
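Only the head of `merge_to_timelog` appears in this hunk, so the exact merge keywords are not visible. A hedged sketch of the prefix-then-merge idea, with `prefix_columns_sketch` standing in for the repo's `prefix_columns`:

import pandas as pd

def prefix_columns_sketch(df, prefix):
    # "message_text" -> "NAR_message_text", etc.
    return df.rename(columns={c: f"{prefix}{c}" for c in df.columns})

tl_df = pd.DataFrame({"Time": pd.to_datetime(["2024-12-05 00:00"], utc=True)})
nar_df = pd.DataFrame({"Timestamp": pd.to_datetime(["2024-12-05 00:10"], utc=True),
                       "message_text": ["dome closed"]})
rdf = prefix_columns_sketch(nar_df, "NAR_")
merged = pd.merge(tl_df, rdf, how="outer",
                  left_on="Time", right_on="NAR_Timestamp")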
@@ -151,27 +150,23 @@


# reduce_period(compact(merge_sources(allsrc)))
def merge_sources(allsrc, verbose=True):
def merge_sources(allsrc, verbose=False):
"""Result contains a row for every source record. Only one source per row.
This means that there will be NaN values for all columns that are
not native to a row.
"""
sources = allsrc.get_sources_time_logs()
alm_df, nig_df, exp_df, nar_df = sources
srecs = allsrc.get_sources_time_logs() # Source Records

# Frame for Night (Unified Time Log)
dates = pd.date_range(allsrc.min_date, allsrc.max_date, freq="4h")
utl_df = pd.DataFrame(dates, index=dates, columns=["Time"])

print(f"DBG merge_sources: {utl_df.index.dtype=} {utl_df=}")
df = utl_df
# #!df = merge_to_timelog(utl_df, alm_df, prefix="ALM_")
if not nig_df.empty:
df = merge_to_timelog("NIG_", df, nig_df)
if not exp_df.empty:
df = merge_to_timelog("EXP_", df, exp_df)
if not nar_df.empty:
df = merge_to_timelog("NAR_", df, nar_df)
for srcname, srcdf in srecs.items():
if verbose:
print(f"DBG merge_sources: {srcname=}")
df = merge_to_timelog(f"{srcname}_", df, srcdf)
if verbose:
print(f"DBG merge_sources: {srcname=} {df.shape=} {df.columns.to_list()=}")

df.set_index(["Time"], inplace=True)

Expand All @@ -180,6 +175,21 @@ def merge_sources(allsrc, verbose=True):
return df


def sutl(allsrc, delta="2h", allow_data_loss=False, verbose=False):
"""Extract a single unified time log (SUTL) from records of sources."""

fdf = merge_sources(allsrc)
cdf = compact(fdf, delta=delta, allow_data_loss=allow_data_loss)
if verbose:
print(f"DBG sutl: {fdf.shape=}")
print(f"DBG sutl: {cdf.shape=}")
rdf = reduce_period(cdf)
if verbose:
print(f"DBG sutl: {rdf.shape=}")
html = views.render_reduced_df(rdf)
return html


# Want to be able to specify time-bin size.
# Then get multiple records in a bin (sorted by actual time)
# Non-sense? Just means
@@ -224,7 +234,6 @@ def remove_list_columns(df):


def render_df(df):
print(f"DBG render_df {df.shape=}")
return views.render_reduced_df(df)


Expand All @@ -233,7 +242,7 @@ def render_df(df):
# With allow_data_loss=True, remove useless/problematic columns.
# Also, see: remove_list_columns()
# Cell Values that are lists cause problems in pd.drop_duplicates()
def compact(full_df, delta="4h", allow_data_loss=False, verbose=True):
def compact(full_df, delta="4h", allow_data_loss=False, verbose=False):
df = full_df.copy()
if verbose:
print(f"DBG compact: Input {df.shape=}")
@@ -291,7 +300,7 @@ def compact(full_df, delta="4h", allow_data_loss=False, verbose=True):
df.dropna(how="all", axis="columns", inplace=True)

# df = df.fillna('')
# df = ut.wrap_dataframe_columns(df) # TODO re-enable
# #!df = ut.wrap_dataframe_columns(df) # TODO re-enable

# Trim whitespace from all columns
df = df.map(lambda x: x.strip() if isinstance(x, str) else x)
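`compact()` bins each record into a Period of width `delta` before de-duplicating; the binning code itself falls outside the visible hunks. A hedged sketch of the idea using `DatetimeIndex.floor`:

import pandas as pd

times = pd.DatetimeIndex(
    pd.to_datetime(
        ["2024-12-05 00:10", "2024-12-05 01:50", "2024-12-05 05:00"], utc=True
    ),
    name="Time",
)
df = pd.DataFrame({"msg": ["a", "b", "c"]}, index=times)
df["Period"] = df.index.floor("4h")  # delta="4h": first two rows share a bin
binned = df.set_index("Period", append=True)  # (Time, Period) MultiIndex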
@@ -324,7 +333,7 @@ def compact(full_df, delta="4h", allow_data_loss=False, verbose=True):
# + In Period: Replace multi-values in a column with a concatenation
# of the unique values.
# TODO General aggregation using dtypes assigned in allsrc.
def reduce_period(df, verbose=True):
def reduce_period(df, verbose=False):
"""Group and aggregate by Period. Drops some columns. Reduces Rows."""

def multi_string(group):
@@ -336,40 +345,6 @@ def multi_label(group):
if verbose:
print(f"DBG reduce_period: Input {df.shape=}")

fields = {
"NIG_id", # ignore
"NIG_site_id",
"NIG_telescope",
"NIG_day_obs", # ignore
"NIG_summary",
"NIG_telescope_status",
"NIG_confluence_url",
"NIG_user_id", # ignore
"NIG_user_agent",
"NIG_date_added",
"NIG_date_sent", # ignore
"NIG_is_valid",
"NIG_parent_id",
"NIG_Timestamp", # ignore
"NAR_id", # ignore
"NAR_site_id",
"NAR_message_text",
"NAR_level",
"NAR_time_lost",
"NAR_date_begin",
"NAR_user_id", # ignore
"NAR_user_agent",
"NAR_is_human",
"NAR_is_valid",
"NAR_date_added",
"NAR_parent_id",
"NAR_date_end", # ignore
"NAR_category",
"NAR_time_lost_type",
"NAR_error_message",
"NAR_Timestamp", # ignore
}

nuke_columns = {
"NIG_id", # ignore
"NIG_day_obs", # ignore
@@ -386,8 +361,7 @@ def multi_label(group):
"NAR_date_end", # ignore
"NAR_Timestamp", # ignore
}
used_columns = fields - nuke_columns
print(f"DBG reduce_period: ({len(used_columns)}){used_columns=}")
used_columns = set(df.columns.to_list()) - nuke_columns

# #! available = set(df.columns.to_list())
# #! available.discard('NIG_day_obs')
Expand All @@ -405,9 +379,9 @@ def multi_label(group):
# #! if 0 < len(df[c].unique()) < 10 }
# #!

dropped_df = df.drop(nuke_columns, axis=1)
drop_columns = nuke_columns & used_columns
dropped_df = df.drop(drop_columns, axis=1)
df = dropped_df
print(df.info())

# We would rather not have these field names hardcoded!!!
group_aggregator = dict()
@@ -431,14 +405,22 @@ def multi_label(group):
"NAR_time_lost_type",
]
group_aggregator.update({c: multi_label for c in label_fields})

group_aggregator["NAR_time_lost"] = "sum"

if verbose:
print(f"DBG reduce_period: columns {df.columns.to_list()=}")
print(
f"DBG reduce_period: " f"aggregated fields {list(group_aggregator.keys())=}"
)
agg_keys = set(group_aggregator.keys())
use_agg = agg_keys & used_columns
drop_agg = agg_keys - use_agg
for col in drop_agg:
del group_aggregator[col]

if verbose:
print(f"DBG {agg_keys=}")
print(f"DBG {used_columns=}")
print(f"DBG {use_agg=}")
print(f"DBG {drop_agg=}")
print(f"DBG final agg_keys={set(group_aggregator.keys())}")
df = df.groupby(level="Period").agg(group_aggregator)
if verbose:
print(f"DBG reduce_period: Output {df.shape=}")
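`reduce_period()` collapses each Period's rows with per-column aggregators: string concatenation via `multi_string`/`multi_label` and `"sum"` for `NAR_time_lost`. A minimal sketch of that groupby/agg shape, assuming the (Time, Period) MultiIndex produced upstream:

import pandas as pd

idx = pd.MultiIndex.from_tuples(
    [
        (pd.Timestamp("2024-12-05 00:10"), pd.Timestamp("2024-12-05 00:00")),
        (pd.Timestamp("2024-12-05 01:50"), pd.Timestamp("2024-12-05 00:00")),
    ],
    names=["Time", "Period"],
)
df = pd.DataFrame(
    {"NAR_message_text": ["dome closed", "dome reopened"],
     "NAR_time_lost": [0.5, 0.2]},
    index=idx,
)

reduced = df.groupby(level="Period").agg(
    {
        "NAR_message_text": lambda g: "\n".join(str(v) for v in g),  # like multi_string
        "NAR_time_lost": "sum",
    }
)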