Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flat field method #1173

Merged
merged 1 commit into from
Aug 20, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
257 changes: 252 additions & 5 deletions src/panoptes/pocs/observatory.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@
from astropy import units as u
from astropy.coordinates import get_moon
from astropy.coordinates import get_sun
from astropy.io.fits import setval
from panoptes.utils import error
from panoptes.utils.time import current_time, CountdownTimer
from panoptes.utils.time import current_time, CountdownTimer, flatten_time

import panoptes.pocs.camera.fli
from panoptes.pocs.base import PanBase
Expand All @@ -18,7 +19,9 @@
from panoptes.pocs.images import Image
from panoptes.pocs.mount.mount import AbstractMount
from panoptes.pocs.scheduler import BaseScheduler
from panoptes.pocs.scheduler.field import Field
from panoptes.pocs.scheduler.observation.base import Observation
from panoptes.pocs.scheduler.observation.compound import Observation as CompoundObservation
from panoptes.utils import images as img_utils
from panoptes.utils.images import fits as fits_utils
from panoptes.pocs.utils.cli.image import upload_image
Expand Down Expand Up @@ -335,7 +338,7 @@ def get_observation(self, *args, **kwargs):

Returns:
observation (pocs.scheduler.observation.Observation or None): An
an object that represents the observation to be made
object that represents the observation to be made.

Raises:
error.NoObservation: If no valid observation is found
Expand Down Expand Up @@ -628,12 +631,11 @@ def get_standard_headers(self, observation=None):
if observation is None:
observation = self.current_observation

assert observation is not None, self.logger.warning(
"No observation, can't get headers")
assert observation is not None, self.logger.warning("No observation, can't get headers")

field = observation.field

self.logger.debug("Getting headers for : {}".format(observation))
self.logger.debug(f'Getting headers for : {observation}')

t0 = current_time()
moon = get_moon(t0, self.observer.location)
Expand Down Expand Up @@ -740,3 +742,248 @@ def close_dome(self):
if not self.dome.is_closed:
self.logger.info('Closed dome')
return self.dome.close()

def take_flat_fields(self,
which='evening',
alt=None,
az=None,
min_counts=1000,
max_counts=12000,
target_adu_percentage=0.5,
initial_exptime=3.,
min_exptime=0.,
max_exptime=60.,
readout=5.,
camera_list=None,
bias=2048,
max_num_exposures=10,
no_tracking=True
): # pragma: no cover
"""Take flat fields.
This method will slew the mount to the given AltAz coordinates(which
should be roughly opposite of the setting sun) and then begin the flat-field
procedure. The first image starts with a simple 1 second exposure and
after each image is taken the average counts are analyzed and the exposure
time is adjusted to try to keep the counts close to `target_adu_percentage`
of the `(max_counts + min_counts) - bias`.
The next exposure time is calculated as:
.. code-block:: python
# Get the sun direction multiplier used to determine if exposure
# times are increasing or decreasing.
if which == 'evening':
sun_direction = 1
else:
sun_direction = -1
exptime = previous_exptime * (target_adu / counts) *
(2.0 ** (sun_direction * (elapsed_time / 180.0))) + 0.5
Under - and over-exposed images are rejected. If image is saturated with
a short exposure the method will wait 60 seconds before beginning next
exposure.
Optionally, the method can also take dark exposures of equal exposure
time to each flat-field image.
Args:
which (str, optional): Specify either 'evening' or 'morning' to lookup coordinates
in config, default 'evening'.
alt (float, optional): Altitude for flats, default None.
az (float, optional): Azimuth for flats, default None.
min_counts (int, optional): Minimum ADU count.
max_counts (int, optional): Maximum ADU count.
target_adu_percentage (float, optional): Exposure time will be adjust so
that counts are close to: target * (`min_counts` + `max_counts`). Defaults
to 0.5.
initial_exptime (float, optional): Start the flat fields with this exposure
time, default 3 seconds.
max_exptime (float, optional): Maximum exposure time before stopping.
camera_list (list, optional): List of cameras to use for flat-fielding.
bias (int, optional): Default bias for the cameras.
max_num_exposures (int, optional): Maximum number of flats to take.
no_tracking (bool, optional): If tracking should be stopped for drift flats,
default True.
"""
if camera_list is None:
camera_list = list(self.cameras.keys())

