Source code for photons.utils

"""
General classes and functions.
"""
import time
from decimal import Decimal

import numpy as np
import requests
from numpy.lib import recfunctions

from .log import logger


[docs] def array_central( centre: float, width: float, step: float, *, randomize: bool = False, decimals: int = None) -> np.ndarray: """Get an array about a central value, for a given full width and step size. Args: centre: The central value in the array. width: The full width about `centre`. Must be an integer multiple of `step`. step: The step size. Must be a positive number. randomize: Whether to randomize the values in the array. decimals: The number of decimals to use in :func:`numpy.around`. If not specified then uses the value of `step` to determine the number of decimals. Returns: The requested array. """ if step < 0: raise ValueError("'step' cannot be negative") if width < 0: raise ValueError("'width' cannot be negative") if step == 0 or width == 0: return np.array([centre]) ratio = width / step rounded = round(ratio) if abs(rounded - ratio) > 1e-12: raise ValueError(f'The width, {width}, must be an integer multiple ' f'of the step size, {step}') num = rounded + 1 half_width = width * 0.5 start = centre - half_width stop = centre + half_width array, np_step = np.linspace(start, stop, num, retstep=True, endpoint=True) assert abs(step - np_step) < 1e-12, f'requested step size {step} != actual step size {np_step}' d = decimals or max(get_decimals(centre), get_decimals(step)) array = np.around(array, decimals=d) if randomize: np.random.shuffle(array) return array
[docs] def array_evenly( start: float, stop: float, step: float, *, randomize: bool = False, decimals: int = None) -> np.ndarray: """Return evenly-spaced values within a given interval. The values are generated within the interval [start, stop]. Args: start: Start value (the interval includes this value). stop: Stop value (the interval includes this value). step: The step size. randomize: Whether to randomize the values in the array. decimals: The number of decimals to use in :func:`numpy.around`. If not specified then uses the value of `step` to determine the number of decimals. Returns: The requested array. """ if step == 0: return np.array([start]) centre = (stop + start) * 0.5 width = stop - start if step > 0: if start > stop: raise ValueError("'start' must be less than 'stop' for a positive step size") return array_central(centre, width, step, randomize=randomize, decimals=decimals) if stop > start: raise ValueError("'start' must be greater than 'stop' for a negative step size") array = array_central(centre, -width, -step, randomize=randomize, decimals=decimals) return np.flip(array)
[docs] def array_merge(*vectors: np.ndarray) -> np.ndarray: """Merge 1-D arrays, field by field, to create a new structured N-D array.""" size = None for vector in vectors: if vector.ndim != 1: raise ValueError(f'A vector can only have dimension 1, ' f'got dimension {vector.ndim}') if size is None: size = vector.size elif size != vector.size: raise ValueError(f'All vectors must have the same size, ' f'{size} != {vector.size}') if size is None: return np.array([]) return recfunctions.merge_arrays(vectors)
[docs] def array_photodiode_centre( centre: float, *, width: float = 10, step: float = 0.1, randomize: bool = False, decimals: int = None) -> np.ndarray: """Useful when trying to find the centre position of a photodiode. The returned array has values at the two edges of the photodiode and in the central region of the photodiode (in one-dimension only). Args: centre: The position of a translation stage where the centre of the photodiode is believed to be. width: The width of the photodiode or the diameter of the aperture. Must have the same unit as `step`. step: The step size to move the translation stage within each of the three regions (not between regions). randomize: Whether to randomize the values in the array. decimals: The number of decimals to use in :func:`numpy.around`. If not specified then uses the values of `centre` and `step` to determine the number of decimals. Returns: The requested array. """ if step == 0: return np.array([centre]) half_width = width * 0.5 region_width = step * round(width * 0.1 / step) rising = array_central(centre-half_width, region_width, step, decimals=decimals) central = array_central(centre, region_width, step, decimals=decimals) falling = array_central(centre+half_width, region_width, step, decimals=decimals) array = np.concatenate((rising, central, falling)) if randomize: np.random.shuffle(array) return array
[docs] def ave_std(data: np.ndarray, *, axis: int | tuple[int] = None) -> tuple[float | np.ndarray, float | np.ndarray]: """Calculate the average and standard deviation. Args: data: The values to compute the average and standard deviation of. axis: Axis or axes along which the average and standard deviation is computed. The default is to compute the values of the flattened array. Returns: The average value and the standard deviation. """ if data.size > 1: return np.average(data, axis=axis), np.std(data, axis=axis, ddof=1) if data.size == 1: return data[0], np.nan return np.nan, np.nan
[docs] def get_decimals(value: int | float) -> int: """Get the number of digits after the decimal point. This function returns a sensible result only if `value` was explicitly defined for a parameter (for example a value from a QSpinbox). If `value` is the result from a calculation then there will be floating-point issues, and will most likely return nonsense (e.g., a number > 15). """ if value == int(value): return 0 return -Decimal(str(value)).as_tuple().exponent
[docs] def lab_logging(root_url: str, *aliases: str, corrected: bool = True, strict: bool = True, timeout: float = 10) -> dict: """Read the current temperature, humidity and dewpoint of (an) OMEGA iServer(s). Args: root_url: The root url of the webapp, e.g., ``'http://hostname:port/'`` aliases: The iServer alias(es) to retrieve the data from. If not specified then retrieves the data from all iServers. corrected: Whether to return corrected (True) or uncorrected (False) values. strict: Whether to raise an exception if the connection to the webapp cannot be established. timeout: The maximum number of seconds to wait for a reply from the webapp. Returns: The temperature, humidity and dewpoint from the iServer(s). """ url = root_url.rstrip('/') params = {'corrected': corrected} if aliases: params['alias'] = ','.join(aliases) try: reply = requests.get(f'{url}/now/?', params=params, timeout=timeout) except requests.exceptions.RequestException as e: if strict: raise logger.error(str(e)) return {} if not reply.ok: msg = reply.content.decode() if strict: raise RuntimeError(msg) logger.error(msg) return {} data = reply.json() for key, value in data.items(): if value['error']: error = value['error'] alias = value['alias'] msg = f'{error} [Serial:{key}, Alias:{alias}]' if strict: raise RuntimeError(msg) logger.error(msg) return {} return data
[docs] def std_relative(array: np.ndarray, *, axis: int | tuple[int] = None) -> float | np.ndarray: """Calculate the relative standard deviation. Args: array: The values to compute the relative standard deviation of. axis: Axis or axes along which the relative standard deviation is computed. The default is to compute the value of the flattened array. Returns: The relative standard deviation. """ ave, std = ave_std(array, axis=axis) if array.size > 1 and np.any(ave == 0): if ave.ndim == 0: raise ZeroDivisionError('The average value is 0') raise ZeroDivisionError('The average value along an axis is 0') return std / np.abs(ave)
[docs] def hhmmss(seconds: float) -> str: """Convert seconds to a hh:mm:ss representation.""" one_day = 86400 if seconds < one_day: return time.strftime('%H:%M:%S', time.gmtime(seconds)) days = int(seconds // one_day) hms = hhmmss(seconds - (days * one_day)) out = f'{hms} (+{days} day' if days == 1: return f'{out})' return f'{out}s)'
[docs] def mean_max_n(array: np.ndarray, n: int) -> float: """Return the mean of the maximum *n* values in *array*.""" indices = np.argpartition(array, -n)[-n:] return float(np.mean(array[indices]))
[docs] def mean_min_n(array: np.ndarray, n: int) -> float: """Return the mean of the minimum *n* values in *array*.""" indices = np.argpartition(array, n)[:n] return float(np.mean(array[indices]))