Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add USB reset to ZWO camera class on exposure failed #1079

Open
wants to merge 3 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ install_requires =
panoptes-utils[config]>=0.2.30
pyserial
transitions
pyusb>=1.1.1
# The usage of test_requires is discouraged, see `Dependency Management` docs
# tests_require = pytest; pytest-cov
# Require a specific Python version, e.g. Python 2.7 or >= 3.4
Expand Down
39 changes: 31 additions & 8 deletions src/panoptes/pocs/camera/zwo.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,25 @@
import threading
import time
from contextlib import suppress
from usb.core import find as finddev

import numpy as np
from astropy import units as u
from astropy.time import Time
from panoptes.pocs.camera.libasi import ASIDriver
from panoptes.pocs.camera.sdk import AbstractSDKCamera

from panoptes.utils import error
from panoptes.utils.images import fits as fits_utils
from panoptes.utils.utils import get_quantity_value

from panoptes.pocs.camera.libasi import ASIDriver
from panoptes.pocs.camera.sdk import AbstractSDKCamera


class Camera(AbstractSDKCamera):
_driver = None # Class variable to store the ASI driver interface
_cameras = [] # Cache of camera string IDs
_assigned_cameras = set() # Camera string IDs already in use.
_usb_vendor_id = 0x03c3 # Fixed for ZWO cameras

def __init__(self,
name='ZWO ASI Camera',
Expand All @@ -24,7 +28,6 @@ def __init__(self,
*args, **kwargs):
"""
ZWO ASI Camera class

Args:
serial_number (str): camera serial number or user set ID (up to 8 bytes). See notes.
gain (int, optional): gain setting, using camera's internal units. If not given
Expand All @@ -33,7 +36,6 @@ def __init__(self,
or 'Y8'). Default is to use 'RAW16' if supported by the camera, otherwise
the camera's own default will be used.
*args, **kwargs: additional arguments to be passed to the parent classes.

Notes:
ZWO ASI cameras don't have a 'port', they only have a non-deterministic integer
camera_ID and, probably, an 8 byte serial number. Optionally they also have an
Expand Down Expand Up @@ -105,7 +107,6 @@ def temperature(self):
@AbstractSDKCamera.target_temperature.getter
def target_temperature(self):
""" Current value of the target temperature for the camera's image sensor cooling control.

Can be set by assigning an astropy.units.Quantity
"""
return self._control_getter('TARGET_TEMP')[0]
Expand All @@ -123,7 +124,6 @@ def cooling_power(self):
@property
def gain(self):
""" Current value of the camera's gain setting in internal units.

See `egain` for the corresponding electrons / ADU value.
"""
return self._control_getter('GAIN')[0]
Expand All @@ -148,7 +148,6 @@ def is_exposing(self):
def connect(self):
"""
Connect to ZWO ASI camera.

Gets 'camera_ID' (needed for all driver commands), camera properties and details
of available camera commands/parameters.
"""
Expand All @@ -169,6 +168,12 @@ def connect(self):
Camera._driver.disable_dark_subtract(self._handle)
self._connected = True

def reconnect(self):
""" Reconnect to the camera. """
Camera._driver.close_camera(self._handle)
self._reset_usb()
return self.connect()

def start_video(self, seconds, filename_root, max_frames, image_type=None):
if not isinstance(seconds, u.Quantity):
seconds = seconds * u.second
Expand Down Expand Up @@ -298,7 +303,13 @@ def _readout(self, filename, width, height, header):
header=header,
filename=filename)
elif exposure_status == 'FAILED':
raise error.PanError("Exposure failed on {}".format(self))

# Reconnect to the camera so it can still be used
self.logger.warning(f"Exposure failed on {self}. Reconnecting camera.")
self.reconnect()

raise error.PanError(f"Exposure failed on {self}")

elif exposure_status == 'IDLE':
raise error.PanError("Exposure missing on {}".format(self))
else:
Expand Down Expand Up @@ -351,3 +362,15 @@ def _control_setter(self, control_type, value):
raise error.IllegalValue(msg)

Camera._driver.set_control_value(self._handle, control_type, value)

def _reset_usb(self):
""" Reset the USB device. """
self.logger.warning(f"Resetting USB for {self}.")
for product_id in Camera._driver.get_product_ids():
dev = finddev(idVendor=self._usb_vendor_id, idProduct=product_id)
if dev:
self.logger.debug(f"Identified USB product ID: {product_id}.")
break
if not dev:
raise RuntimeError(f"Unable to determine USB product ID for {self}.")
dev.reset()