target_adu = target_adu_percentage * (min_counts + max_counts)

# Get the sun direction multiplier used to determine if exposure
# times are increasing or decreasing.
if which == 'evening':
sun_direction = 1
else:
sun_direction = -1

# Setup initial exposure times.
exptimes = {cam_name: [initial_exptime * u.second] for cam_name in camera_list}

# Create the observation.
try:
flat_obs = self._create_flat_field_observation(
alt=alt, az=az, initial_exptime=initial_exptime,
field_name=f'{which.title()}Flat'
)
except Exception as e:
self.logger.warning(f'Problem making flat field: {e}')
return

# Slew to position
self.logger.debug(f"Slewing to flat-field coords: {flat_obs.field}")
self.mount.set_target_coordinates(flat_obs.field)
self.mount.slew_to_target(blocking=True)
if no_tracking:
self.logger.info(f'Stopping the mount tracking')
self.mount.query('stop_tracking')
self.logger.info(f'At {flat_obs.field=} with tracking stopped, starting flats.')

while len(camera_list) > 0:

start_time = current_time()
fits_headers = self.get_standard_headers(observation=flat_obs)
fits_headers['start_time'] = flatten_time(start_time)

# Report the sun level
sun_pos = self.observer.altaz(start_time, target=get_sun(start_time)).alt
self.logger.debug(f"Sun {sun_pos:.02f}°")

# Take the observations.
exptime = min_exptime
camera_filename = dict()
for cam_name in camera_list:
# Get latest exposure time.
exptime = max(exptimes[cam_name][-1].value, min_exptime)
fits_headers['exptime'] = exptime

# Take picture and get filename.
self.logger.info(f'Flat #{flat_obs.current_exp_num} on {cam_name=} with {exptime=}')
camera = self.cameras[cam_name]
metadata = camera.take_observation(flat_obs, headers=fits_headers, exptime=exptime)
camera_filename[cam_name] = metadata['filepath']

# Block until done exposing on all cameras.
flat_field_timer = CountdownTimer(exptime + readout, name='Flat Field Images')
while any([cam.is_observing for cam_name, cam in self.cameras.items()
if cam_name in camera_list]):
if flat_field_timer.expired():
self.logger.warning(f'{flat_field_timer} expired while waiting for flat fields')
return

self.logger.trace('Waiting for flat-field image')
flat_field_timer.sleep(1)

# Check the counts for each image.
is_saturated = False
too_bright = False
for cam_name, filename in camera_filename.items():

# Make sure we can find the file.
img_file = filename.replace('.cr2', '.fits')
if not os.path.exists(img_file):
img_file = img_file.replace('.fits', '.fits.fz')
if not os.path.exists(img_file): # pragma: no cover
self.logger.warning(f"No flat file {img_file} found, skipping")
continue

self.logger.debug(f"Checking counts for {img_file}")

# Get the bias subtracted data.
data = fits_utils.getdata(img_file) - bias

# Simple mean works just as well as sigma_clipping and is quicker for RGB.
# TODO(wtgee) verify this.
counts = data.mean()
self.logger.info(f"Counts: {counts:.02f} Desired: {target_adu:.02f}")

# Check we are above minimum counts.
if counts < min_counts:
self.logger.info("Counts are too low, flat should be discarded")
setval(img_file, 'QUALITY', value='BAD', ext=int(img_file.endswith('.fz')))

# Check we are below maximum counts.
if counts >= max_counts:
self.logger.info("Image is saturated")
is_saturated = True
setval(img_file, 'QUALITY', value='BAD', ext=int(img_file.endswith('fz')))

# Get suggested exposure time.
elapsed_time = (current_time() - start_time).sec
self.logger.debug(f"Elapsed time: {elapsed_time:.02f}")
previous_exptime = exptimes[cam_name][-1].value

# TODO(wtgee) Document this better.
suggested_exptime = int(previous_exptime * (target_adu / counts) *
(2.0 ** (sun_direction * (elapsed_time / 180.0))) + 0.5)

