Source code for photons.equipment.idq_time_controller

"""
Time Controller from ID Quantique.
"""
import warnings
from dataclasses import dataclass
from enum import Enum
from math import ceil
from time import perf_counter
from time import sleep
from typing import Sequence

import numpy as np
from msl.equipment import EquipmentRecord
from msl.equipment.connection_zeromq import ConnectionZeroMQ
from msl.qt import QtCore
from msl.qt import Signal

from .base import BaseEquipment
from .base import equipment
from ..samples import Samples


[docs] class Clock(Enum): INTERNAL = 'INTERNAL' EXTERNAL = 'EXTERNAL' INT = 'INTERNAL' EXT = 'EXTERNAL'
[docs] class Coupling(Enum): AC = 'AC' DC = 'DC'
[docs] class Edge(Enum): RISING = 'RISING' FALLING = 'FALLING'
[docs] class Mode(Enum): ACCUMULATE = 'ACCUM' ACCUM = 'ACCUM' CYCLE = 'CYCLE' NIM = 'NIM' TTL = 'TTL' HIGH_SPEED = 'LOWRES' HIGH_RESOLUTION = 'HIRES' LOW_SPEED = 'HIRES' LOW_RESOLUTION = 'LOWRES' FAST = 'LOWRES' SLOW = 'HIRES'
[docs] class ResyncPolicy(Enum): AUTO = 'AUTO' MANUAL = 'MANUAL'
[docs] class Select(Enum): LOOP = 'LOOP' OUTPUT = 'OUTPUT' SHAPED = 'SHAPED' UNSHAPED = 'UNSHAPED'
[docs] @dataclass class DelaySettings: address: str value: float
[docs] def to_json(self) -> dict[str, str | float]: """Return the settings as a JSON serializable object.""" return {'address': self.address, 'value': self.value}
[docs] @dataclass class DeviceSettings: clock: Clock mode: Mode
[docs] def to_json(self) -> dict[str, str]: """Return the settings as a JSON serializable object.""" return {'clock': self.clock.value, 'mode': self.mode.value}
[docs] @dataclass class Histogram: hist1: np.ndarray hist2: np.ndarray hist3: np.ndarray hist4: np.ndarray
[docs] def to_json(self) -> dict[str, list]: """Return the histogram data as a JSON serializable object.""" return { # noqa: tolist() returns a list 'hist1': self.hist1.tolist(), 'hist2': self.hist2.tolist(), 'hist3': self.hist3.tolist(), 'hist4': self.hist4.tolist(), }
[docs] @dataclass class HistogramSettings: channel: int ref: str stop: str enabler: str minimum: float maximum: float bin_count: int bin_width: float
[docs] def to_json(self) -> dict[str, str | int | float]: """Return the settings as a JSON serializable object.""" return { 'channel': self.channel, 'ref': self.ref, 'stop': self.stop, 'enabler': self.enabler, 'minimum': self.minimum, 'maximum': self.maximum, 'bin_count': self.bin_count, 'bin_width': self.bin_width }
[docs] @dataclass class InputSettings: channel: int coupling: Coupling edge: Edge enabled: bool delay: float duration: float mode: Mode resync_policy: ResyncPolicy select: Select threshold: float
[docs] def to_json(self) -> dict[str, str | bool | int | float]: """Return the settings as a JSON serializable object.""" return { 'channel': self.channel, 'coupling': self.coupling.value, 'edge': self.edge.value, 'enabled': self.enabled, 'delay': self.delay, 'duration': self.duration, 'mode': self.mode.value, 'resync_policy': self.resync_policy.value, 'select': self.select.value, 'threshold': self.threshold }
[docs] @dataclass class StartSettings: coupling: Coupling edge: Edge enabled: bool delay: float duration: float mode: Mode select: Select threshold: float
[docs] def to_json(self) -> dict[str, str | bool | float]: """Return the settings as a JSON serializable object.""" return { 'coupling': self.coupling.value, 'edge': self.edge.value, 'enabled': self.enabled, 'delay': self.delay, 'duration': self.duration, 'mode': self.mode.value, 'select': self.select.value, 'threshold': self.threshold }
[docs] @equipment(manufacturer=r'ID\s*Q', model=r'ID900') class IDQTimeController(BaseEquipment): Clock: Clock = Clock Coupling: Coupling = Coupling Edge: Edge = Edge Mode: Mode = Mode ResyncPolicy: ResyncPolicy = ResyncPolicy Select: Select = Select connection: ConnectionZeroMQ counts_changed: QtCore.SignalInstance = Signal(Samples) def __init__(self, record: EquipmentRecord, **kwargs) -> None: """Time Controller from ID Quantique. Args: record: The equipment record. **kwargs: Keyword arguments. Can be specified as attributes of an XML element in a configuration file (with the tag of the element equal to the alias of `record`). """ super().__init__(record, **kwargs) self.ignore_attributes('counts_changed') cfg = kwargs.get('config') if cfg is not None: self.load(cfg) def _check_channel(self, channel: int) -> None: """Check that the channel number is valid, raises an exception if invalid.""" if channel < 1 or channel > 4: self.raise_exception(f'The channel number must be 1, 2, 3 or 4 (got {channel})') def _check_delay_block(self, block: int) -> None: if block < 1 or block > 8: self.raise_exception(f'The DELAY block number must be ' f'between 1 and 8 (got {block})') def _check_delay_value(self, *, delay: float, stacklevel: int) -> None: """Check the delay value.""" if delay < 0 or delay > 1: self.raise_exception(f'The delay must be between ' f'0 and 1 second (got {delay})') if delay > 4e-6: warnings.warn( 'Delay >4us, there is no guarantee it will work', UserWarning, stacklevel=stacklevel) def _config_start_or_input(self, **kwargs) -> None: """Common to both configure_start and configure_input.""" if kwargs['duration'] < 0.001 or kwargs['duration'] > 65.535: self.raise_exception(f'The duration must be between ' f'0.001 and 65.535 seconds (got {kwargs["duration"]})') self._check_delay_value(delay=kwargs['delay'], stacklevel=4) enable = 'ON' if kwargs['enabled'] else 'OFF' coupling = self.convert_to_enum(kwargs['coupling'], Coupling, to_upper=True) edge = self.convert_to_enum(kwargs['edge'], Edge, to_upper=True) mode = self.convert_to_enum(kwargs['mode'], Mode, to_upper=True) select = self.convert_to_enum(kwargs['select'], Select, to_upper=True) if kwargs['resync_policy'] is not None: policy = self.convert_to_enum(kwargs['resync_policy'], ResyncPolicy, to_upper=True) resync_policy = f'RESYNCPOLICY {policy.value};' else: # the START command does not support RESYNCPOLICY resync_policy = '' ms = round(kwargs['duration'] * 1e3) # convert to milliseconds ps = round(kwargs['delay'] * 1e12) # convert to picoseconds cmd = f'{kwargs["type"]}:ENABLE {enable};' \ f'COUPLING {coupling.value};' \ f'EDGE {edge.value};' \ f'THRESHOLD {kwargs["threshold"]}V;' \ f'SELECT {select.value};' \ f'{resync_policy}' \ f':{kwargs["type"]}:COUNTER:INTEGRATIONTIME {ms};' \ f'MODE {mode.value};' \ f'{kwargs["delay_cmd"]} {ps}' self.logger.info(f'configure {self.alias!r} {kwargs["type"]} settings with {cmd!r}') reply = self.connection.query(cmd) if reply.endswith('SCPI_ERR_PARAM_TYPE') or reply.endswith('SCPI_ERR_INVALID_CMD'): self.raise_exception(reply)
[docs] def clear_high_resolution_error(self, channel: int) -> None: """Clear the high-resolution error for an input channel. Args: channel: The input channel number (1, 2, 3 or 4). """ self._check_channel(channel) self.logger.info(f'clear {self.alias!r} high-resolution error for INPUT{channel}') reply = self.connection.query(f'INPUT{channel}:HIRES:ERROR:CLEAR') if reply != 'Cleared Highres errors': self.raise_exception('Could not clear the high-resolution error')
[docs] def configure_delay(self, *, block: int, address: int | str | None, value: float = 0) -> DelaySettings: """Configure the settings for a DELAY block. Args: block: The DELAY block number (1 through 8). address: The address to link the DELAY to. Can be an INPUT channel number (e.g., 1 or 'INPUT1'), 'START' or None. value: The delay value, in seconds. Returns: The DELAY settings that were read from the device after the settings were written. """ self._check_delay_block(block) address_map = { None: 'NONE', 0: 'STAR', 1: 'INPU1', 2: 'INPU2', 3: 'INPU3', 4: 'INPU4', 'NONE': 'NONE', 'STAR': 'STAR', 'START': 'STAR', 'INPU1': 'INPU1', 'INPU2': 'INPU2', 'INPU3': 'INPU3', 'INPU4': 'INPU4', 'INPUT1': 'INPU1', 'INPUT2': 'INPU2', 'INPUT3': 'INPU3', 'INPUT4': 'INPU4', } if isinstance(address, str): address = address.upper() link = address_map.get(address) if link is None: self.raise_exception(f'Invalid DELAY address {address!r}') self._check_delay_value(delay=value, stacklevel=3) ps = round(value * 1e12) # convert to picoseconds cmd = f'DELAY{block}:INPORT:LINK {link};:DELAY{block}:VALUE {ps}' self.logger.info(f'configure {self.alias!r} DELAY block with {cmd!r}') reply = self.connection.query(cmd) if reply.startswith('No') or \ reply.endswith('SCPI_ERR_PARAM_TYPE') or \ reply.endswith('SCPI_ERR_INVALID_CMD'): self.raise_exception(reply) return self.settings_delay(block)
[docs] def configure_device(self, *, clock: Clock | str, mode: Mode | str) -> DeviceSettings: """Configure the DEVICE settings. Args: clock: Use the internal or external clock. mode: The resolution mode (high speed or high resolution). Returns: The DEVICE settings that were read from the device after the settings were written. """ sync = self.convert_to_enum(clock, Clock, to_upper=True) mode = self.convert_to_enum(mode, Mode, to_upper=True) if mode not in (Mode.HIGH_SPEED, Mode.HIGH_RESOLUTION): self.raise_exception(f'Invalid device resolution mode {mode.value!r}') cmd = f'DEVICE:SYNC {sync.value};RESOLUTION {mode.value}' self.logger.info(f'configure {self.alias!r} DEVICE settings with {cmd!r}') reply = self.connection.query(cmd) if reply.endswith('SCPI_ERR_PARAM_TYPE') or reply.endswith('SCPI_ERR_INVALID_CMD'): self.raise_exception(reply) return self.settings_device()
[docs] def configure_histogram(self, *, channel: int, ref: int | str | None, stop: int | str | None = None, enabler: str = 'TSGE8', minimum: float = 0, maximum: float = 1e-6, bin_count: int | None = None, bin_width: float = 100e-12) -> HistogramSettings: """Configure a HISTOGRAM channel. Args: channel: The HISTOGRAM channel number (1, 2, 3 or 4). ref: The INPUT channel number (0[START], 1, 2, 3 or 4) or the name of the reference channel (e.g., 'TSCO5') stop: The INPUT channel number (0[START], 1, 2, 3 or 4) or the name of the stop channel (e.g., 'TSCO5'). If not specified, then the value of `ref` is used. enabler: The timestamp-generator block that determines when data acquisition begins and ends. minimum: Minimum time value, in seconds. maximum: Maximum time value, in seconds. bin_count: The number of time bins. If specified, then `maximum` is ignored. Must be between 1 and 16384. bin_width: The time-bin width, in seconds. Returns: The HISTOGRAM settings that were read from the device after the settings were written. """ self._check_channel(channel) s = self.settings_device() if s.mode == Mode.HIGH_RESOLUTION: min_width = 13e-12 min_width_text = '13 ps' else: min_width = 100e-12 min_width_text = '100 ps' if bin_width < min_width or bin_width > 1e-3: self.raise_exception(f'The bin width must be between ' f'{min_width_text} and 1 ms (got {bin_width})') if minimum < 0: self.raise_exception(f'The minimum time must be >0 (got {minimum})') if bin_count is None: bin_count = ceil((maximum - minimum) / bin_width) if bin_count < 1 or bin_count > 16384: self.raise_exception(f'Invalid bin count, {bin_count}. ' f'Must be between 1 and 16384.') if isinstance(ref, str): ref = ref.upper() if stop is None: stop = ref input_map = { None: 'NONE', 0: 'STAR', 1: 'TSCO5', 2: 'TSCO6', 3: 'TSCO7', 4: 'TSCO8', 'START': 'STAR', 'INPUT1': 'TSCO5', 'INPUT2': 'TSCO6', 'INPUT3': 'TSCO7', 'INPUT4': 'TSCO8', } ref_link = input_map.get(ref, ref) stop_link = input_map.get(stop, stop) # convert to picoseconds min_ps = ceil(minimum * 1e12) width_ps = ceil(bin_width * 1e12) cmd = f':HIST{channel}:INPORT:STOP:LINK {stop_link};' \ f':HIST{channel}:INPORT:REF:LINK {ref_link};' \ f':HIST{channel}:INPORT:ENAB:LINK {enabler};' \ f':HIST{channel}:MINIMUM {min_ps};' \ f':HIST{channel}:BWIDTH {width_ps};' \ f':HIST{channel}:BCOUNT {bin_count};' \ f':RAW{channel}:INPORT:STOP:LINK {stop_link};' \ f':RAW{channel}:INPORT:REF:LINK {ref_link}' self.logger.info(f'configure {self.alias!r} HISTOGRAM{channel} settings with {cmd!r}') reply = self.connection.query(cmd) if 'No connection' in reply or \ reply.endswith('SCPI_ERR_PARAM_TYPE') or \ reply.endswith('SCPI_ERR_INVALID_CMD'): self.raise_exception(reply) return self.settings_histogram(channel)
[docs] def configure_input(self, *, channel: int, coupling: Coupling | str = Coupling.DC, delay: float = 0, duration: float = 1, edge: Edge | str = Edge.RISING, enabled: bool = False, mode: Mode | str = Mode.CYCLE, resync_policy: ResyncPolicy | str = ResyncPolicy.AUTO, select: Select | str = Select.UNSHAPED, threshold: float = 1) -> InputSettings: """Configure an INPUT channel. Args: channel: The input channel number (1, 2, 3 or 4). coupling: Either AC or DC coupling. delay: The delay, in seconds, to add to the timestamp when an edge is detected. Must be between 0 and 1 second. duration: The number of seconds to count edges (integration time). Must be between 0.001 and 65.535 seconds. edge: The discriminator edge, either RISING or FALLING. enabled: Whether the channel is enabled or disabled. mode: The counter mode (either CYCLE or ACCUMULATE). resync_policy: The resync policy. select: Select what feeds the INPUT block. threshold: The discriminator threshold value, in volts. Returns: The INPUT settings that were read from the device after the settings were written. """ self._check_channel(channel) kwargs = { 'coupling': coupling, 'delay': delay, 'delay_cmd': f':DELAY{channel}:INPORT:LINK INPU{channel};:DELAY{channel}:VALUE', 'duration': duration, 'edge': edge, 'enabled': enabled, 'mode': mode, 'resync_policy': resync_policy, 'select': select, 'threshold': threshold, 'type': f'INPUT{channel}', } self._config_start_or_input(**kwargs) return self.settings_input(channel)
[docs] def configure_start(self, *, coupling: Coupling | str = Coupling.DC, delay: float = 0, edge: Edge | str = Edge.RISING, enabled: bool = False, duration: float = 1, mode: Mode | str = Mode.CYCLE, select: Select | str = Select.UNSHAPED, threshold: float = 1) -> StartSettings: """Configure the START channel. Args: coupling: Either AC or DC coupling. delay: The delay, in seconds, to add to the timestamp when an edge is detected. Must be between 0 and 1 second. edge: The discriminator edge, either RISING or FALLING. enabled: Whether the channel is enabled or disabled. duration: The number of seconds to count edges (integration time). Must be between 0.001 and 65.535 seconds. mode: The counter mode (either CYCLE or ACCUMULATE). select: Select what feeds the START block. threshold: The discriminator threshold value, in volts. Returns: The START settings that were read from the device after the settings were written. """ kwargs = { 'coupling': coupling, 'delay': delay, 'delay_cmd': ':START:DELAY', 'duration': duration, 'edge': edge, 'enabled': enabled, 'mode': mode, 'resync_policy': None, 'select': select, 'threshold': threshold, 'type': f'START', } self._config_start_or_input(**kwargs) return self.settings_start()
[docs] def count_edges(self, *, channel: int, allow_zero: bool = False, nsamples: int = 1) -> Samples: """Count the number of edges per second. Args: channel: The input channel number (e.g., 0[START], 1, 2, 3 or 4). allow_zero: Whether to allow zero edges per second to be counted. Querying the COUNTER? value from the device immediately returns the value that is stored in the device's memory. After resetting the device (i.e., the value returned by COUNTER? is 0) Python sleeps for `duration` seconds (see :meth:`.configure_input`) before querying COUNTER?. Since the computer clock and the device clock are not synced it is possible that COUNTER? is queried before the device writes a value to memory. If zero edges are detected and `allow_zero` is :data:`False` then this method will block until at least 1 edge is detected. nsamples: The number of samples to acquire. Returns: The number of edges per second. """ if channel == 0: settings = self.settings_start() reset_cmd = 'START:COUNTER:RESET' count_cmd = 'START:COUNTER?' where = 'START' else: settings = self.settings_input(channel) reset_cmd = f'INPUT{channel}:COUNTER:RESET' count_cmd = f'INPUT{channel}:COUNTER?' where = f'INPUT{channel}' if not settings.enabled: self.raise_exception(f'Channel {where} is not enabled') cps = np.full((nsamples,), -2 ** 63, dtype=np.int64) duration = settings.duration query = self.connection.query self.logger.info(f'{self.alias!r} start counting edges on {where} ...') for index in range(nsamples): if query(reset_cmd) != 'Counter value set to 0 ': self.raise_exception('Could not reset the counter') sleep(duration) while True: counts = int(query(count_cmd)) if counts > 0 or allow_zero: break sleep(0.01) cps[index] = counts / duration self.logger.info( f'{self.alias!r} counted {np.array2string(cps, max_line_width=1000)} ' f'{settings.edge.name} edges/second in {duration}-second intervals') s = Samples(cps) self.counts_changed.emit(s) self.maybe_emit_notification(**s.to_json()) return s
[docs] def has_high_resolution_error(self, channel: int) -> bool: """Check if an input channel has a high-resolution error. Args: channel: The input channel number (1, 2, 3 or 4). Returns: Whether the specified channel has a high-resolution error. """ self._check_channel(channel) reply = self.connection.query(f'INPUT{channel}:HIRES:ERROR?') return int(reply) == 1
[docs] def load(self, config: str) -> None: """Load a pre-defined configuration. Args: config: The configuration to load ('INIT', 'HISTO', 'COUNT', or 'BLANK'). """ self.logger.info(f'{self.alias!r} load configuration {config!r}') reply = self.connection.query(f'DEVICE:CONFIGURATION:LOAD {config}') if reply.endswith('SCPI_ERR_PARAM_TYPE') or reply.endswith('SCPI_ERR_INVALID_CMD'): self.raise_exception(f'Could not load configuration {config!r}')
[docs] def recalibrate(self) -> None: """Recalibrate the Time Controller.""" self.logger.info(f'recalibrating {self.alias!r}') reply = self.connection.query('DEVICE:SAMPLING:RECALIBRATE') if 'failed' in reply: self.raise_exception(reply)
[docs] def settings_delay(self, block: int) -> DelaySettings: """Get the settings of a DELAY block. Args: block: The DELAY block number (1 through 8). """ self._check_delay_block(block) address_map = { 'NONE': None, 'STAR': 'START', 'INPU1': 'INPUT1', 'INPU2': 'INPUT2', 'INPU3': 'INPUT3', 'INPU4': 'INPUT4', } reply = self.connection.query(f'DELAY{block}:STATE?') value, address = reply.rstrip(';').split(';') return DelaySettings( address=address_map[address[10:].upper()], value=round(float(value[12:].rstrip('TB')) * 1e-12, 12))
[docs] def settings_device(self) -> DeviceSettings: """Get the DEVICE settings.""" reply = self.connection.query('DEVICE:STATE?') sync, res = reply.upper().split(';') return DeviceSettings( clock=self.convert_to_enum(sync[12:], Clock), mode=self.convert_to_enum(res[4:], Mode) )
[docs] def settings_histogram(self, channel: int) -> HistogramSettings: """Get the settings of a HISTOGRAM channel. Args: channel: The HISTOGRAM channel number (1, 2, 3 or 4). """ self._check_channel(channel) reply = self.connection.query(f':HIST{channel}:STATE?') stop, ref, enab, _min, width, count = reply.rstrip(';').split(';') bin_count = int(count[7:]) bin_width = round(float(width[5:].rstrip('TB')) * 1e-12, 12) minimum = round(float(_min[11:].rstrip('TB')) * 1e-12, 12) return HistogramSettings( channel=channel, ref=ref[23:], stop=stop[23:], enabler=enab[24:], minimum=minimum, maximum=round(minimum + (bin_width * bin_count), 12), bin_width=bin_width, bin_count=bin_count)
[docs] def settings_input(self, channel: int) -> InputSettings: """Get the settings of an INPUT channel. Args: channel: The INPUT channel number (1, 2, 3 or 4). """ self._check_channel(channel) reply = self.connection.query(f'INPUT{channel}:STATE?;:DELAY{channel}:VALUE?') enab, coup, edge, thre, sel, resy, time, mode, delay = reply.upper().split(';') return InputSettings( channel=channel, coupling=self.convert_to_enum(coup[5:], Coupling), edge=self.convert_to_enum(edge[5:], Edge), enabled=enab.endswith('ON'), delay=round(float(delay.rstrip('TB')) * 1e-12, 12), duration=round(float(time[30:].rstrip('TB')) * 1e-3, 3), mode=self.convert_to_enum(mode[5:], Mode), resync_policy=self.convert_to_enum(resy[5:], ResyncPolicy), select=self.convert_to_enum(sel[5:], Select), threshold=float(thre[5:].rstrip('V')))
[docs] def settings_start(self) -> StartSettings: """Get the settings of the START channel.""" reply = self.connection.query('START:STATE?') enab, coup, edge, thre, sel, delay, time, mode = reply.upper().split(';') return StartSettings( coupling=self.convert_to_enum(coup[5:], Coupling), edge=self.convert_to_enum(edge[5:], Edge), enabled=enab.endswith('ON'), delay=round(float(delay[5:].rstrip('TB')) * 1e-12, 12), duration=round(float(time[30:].rstrip('TB')) * 1e-3, 3), mode=self.convert_to_enum(mode[5:], Mode), select=self.convert_to_enum(sel[5:], Select), threshold=float(thre[5:].rstrip('V')))
[docs] def start_stop(self, *, clear: bool = True, duration: float = 30, enabler: str = 'TSGE8', min_events: int | Sequence[int] = 0, timeout: float | None = None) -> Histogram: """Acquire a start-stop histogram of the duration between two edges. Args: clear: Whether to clear the histogram data before acquiring data. duration: The number of seconds to acquire data for. enabler: The timestamp-generator block that determines when data acquisition begins and ends. min_events: The minimum number of start-stop events, on each histogram channel, that must occur before returning to the calling program. If specified, then iteratively acquires data for `duration` seconds until the specified number of events has occurred. timeout: The maximum number of seconds to wait for `min_events` to occur. If a timeout occurs, the data that has been acquired is returned (an error is not raised). Returns: The histogram data. """ channels = (1, 2, 3, 4) done = [False, False, False, False] if isinstance(min_events, int): min_events = [min_events for _ in channels] if len(min_events) != len(channels): self.raise_exception(f'Length of min_events sequence must be ' f'{len(channels)}, (got {len(min_events)})') timestamps = [] for c in channels: s = self.settings_histogram(c) bin_center = (s.minimum + s.bin_width) / 2.0 timestamps.append(bin_center + (np.arange(s.bin_count) * s.bin_width)) def gate(action): r = self.connection.query(f':{enabler}:ENABLE {action}') if not r.endswith(action): self.raise_exception(f'Cannot turn {action} {enabler}') if clear: self.logger.info(f'clear {self.alias!r} HISTOGRAM') gate('OFF') for c in channels: reply = self.connection.query(f':HIST{c}:FLUSH') if reply != 'Flushed Histogram!': self.raise_exception(f'Cannot clear HIST{c}') counts = [np.empty(0)] * len(channels) t0 = perf_counter() while not all(done): self.logger.info(f'{self.alias!r} waiting for {duration} second(s) for HISTOGRAM data ...') gate('ON') sleep(duration) gate('OFF') for i, c in enumerate(channels): reply = self.connection.query(f':HIST{c}:DATA?') counts[i] = np.fromstring(reply[1:-1], sep=',', dtype=np.uint64) if (not done[i]) and (np.sum(counts[i]) >= min_events[i]): done[i] = True if timeout and perf_counter() - t0 > timeout: self.logger.warning(f'{self.alias!r} timed out after ' f'{timeout} seconds waiting for HISTOGRAM data') break self.logger.info(f'{self.alias!r} finished acquiring HISTOGRAM data') f = np.core.records.fromarrays return Histogram( hist1=f([timestamps[0], counts[0]], names=['timestamps', 'counts']), hist2=f([timestamps[1], counts[1]], names=['timestamps', 'counts']), hist3=f([timestamps[2], counts[2]], names=['timestamps', 'counts']), hist4=f([timestamps[3], counts[3]], names=['timestamps', 'counts']), )