"""
Perform a 3D spatial scan of a photodetector.
"""
from __future__ import annotations
import typing
from threading import Event
from time import monotonic
from time import perf_counter
import numpy as np
from msl.equipment.utils import convert_to_primitive
from msl.qt import Button
from msl.qt import CheckBox
from msl.qt import DoubleSpinBox
from msl.qt import Qt
from msl.qt import QtCore
from msl.qt import QtGui
from msl.qt import QtWidgets
from msl.qt import Slot
from msl.qt import SpinBox
from msl.qt import Thread
from msl.qt import Worker
from msl.qt import prompt
from .base import BasePlugin
from .base import plugin
from .. import audio
from ..equipment import DMM
from ..equipment.widgets import DAQCounterWidget
from ..samples import Samples
from ..utils import array_central
from ..utils import array_photodiode_centre
from ..utils import hhmmss
if typing.TYPE_CHECKING:
from ..app import MainWindow
[docs]
@plugin(name='Spatial Scan', description='Perform a 3D spatial scan of a detector')
class SpatialScan(BasePlugin):
def __init__(self, parent: MainWindow, **kwargs) -> None:
"""Perform a 3D spatial scan of a photodetector.
Args:
parent: The main window.
**kwargs: All keyword arguments are passed to super().
"""
super().__init__(parent, **kwargs)
self.abort_event = Event()
self.monitor_widget = None
self.detector_widget = None
self.comment = ''
self.filename_suffix = ''
self.x_values = np.empty(0)
self.y_values = np.empty(0)
self.z_values = np.empty(0)
self.t0 = 0
self.thread = Thread(SpatialScanWorker)
self.thread.finished.connect(self.on_worker_finished)
#
# Create the widgets
#
# the root element in the configuration file
root = self.app.config.find(self.__class__.__name__)
if root is None:
prompt.critical(f'You must create a <{self.__class__.__name__}> '
f'element in the configuration file')
self.show_plugin = False
return
# determine the equipment that the DUT is connected to
detectors = {el.text: el for el in root.findall('detector')}
if not detectors:
prompt.critical(f'You must create at least one <detector> '
f'sub-element to <{root.tag}>')
self.show_plugin = False
return
elif len(detectors) == 1:
alias = list(detectors)[0]
else:
alias = prompt.item(
'Select the device that the DUT is connected to',
list(detectors)
)
if alias is None:
self.show_plugin = False
return
self.detector = self.app.connect_equipment(alias)
self.is_detector_dmm = isinstance(self.detector, DMM)
if self.is_detector_dmm:
self.detector.configure(
**{k: convert_to_primitive(v)
for k, v in detectors[alias].attrib.items()}
)
self.detector_widget = self.find_widget(self.detector, parent=self)
element = root.find('x')
alias, width, step = 'stage-x', 10, 1
if element is not None:
alias = element.text
width = float(element.attrib.get('width', width))
step = float(element.attrib.get('step', step))
self.x_stage = self.app.connect_equipment(alias)
self.x_widget = self.find_widget(self.x_stage, parent=self)
info = self.x_stage.info()
self.x_width_spinbox = DoubleSpinBox(
value=width,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The total width to scan in the X direction',
unit=info['unit'],
)
self.x_step_spinbox = DoubleSpinBox(
value=step,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The step size in the X direction',
unit=info['unit'],
)
self.x_randomize_checkbox = CheckBox(
initial=False,
tooltip='Randomize the X values?',
)
element = root.find('y')
alias, width, step = 'stage-y', 10, 1
if element is not None:
alias = element.text
width = float(element.attrib.get('width', width))
step = float(element.attrib.get('step', step))
self.y_stage = self.app.connect_equipment(alias)
self.y_widget = self.find_widget(self.y_stage, parent=self)
info = self.y_stage.info()
self.y_width_spinbox = DoubleSpinBox(
value=width,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The total height to scan in the Y direction',
unit=info['unit'],
)
self.y_step_spinbox = DoubleSpinBox(
value=step,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The step size in the Y direction',
unit=info['unit'],
)
self.y_randomize_checkbox = CheckBox(
initial=False,
tooltip='Randomize the Y values?',
)
element = root.find('z')
alias, width, step = 'stage-z', 10, 1
if element is not None:
alias = element.text
width = float(element.attrib.get('width', width))
step = float(element.attrib.get('step', step))
self.z_stage = self.app.connect_equipment(alias)
self.z_widget = self.find_widget(self.z_stage, parent=self)
info = self.z_stage.info()
self.z_width_spinbox = DoubleSpinBox(
value=width,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The total depth to scan in the Z direction',
unit=info['unit'],
)
self.z_step_spinbox = DoubleSpinBox(
value=step,
minimum=0,
maximum=info['maximum'],
decimals=3,
tooltip='The step size in the Z direction',
unit=info['unit'],
)
self.z_randomize_checkbox = CheckBox(
initial=False,
tooltip='Randomize the Z values?',
)
element = root.find('monitor')
if element is None:
prompt.critical(f'Add a <monitor> sub-element to <{root.tag}>')
self.show_plugin = False
return
self.monitor = self.app.connect_equipment(element.text.strip())
self.monitor.configure(**{k: convert_to_primitive(v) for k, v in element.attrib.items()})
self.monitor_widget = self.find_widget(self.monitor, parent=self)
self.shutter = self.app.connect_equipment(root.findtext('shutter', 'shutter'))
self.shutter_widget = self.find_widget(self.shutter, parent=self)
self.delay_spinbox = DoubleSpinBox(
value=2,
decimals=1,
tooltip='The number of seconds to wait after moving the translation stages',
unit=' s',
)
self.nrepeats_spinbox = SpinBox(
value=1,
tooltip='The number of times to repeat the scan',
)
self.run_button = Button(
text='Start',
icon='ieframe|101',
left_click=self.on_worker_start,
tooltip='Run (CTRL+R)'
)
self.abort_button = Button(
text='Abort',
icon='shell32|27',
left_click=self.on_worker_abort,
tooltip='Abort (CTRL+A)'
)
#
# Create the layout
#
spacer = 1, 1, QtWidgets.QSizePolicy.Policy.Expanding, QtWidgets.QSizePolicy.Policy.Minimum
layout = QtWidgets.QVBoxLayout()
box_x = QtWidgets.QHBoxLayout()
box_x.addWidget(QtWidgets.QLabel('X:'))
box_x.addWidget(self.x_widget)
box_x.addWidget(self.x_width_spinbox)
box_x.addWidget(self.x_step_spinbox)
box_x.addWidget(self.x_randomize_checkbox)
box_x.addSpacerItem(QtWidgets.QSpacerItem(*spacer))
layout.addLayout(box_x)
box_y = QtWidgets.QHBoxLayout()
box_y.addWidget(QtWidgets.QLabel('Y:'))
box_y.addWidget(self.y_widget)
box_y.addWidget(self.y_width_spinbox)
box_y.addWidget(self.y_step_spinbox)
box_y.addWidget(self.y_randomize_checkbox)
box_y.addSpacerItem(QtWidgets.QSpacerItem(*spacer))
layout.addLayout(box_y)
box_z = QtWidgets.QHBoxLayout()
box_z.addWidget(QtWidgets.QLabel('Z:'))
box_z.addWidget(self.z_widget)
box_z.addWidget(self.z_width_spinbox)
box_z.addWidget(self.z_step_spinbox)
box_z.addWidget(self.z_randomize_checkbox)
box_z.addSpacerItem(QtWidgets.QSpacerItem(*spacer))
layout.addLayout(box_z)
box1 = QtWidgets.QHBoxLayout()
box1.addWidget(QtWidgets.QLabel('Mon:'))
box1.addWidget(self.monitor_widget)
box1.addWidget(QtWidgets.QLabel('Det:'))
box1.addWidget(self.detector_widget)
layout.addLayout(box1)
box2 = QtWidgets.QHBoxLayout()
box2.addWidget(QtWidgets.QLabel('Shutter:'))
box2.addWidget(self.shutter_widget)
box2.addWidget(QtWidgets.QLabel('Delay:'))
box2.addWidget(self.delay_spinbox)
box2.addWidget(QtWidgets.QLabel('#Repeats:'))
box2.addWidget(self.nrepeats_spinbox)
box2.addSpacerItem(QtWidgets.QSpacerItem(*spacer))
layout.addLayout(box2)
box3 = QtWidgets.QHBoxLayout()
box3.addWidget(self.run_button)
box3.addWidget(self.abort_button)
layout.addLayout(box3)
self.setLayout(layout)
self.setWindowTitle('3D Spatial Scan')
[docs]
def closeEvent(self, event: QtGui.QCloseEvent) -> None:
"""Overrides :meth:`QtWidgets.QWidget.closeEvent`."""
if self.thread.is_running():
prompt.critical('Scan running. Click Abort to stop it.')
event.ignore()
return
self.stop_monitor_and_detector_timer_and_thread()
super().closeEvent(event)
[docs]
def keyReleaseEvent(self, event: QtGui.QKeyEvent) -> None:
"""Overrides :meth:`QtWidgets.QWidget.keyReleaseEvent`."""
if event.modifiers() == Qt.ControlModifier:
key = event.key()
if key == Qt.Key_R:
self.on_worker_start()
elif key == Qt.Key_A:
self.on_worker_abort()
super().keyReleaseEvent(event)
[docs]
@Slot()
def on_worker_abort(self) -> None:
"""Abort the worker thread."""
if self.thread.is_running():
self.app.logger.warning('aborting scan early')
self.abort_event.set()
self.main.status_bar_message.emit(f'Safely aborting {self.windowTitle()}...')
self.main.show_indeterminate_progress_bar.emit()
[docs]
@Slot()
def on_worker_finished(self) -> None:
"""Called when the worker thread finishes."""
hms = hhmmss(monotonic() - self.t0)
self.main.status_bar_message.emit(f'{self.windowTitle()} finished [took {hms}]')
self.main.hide_progress_bar.emit()
self.app.send_email(subject=f'{self.windowTitle()} finished', body=f'Took {hms}')
try:
audio.random()
except RuntimeError as e: # can occur if using Remote Desktop
self.app.logger.warning(f'{e.__class__.__name__}: {e}')
[docs]
@Slot()
def on_worker_start(self) -> None:
"""Start acquiring data in the worker thread."""
if self.thread.is_running():
prompt.critical('A scan is already running')
return
self.abort_event.clear()
self.stop_monitor_and_detector_timer_and_thread()
if prompt.yes_no('Are you finding the centre of a photodiode?', default=False):
array = array_photodiode_centre
else:
array = array_central
self.x_values = array(
centre=self.x_stage.get_position(),
width=self.x_width_spinbox.value(),
step=self.x_step_spinbox.value(),
randomize=self.x_randomize_checkbox.isChecked(),
)
self.y_values = array(
centre=self.y_stage.get_position(),
width=self.y_width_spinbox.value(),
step=self.y_step_spinbox.value(),
randomize=self.y_randomize_checkbox.isChecked(),
)
self.z_values = array(
centre=self.z_stage.get_position(),
width=self.z_width_spinbox.value(),
step=self.z_step_spinbox.value(),
randomize=self.z_randomize_checkbox.isChecked(),
)
self.comment = prompt.comments(
even_row_color=Qt.lightGray,
odd_row_color=Qt.darkGray
)
self.filename_suffix = prompt.text('<i>Optional</i>: Specify a suffix for the filename')
if not prompt.yes_no('Start spatial scan?'):
return
self.main.status_bar_message.emit(f'Running {self.windowTitle()}...')
self.t0 = monotonic()
self.thread.start(self)
[docs]
def stop_monitor_and_detector_timer_and_thread(self) -> None:
"""Stop the timers/threads for the monitor/detector connection."""
if self.monitor_widget is not None:
self.monitor_widget.stop_timer_and_thread() # noqa: DMM and NIDAQ widgets have stop_timer_and_thread()
if self.detector_widget is not None:
self.detector_widget.stop_timer_and_thread() # noqa: DMM and NIDAQ widgets have stop_timer_and_thread()
[docs]
class SpatialScanWorker(Worker):
def __init__(self, parent: SpatialScan) -> None:
super().__init__()
self.plugin = parent
self.is_detector_dmm = parent.is_detector_dmm
self.update_progress_bar = parent.main.update_progress_bar
self.status_bar_message = parent.main.status_bar_message
self.logger = parent.app.logger
self.delay = parent.delay_spinbox.value()
self.delay_ms = int(self.delay * 1e3)
self.monitor = typing.cast(DMM, parent.monitor)
self.detector = parent.detector
self.shutter = parent.shutter
self.detector_settings = {}
if self.is_detector_dmm:
self.detector_settings = self.detector.settings().to_json()
if not parent.is_detector_dmm:
widget = typing.cast(DAQCounterWidget, parent.detector_widget)
self.count_edges_kwargs = {
'pfi': widget.pfi_combobox.currentData(),
'duration': widget.duration_spinbox.value(),
'nsamples': widget.nsamples_spinbox.value(),
'rising': widget.edge_combobox.currentText().lower() == 'rising',
}
[docs]
def acquire(self) -> tuple[Samples, Samples]:
"""Acquire the monitor and detector samples.
Returns:
(monitor samples, detector samples)
"""
self.monitor.initiate()
if self.is_detector_dmm:
self.detector.initiate()
# read DUT before monitor since DUT could be connected to a counter or the SIA from CMI
dut = self.read_dut()
mon = self.monitor.fetch()
return mon, dut
[docs]
def acquire_dark(self) -> dict[str, float]:
"""Take a dark measurement."""
self.status_bar_message.emit('Taking a dark measurement...')
self.logger.info('Taking a dark measurement...')
self.shutter.close()
QtCore.QThread.msleep(self.delay_ms)
mon, dut = self.acquire()
return {
'mon_ave': mon.mean,
'mon_stdev': mon.stdev,
'dut_ave': dut.mean,
'dut_stdev': dut.stdev,
}
[docs]
def process(self) -> None:
"""Run the spatial scan."""
app = self.plugin.app
x_values = self.plugin.x_values
y_values = self.plugin.y_values
z_values = self.plugin.z_values
x_stage = self.plugin.x_stage
y_stage = self.plugin.y_stage
z_stage = self.plugin.z_stage
x_original = x_stage.get_position()
y_original = y_stage.get_position()
z_original = z_stage.get_position()
prefix = app.config.value(
f'{self.plugin.__class__.__name__}/filename_prefix',
'spatial_scan'
)
writer = app.create_writer(prefix, suffix=self.plugin.filename_suffix)
writer.add_equipment(
self.monitor, self.detector, self.shutter,
x_stage, y_stage, z_stage
)
writer.add_metadata(
comment=self.plugin.comment,
delay=self.delay,
delay_unit='seconds',
detector_info=self.detector_settings,
monitor_info=self.monitor.settings().to_json(),
x_start=x_original,
x_step=round(float(x_values[1] - x_values[0]), 4) if len(x_values) > 1 else 0.0,
x_stop=np.max(x_values),
x_unit=x_stage.info()['unit'].strip(),
y_start=y_original,
y_step=round(float(y_values[1] - y_values[0]), 4) if len(y_values) > 1 else 0.0,
y_stop=np.max(y_values),
y_unit=y_stage.info()['unit'].strip(),
z_start=z_original,
z_step=round(float(z_values[1] - z_values[0]), 4) if len(z_values) > 1 else 0.0,
z_stop=np.max(z_values),
z_unit=z_stage.info()['unit'].strip()
)
self.logger.info('START')
self.logger.info(f'X values: {np.array_str(x_values, max_line_width=1024)}')
self.logger.info(f'Y values: {np.array_str(y_values, max_line_width=1024)}')
self.logger.info(f'Z values: {np.array_str(z_values, max_line_width=1024)}')
abort: Event = self.plugin.abort_event
total: int = x_values.size * y_values.size * z_values.size
n_repeats: int = self.plugin.nrepeats_spinbox.value()
for irepeat in range(n_repeats):
if abort.is_set():
break
iteration = 0
writer.initialize(
'x', 'y', 'z', 'mon', 'mon_stdev', 'dut', 'dut_stdev',
name=f'spatial_scan_{irepeat+1}',
size=total,
dark_before=self.acquire_dark(),
)
# ZYX loop
self.shutter.open()
QtCore.QThread.msleep(self.delay_ms)
self.status_bar_message.emit('Start loop...')
self.logger.info('Start loop...')
for z_val in z_values:
if abort.is_set():
break
z_stage.set_position(z_val, wait=False)
for y_val in y_values:
if abort.is_set():
break
y_stage.set_position(y_val, wait=False)
for x_val in x_values:
if abort.is_set():
break
x_stage.set_position(x_val, wait=False)
t0 = perf_counter()
while x_stage.is_moving() or y_stage.is_moving() or z_stage.is_moving():
QtCore.QThread.msleep(100)
if perf_counter() - t0 > 30:
break
x = round(x_stage.get_position(), 3)
y = round(y_stage.get_position(), 3)
z = round(z_stage.get_position(), 3)
self.logger.info(f'stages at x={x:.4f}, y={y:.4f}, z={z:.4f}')
self.status_bar_message.emit(f'x={x}, y={y}, z={z} [{irepeat+1} of {n_repeats}]')
self.logger.info(f'waiting for {self.delay} seconds ...')
QtCore.QThread.msleep(self.delay_ms)
mon, dut = self.acquire()
writer.append(x, y, z, mon.mean, mon.stdev, dut.mean, dut.stdev)
iteration += 1
self.update_progress_bar.emit(100 * iteration / total)
writer.update_metadata(dark_after=self.acquire_dark())
x_stage.set_position(x_original, wait=False)
y_stage.set_position(y_original, wait=False)
z_stage.set_position(z_original, wait=False)
writer.write()
[docs]
def read_dut(self) -> Samples:
"""Read the samples for the device-under-test."""
if self.is_detector_dmm:
return self.detector.fetch()
else:
return self.detector.count_edges(**self.count_edges_kwargs)