self.logger.info(f"Suggested exptime for {cam_name}: {suggested_exptime:.02f}")

# Stop flats if we are going on too long.
self.logger.debug(f"Checking for too many exposures on {cam_name}")
if len(exptimes) == max_num_exposures:
self.logger.info(f"Have ({max_num_exposures=}), stopping {cam_name}.")
camera_list.remove(cam_name)

# Stop flats if any time is greater than max.
self.logger.debug(f"Checking for long exposures on {cam_name}")
if suggested_exptime >= max_exptime:
self.logger.info(f"Suggested exposure time greater than max, "
f"stopping flat fields for {cam_name}")
camera_list.remove(cam_name)

self.logger.debug(f"Checking for saturation on short exposure on {cam_name}")
short_exptime = 2
if is_saturated and previous_exptime <= short_exptime:
too_bright = True

# Add next exptime to list.
exptimes[cam_name].append(suggested_exptime * u.second)

if too_bright:
if which == 'evening':
self.logger.info("Saturated short exposure, waiting 60 seconds for more dark")
CountdownTimer(60, name='WaitingForTheDarkness').sleep()
else:
self.logger.info('Saturated short exposure, too bright to continue')
return

def _create_flat_field_observation(self,
alt=70, # degrees
az=None,
field_name='FlatField',
flat_time=None,
initial_exptime=5):
"""Small convenience wrapper to create a flat-field Observation.
Flat-fields are specified by AltAz coordinates so this method is just a helper
to look up the current RA-Dec coordinates based on the unit's location and
the current time (or `flat_time` if provided).
If no azimuth is provided this will figure out the azimuth of the sun at
`flat_time` and use that position minus 180 degrees.
Args:
alt (float, optional): Altitude desired, default 70 degrees.
az (float, optional): Azimuth desired in degrees, defaults to a position
-180 degrees opposite the sun at `flat_time`.
field_name (str, optional): Name of the field, which will also be directory
name. Note that it is probably best to pass the camera.uid as name.
flat_time (`astropy.time.Time`, optional): The time at which the flats
will be taken, default `now`.
initial_exptime (int, optional): Initial exptime in seconds, default 5.
Returns:
`pocs.scheduler.Observation`: Information about the flat-field.
"""
self.logger.debug("Creating flat-field observation")

if flat_time is None:
flat_time = current_time()

# Get an azimuth that is roughly opposite the sun.
if az is None:
sun_pos = self.observer.altaz(flat_time, target=get_sun(flat_time))
az = sun_pos.az.value - 180. # Opposite the sun

self.logger.debug(f'Flat-field coords: {alt=:.02f} {az=:.02f}')

field = Field.from_altaz(field_name, alt, az, self.earth_location, time=flat_time)
flat_obs = CompoundObservation(field, exptime=initial_exptime)

# Note different 'flat' concepts.
flat_obs.seq_time = flatten_time(flat_time)

self.logger.debug(f'Flat-field observation: {flat_obs}')
return flat_obs
18 changes: 18 additions & 0 deletions tests/test_observatory.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import time

import pytest
from astropy.coordinates import get_sun
from astropy.time import Time
from panoptes.pocs import __version__
from panoptes.utils import error
Expand Down Expand Up @@ -418,3 +419,20 @@ def test_operate_dome():
assert observatory.open_dome()
assert observatory.dome.is_open
assert not observatory.dome.is_closed


def test_create_flat_field(observatory):
flat_time = Time('2016-09-09 22:00:00')

flat0 = observatory._create_flat_field_observation(flat_time=flat_time)

sun_pos = observatory.observer.altaz(flat_time, target=get_sun(flat_time))
az = sun_pos.az.value - 180

assert flat0.field.dec.value == pytest.approx(38.4, rel=1e-2)

os.environ['POCSTIME'] = '2016-09-09 22:00:00'
flat1 = observatory._create_flat_field_observation(az=az, flat_time=flat_time)

assert flat1.field.ra.value == pytest.approx(flat0.field.ra.value, rel=1e-2)
assert flat1.field.dec.value == pytest.approx(flat0.field.dec.value, rel=1e-2)