Source code for photons.equipment.oscilloscope_rigol
"""
An oscilloscope from Rigol.
"""
from typing import Literal
import numpy as np
from .base import equipment
from .oscilloscope import Oscilloscope
[docs]
@equipment(manufacturer=r'Rigol', model=r'DS\d{4}Z?')
class RigolOscilloscope(Oscilloscope):
"""An oscilloscope from Rigol."""
def _check(self, command: str) -> None:
"""Write the command and then check for errors."""
reply = self.connection.query(f'{command};:SYSTEM:ERROR?')
if not reply.startswith('0,'):
self.raise_exception(reply)
def _configure(self, command: str) -> None:
"""Log the configuration command, write the command and then check for errors."""
self.logger.info(f'configure {self.alias!r} using {command!r}')
self._check(command)
[docs]
def clear(self) -> None:
"""Clears the event registers and the error queue."""
self.logger.info(f'clear {self.alias!r}')
self._check('*CLS')
[docs]
def configure_channel(self,
channel: int,
*,
bw_limit: bool = False,
coupling: Literal['DC', 'AC', 'GND'] = 'DC',
enable: bool = True,
invert: bool = False,
offset: float = 0,
probe: float = 1,
scale: float = 1) -> None:
"""Configure a channel.
Args:
channel: The channel number.
bw_limit: Whether to enable or disable the bandwidth limit.
coupling: The coupling mode (either DC, AC or GND).
enable: Whether to enable or disable the channel.
invert: Whether to invert the waveform.
offset: The vertical offset [Volts].
probe: The probe ratio. Only discrete values are allowed (see manual).
scale: The vertical scale [Volts/div].
"""
bwl = '20M' if bw_limit else 'OFF'
display = 'ON' if enable else 'OFF'
invert = 'ON' if invert else 'OFF'
self._configure(
f':CHANNEL{channel}:BWLIMIT {bwl};'
f':CHANNEL{channel}:COUPLING {coupling};'
f':CHANNEL{channel}:DISPLAY {display};'
f':CHANNEL{channel}:INVERT {invert};'
f':CHANNEL{channel}:PROBE {probe};'
f':CHANNEL{channel}:SCALE {scale};'
f':CHANNEL{channel}:OFFSET {offset}' # must come after SCALE
)
[docs]
def configure_timebase(self,
*,
mode: Literal['MAIN', 'XY', 'ROLL'] = 'MAIN',
offset: float = 0,
scale: float = 1e-6) -> None:
"""Configure the timebase.
Args:
mode: The timebase mode (either MAIN, XY or ROLL).
offset: The horizontal offset [seconds].
scale: The horizontal scale [seconds/div].
"""
self._configure(
f':TIMEBASE:MODE {mode};'
f':TIMEBASE:SCALE {scale};'
f':TIMEBASE:OFFSET {offset}' # must come after SCALE
)
[docs]
def configure_trigger(self,
*,
channel: int | str = 1,
coupling: Literal['AC', 'DC', 'LFReject', 'HFReject'] = 'DC',
holdoff: float = 16e-9,
level: float = 0,
noise_reject: bool = True,
slope: Literal['POS', 'NEG', 'RFAL'] = 'POS',
sweep: Literal['AUTO', 'NORMAL', 'SINGLE'] = 'AUTO') -> None:
"""Configure the Edge trigger.
Args:
channel: The channel to use as the trigger source.
coupling: The trigger coupling type (either AC, DC, LFReject or HFReject).
holdoff: The trigger holdoff time [seconds].
level: The voltage level to trigger at.
noise_reject: Whether to enable or disable noise rejection.
slope: The slope edge to trigger on (either POS, NEG or RFAL).
sweep: The sweep mode (either AUTO, NORMAL or SINGLE).
"""
reject = 'ON' if noise_reject else 'OFF'
source = channel if isinstance(channel, str) else f'CHAN{channel}'
self._configure(
f':TRIGGER:MODE EDGE;'
f':TRIGGER:EDGE:SOURCE {source};'
f':TRIGGER:EDGE:SLOPE {slope};'
f':TRIGGER:EDGE:LEVEL {level};'
f':TRIGGER:COUPLING {coupling};'
f':TRIGGER:HOLDOFF {holdoff};'
f':TRIGGER:SWEEP {sweep};'
f':TRIGGER:NREJECT {reject}'
)
[docs]
def run(self) -> None:
"""Start acquiring waveform data."""
self.logger.info(f'start {self.alias!r}')
self._check(':RUN')
[docs]
def single(self) -> None:
"""Capture and display a single acquisition."""
self.logger.info(f'single shot {self.alias!r}')
self._check(':SINGLE')
[docs]
def trigger(self) -> None:
"""Send a software trigger."""
self.logger.info(f'software trigger {self.alias!r}')
self._check(':TFORCE')
[docs]
def stop(self) -> None:
"""Stop acquiring waveform data."""
self.logger.info(f'stop {self.alias!r}')
self._check(':STOP')
[docs]
def waveform(self,
*channels: int | str,
displayed: bool = True) -> np.ndarray:
"""Get the waveform data.
Args:
*channels: The channel(s) to read (e.g., 1, 'CHAN1', 'channel1', 'D6').
displayed: Whether to read the waveform data displayed on the screen
or from internal memory. If reading from internal memory then
:meth:`.stop` is called prior to reading the data.
Returns:
The waveform data.
"""
if not channels:
self.raise_exception('Must specify the channel(s) to read')
if displayed:
chunk_size = 1200
mode = 'NORMAL'
else:
# A maximum of 250000 points can be read per ":WAV:DATA?" query
# when ":WAV:FORM BYTE" is used
chunk_size = 250000
mode = 'RAW'
self.stop()
data = []
names = []
for c in channels:
source = c.upper() if isinstance(c, str) else f'CHAN{c}'
cmd = f':WAVEFORM:SOURCE {source};' \
f':WAVEFORM:MODE {mode};' \
f':WAVEFORM:FORMAT BYTE'
self._check(cmd)
pre = self.connection.query(':WAVEFORM:PREAMBLE?').split(',')
fmt, typ, npts, nave = map(int, pre[:4])
dx, x0, x_ref, dy, y0, y_ref = map(float, pre[4:])
assert fmt == 0
self.logger.info(f'get {source!r} waveform data from {self.alias!r}')
raw = np.empty(npts, dtype=np.uint8)
start, stop = 1, chunk_size
while start < npts:
cmd = f':WAVEFORM:START {start};' \
f':WAVEFORM:STOP {stop};' \
f':WAVEFORM:DATA?'
raw[start-1:stop] = self.connection.query(cmd, dtype=np.uint8, fmt='ieee')
start = stop + 1
stop = min(stop + chunk_size, npts)
if not data:
t = x0 + np.arange(raw.size) * dx
data.append(t)
names.append('t')
volts = (raw - y0 - y_ref) * dy
data.append(volts)
if source.startswith('C'):
names.append(f'ch{source[-1]}')
else:
names.append(source)
return np.core.records.fromarrays(data, names=','.join(names))