"""
DAQ from National Instruments.
"""
import warnings
import nidaqmx.constants
import numpy as np
from msl.equipment import EquipmentRecord
from msl.equipment.connection_nidaq import ConnectionNIDAQ
from msl.qt import QtCore
from msl.qt import Signal
from scipy.signal import sawtooth
from scipy.signal import square
from .base import BaseEquipment
from .base import equipment
from ..samples import Samples
Task = nidaqmx.Task
Edge = nidaqmx.constants.Edge
Slope = nidaqmx.constants.Slope
DigitalWidthUnits = nidaqmx.constants.DigitalWidthUnits
CountDirection = nidaqmx.constants.CountDirection
AcquisitionType = nidaqmx.constants.AcquisitionType
TriggerType = nidaqmx.constants.TriggerType
Level = nidaqmx.constants.Level
LineGrouping = nidaqmx.constants.LineGrouping
TerminalConfiguration = nidaqmx.constants.TerminalConfiguration
TimeUnits = nidaqmx.constants.TimeUnits
AnalogSingleChannelReader = nidaqmx.stream_readers.AnalogSingleChannelReader
AnalogMultiChannelReader = nidaqmx.stream_readers.AnalogMultiChannelReader
CounterReader = nidaqmx.stream_readers.CounterReader
[docs]
class Timing:
def __init__(self, **kwargs) -> None:
"""Do not instantiate this class directly. Use :meth:`NIDAQ.timing`."""
self._samples_per_channel = 1
self._source = kwargs['source']
self._rate = kwargs['rate']
if kwargs['rising']:
self._active_edge = Edge.RISING
else:
self._active_edge = Edge.FALLING
if kwargs['finite']:
self._sample_mode = AcquisitionType.FINITE
else:
self._sample_mode = AcquisitionType.CONTINUOUS
settings = [
f'rate={self._rate}',
f'edge={self._active_edge.name}',
f'mode={self._sample_mode.name}',
]
if self._source:
settings.append(f'source={self._source}')
self._settings = ', '.join(settings)
def __repr__(self) -> str:
return f'Timing<{self._settings}>'
[docs]
def add_to(self, task: Task) -> None:
"""Add the timing configuration to a task.
Args:
task: The task to add the timing configuration to.
"""
task.timing.cfg_samp_clk_timing(
self._rate,
source=self._source,
active_edge=self._active_edge,
sample_mode=self._sample_mode,
samps_per_chan=self._samples_per_channel
)
@property
def rate(self) -> float:
"""Returns the sample rate (in Hz)."""
return self._rate
@property
def sample_mode(self) -> AcquisitionType:
"""Returns the sample mode."""
return self._sample_mode
@property
def samples_per_channel(self) -> int:
"""Returns the number of samples per channel to acquire or generate."""
return self._samples_per_channel
@samples_per_channel.setter
def samples_per_channel(self, value):
self._samples_per_channel = int(value)
[docs]
class Trigger:
def __init__(self, **kwargs) -> None:
"""Do not instantiate this class directly. Use :meth:`NIDAQ.trigger`."""
self._delay = kwargs['delay']
self._level = kwargs['level']
self._hysteresis = kwargs['hysteresis']
self._retriggerable = kwargs['retriggerable']
settings = [f'source={kwargs["source"]}']
if self._level is None:
# digital trigger
edge = Edge.RISING if kwargs['rising'] else Edge.FALLING
self._kwargs = {
'trigger_source': kwargs['source'],
'trigger_edge': edge
}
settings.append(f'edge={edge.name}')
else:
# analog trigger
slope = Slope.RISING if kwargs['rising'] else Slope.FALLING
self._kwargs = {
'trigger_source': kwargs['source'],
'trigger_slope': slope,
'trigger_level': self._level
}
settings.extend([f'slope={slope.name}, level={self._level}'])
if self._delay != 0:
settings.append(f'delay={self._delay}')
if self._retriggerable:
settings.append(f'retriggerable=True')
if self._hysteresis != 0:
settings.append(f'hysteresis={self._hysteresis}')
self._settings = ', '.join(settings)
def __repr__(self) -> str:
return f'Trigger<{self._settings}>'
[docs]
def add_to(self, task: Task) -> None:
"""Add this trigger to a task.
Args:
task: The task to add the trigger event to.
"""
if self._delay < 0:
pre = round(task.timing.samp_clk_rate * abs(self._delay))
t = task.triggers.reference_trigger
if self._level is None:
t.cfg_dig_edge_ref_trig(pretrigger_samples=pre, **self._kwargs)
else:
t.cfg_anlg_edge_ref_trig(pretrigger_samples=pre, **self._kwargs)
if self._hysteresis != 0:
t.anlg_edge_hyst = self._hysteresis
if self._retriggerable:
t.retriggerable = True
else:
t = task.triggers.start_trigger
if self._level is None:
t.cfg_dig_edge_start_trig(**self._kwargs)
else:
t.cfg_anlg_edge_start_trig(**self._kwargs)
if self._hysteresis != 0:
t.anlg_edge_hyst = self._hysteresis
if self._delay > 0:
t.delay_units = DigitalWidthUnits.SECONDS
t.delay = self._delay
if self._retriggerable:
t.retriggerable = True
[docs]
@equipment(manufacturer=r'National Instruments', model=r'USB-6361')
class NIDAQ(BaseEquipment):
connection: ConnectionNIDAQ
counts_changed: QtCore.SignalInstance = Signal(Samples)
Task = Task
WAIT_INFINITELY = nidaqmx.constants.WAIT_INFINITELY
def __init__(self, record: EquipmentRecord, **kwargs) -> None:
"""DAQ from National Instruments.
Args:
record: The equipment record.
**kwargs: Keyword arguments. Can be specified as attributes
of an XML element in a configuration file (with the tag
of the element equal to the alias of `record`).
"""
super().__init__(record, **kwargs)
self.DEV: str = record.connection.address
self._tasks: list[Task] = []
self.ignore_attributes('DEV', 'counts_changed',
'Task', 'WAIT_INFINITELY')
[docs]
def analog_in(self,
channel: int | str,
*,
config: int | str = 'DIFF',
duration: float = None,
maximum: float = 10,
minimum: float = -10,
nsamples: int = 1,
timeout: float = 10,
timing: Timing = None,
trigger: Trigger = None,
wait: bool = True) -> tuple[np.ndarray | Task, float]:
"""Read the voltage(s) of the analog-input channel(s).
Args:
channel: The analog-input channel number(s), e.g.,
channel=0, channel='0:7'.
config: Specifies the input terminal configuration for the channel,
see :class:`~nidaqmx.constants.TerminalConfiguration`.
duration: The number of seconds to read voltages for. If specified
then this value is used instead of `nsamples`.
maximum: The maximum voltage that is expected to be measured.
minimum: The minimum voltage that is expected to be measured.
nsamples: The number of samples per channel to read. If a `duration`
is also specified then that value is used instead of `nsamples`.
timeout: The maximum number of seconds to wait for the task to finish.
Set to -1 to wait forever.
timing: The timing settings to use. See :meth:`.timing`.
trigger: The trigger settings to use. See :meth:`.trigger`.
wait: Whether to wait for the task to finish. If enabled then also
closes the task when it is finished.
Returns:
If `wait` is True then the voltage(s) of the requested analog-input
channel(s) and the time interval between samples (i.e., dt) are
returned. Otherwise, the analog-input task, which has *not* been
started yet and the time interval between samples are returned.
Not starting the task allows one to register a callback before
starting the task.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Read the value of a single analog-input channel
>>> daq.analog_in(0)
(array([-0.48746041]), 0.001)
>>> daq.analog_in(6, nsamples=5)
(array([-0.44944232, -0.45040888, -0.45137544, -0.45556387, -0.45298637]), 0.001)
Read the values of multiple analog-input channels
>>> daq.analog_in('0:3', nsamples=4)
(array([[ 0.03512726, 0.03770475, 0.03867132, 0.03512726],
[-0.1675285 , 0.17527869, -0.17171693, 0.17237901],
[ 0.08248878, 0.12243999, 0.00741916, 0.07991128],
[ 0.08861033, 0.09859814, 0.05832474, 0.06831254]]), 0.001)
"""
if isinstance(channel, str) and channel.startswith(f'/{self.DEV}'):
ai_channel = channel
else:
ai_channel = f'/{self.DEV}/ai{channel}'
tc = self.convert_to_enum(config, TerminalConfiguration, to_upper=True)
task = NIDAQ.Task()
self._tasks.append(task)
task.ai_channels.add_ai_voltage_chan(
ai_channel,
terminal_config=tc,
min_val=minimum,
max_val=maximum,
)
if timing is None:
timing = self.timing()
if duration is None:
timing.samples_per_channel = nsamples
else:
timing.samples_per_channel = round(duration * timing.rate)
self._maybe_set_timing_and_trigger(task, timing, trigger, 'analog-input')
dt = 1.0 / task.timing.samp_clk_rate
if wait:
samples_per_channel = timing.samples_per_channel
num_channels = task.number_of_channels
if num_channels == 1:
data = np.empty((samples_per_channel,), dtype=float)
reader = AnalogSingleChannelReader(task.in_stream)
else:
data = np.empty((num_channels, samples_per_channel), dtype=float)
reader = AnalogMultiChannelReader(task.in_stream)
try:
reader.read_many_sample(
data,
number_of_samples_per_channel=samples_per_channel,
timeout=timeout,
)
task.read()
finally:
task.close()
self._tasks.remove(task)
return data, dt
return task, dt
[docs]
def analog_out(self,
channel: int | str,
voltage: float | list[float] | list[list[float]] | np.ndarray,
*,
auto_start: bool = True,
timeout: float = 10,
timing: Timing = None,
trigger: Trigger = None,
wait: bool = True) -> Task:
"""Write the voltage(s) to the analog-output channel(s).
Args:
channel: The analog-output channel number(s), e.g., channel=0, channel='0:1'.
voltage: The voltage(s) to output.
auto_start: Whether to automatically start the task.
timeout: The maximum number of seconds to wait for the task to finish.
Set to -1 to wait forever.
timing: The timing settings to use. See :meth:`.timing`.
trigger: The trigger settings to use. See :meth:`.trigger`.
wait: Whether to wait for the task to finish. If enabled then also
closes the task when it is finished.
Returns:
The analog-output task.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Write to a single analog-output channel
>>> daq.analog_out(0, 1.123)
Write to multiple analog-output channels
>>> daq.analog_out('0:1', [0.2, -1.2])
>>> daq.analog_out('0:1', [[0.2, 0.1, 0.], [-0.1, 0., 0.1]])
"""
if isinstance(voltage, (float, int)):
array = np.array([voltage], dtype=float)
else:
array = np.asarray(voltage)
min_val = np.min(array)
max_val = np.max(array)
if max_val == min_val:
max_val += 0.1
ao = f'/{self.DEV}/ao{channel}'
task = NIDAQ.Task()
self._tasks.append(task)
task.ao_channels.add_ao_voltage_chan(ao, min_val=min_val, max_val=max_val)
if timing is None:
timing = self.timing()
timing.samples_per_channel = array.size // task.number_of_channels
self._maybe_set_timing_and_trigger(task, timing, trigger, 'analog-output')
self.logger.info(f'{self.alias!r} set {ao} with {array.shape} samples')
written = task.write(array, auto_start=auto_start, timeout=timeout)
assert written == timing.samples_per_channel
if wait:
try:
task.wait_until_done(timeout=timeout)
finally:
task.close()
self._tasks.remove(task)
return task
[docs]
def analog_out_read(self,
channel: int | str,
**kwargs) -> tuple[np.ndarray | Task, float]:
"""Read the output voltage(s) from the analog-output channel(s).
Args:
channel: The analog-output channel number(s), e.g., channel=0, channel='0:1'.
**kwargs: All keyword arguments are passed to :meth:`.analog_in`.
Returns:
If `wait` is True then the voltage(s) of the requested analog-output
channel(s) and the time interval between samples (i.e., dt) are
returned. Otherwise, the analog-output task, which has *not* been
started yet and the time interval between samples are returned.
Not starting the task allows one to register a callback before
starting the task.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Read a single value from an analog-output channel
>>> daq.analog_out_read(0)
(array([-1.09800537]), 0.001)
Read multiple values from multiple analog-output channels
>>> daq.analog_out_read('0:1', nsamples=4)
(array([[-1.09832756, -1.09736099, -1.09800537, -1.09736099],
[ 0.21168585, 0.21233022, 0.21200803, 0.21168585]]), 0.001)
"""
def name(index):
return f'/{self.DEV}/_ao{index}_vs_aognd'
if isinstance(channel, str) and ':' in channel:
start, end = map(int, channel.split(':'))
assert end >= start
channels = [name(ch) for ch in range(start, end+1, 1)]
ao_channels = ','.join(channels)
else:
ao_channels = name(channel)
return self.analog_in(ao_channels, **kwargs)
[docs]
def close_all_tasks(self) -> None:
"""Close all tasks."""
with warnings.catch_warnings():
# closing an already-closed task indicates a ResourceWarning
warnings.simplefilter('ignore', ResourceWarning)
for task in self._tasks:
task.close()
self._tasks.clear()
[docs]
def count_edges(self,
pfi: int,
duration: float,
*,
nsamples: int = 1,
rising: bool = True) -> Samples:
"""Count the number of edges per second.
Args:
pfi: The PFI terminal number.
duration: The number of seconds to count edges for.
nsamples: The number of times to count edges for `duration` seconds.
rising: Whether to count rising edges, otherwise count falling edges.
Returns:
The number of edges per second.
"""
cps = np.full((nsamples,), -2**63, dtype=np.int64)
# using a Counter Output task as a gate for the Counter Input task
edge = Edge.RISING if rising else Edge.FALLING
# add a small delay to make sure that the CI task has started and
# is waiting for the CO gate pulse
co_task_delay = 0.01
ctr_src = 0
ctr_gate = 1
self.logger.info(f'{self.alias!r} start counting edges ...')
for index in range(nsamples):
with NIDAQ.Task() as co_task, NIDAQ.Task() as ci_task:
co_task.co_channels.add_co_pulse_chan_time(
f'/{self.DEV}/ctr{ctr_gate}',
high_time=duration,
# The value of low_time doesn't matter and that is why it is large
low_time=1000.,
idle_state=Level.LOW,
initial_delay=co_task_delay,
)
co_task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.FINITE,
samps_per_chan=1,
)
channel = ci_task.ci_channels.add_ci_count_edges_chan(
f'/{self.DEV}/ctr{ctr_src}',
edge=edge,
initial_count=0,
count_direction=CountDirection.COUNT_UP
)
# redirect the CI channel to the PFI terminal that has the
# input signal connected to it
channel.ci_count_edges_term = f'/{self.DEV}/PFI{pfi}'
# only increment the counter when the gate output is HIGH
pt = ci_task.triggers.pause_trigger
pt.trig_type = TriggerType.DIGITAL_LEVEL
pt.dig_lvl_when = Level.LOW
# the digital level source is internally connected to the CO task output
pt.dig_lvl_src = f'/{self.DEV}/Ctr{ctr_gate}InternalOutput'
# must start the CI task before the CO task
ci_task.start()
co_task.start()
co_task.wait_until_done(timeout=duration + co_task_delay + 5.0)
count = channel.ci_count
cps[index] = count / duration
self.logger.info(
f'{self.alias!r} counted {np.array2string(cps, max_line_width=1000)} '
f'{edge.name} edges/second in {duration}-second intervals')
s = Samples(cps)
self.counts_changed.emit(s)
self.maybe_emit_notification(**s.to_json())
return s
[docs]
def digital_in(self,
lines: int | str,
*,
port: int = 1) -> bool | list[bool]:
"""Read the state of the digital-input channel(s).
Args:
lines: The line number(s) (e.g., line=1, line='0:7',
line='/Dev1/port0/line0:7,/Dev1/port1/line0:3').
port: The port number.
Returns:
Whether the requested digital input channel(s) are HIGH or LOW.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Read the state of a single digital-input channel (P1.0)
>>> daq.digital_in(0)
False
Read the state of a single digital-input channel (P0.2)
>>> daq.digital_in(2, port=0)
True
Read the state of multiple digital-input channels (P1.0-7)
>>> daq.digital_in('0:7')
[False, False, True, False, False, False, False, True]
"""
with NIDAQ.Task() as task:
task.di_channels.add_di_chan(
self._generate_digital_lines(lines, port),
line_grouping=LineGrouping.CHAN_PER_LINE
)
return task.read()
[docs]
def digital_out(self,
lines: int | str,
state: bool | list[bool] | list[list[bool]],
*,
auto_start: bool = True,
port: int = 1,
timeout: float = 10,
timing: Timing = None,
trigger: Trigger = None,
wait: bool = True) -> Task:
"""Write the state of digital-output channels(s).
Args:
lines: The line number(s) (e.g., line=1, line='0:7',
line='/Dev1/port0/line0:7,/Dev1/port1/line0:3').
state: Whether to set the specified line(s) to HIGH or LOW.
auto_start: Whether to automatically start the task.
port: The port number.
timeout: The maximum number of seconds to wait for the task to finish.
Set to -1 to wait forever.
timing: The timing settings to use. See :meth:`.timing`.
trigger: The trigger settings to use. See :meth:`.trigger`.
wait: Whether to wait for the task to finish. If enabled then also
closes the task when it is finished.
Returns:
The digital-output task.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Set the state of a single digital-output channel (P1.0)
>>> daq.digital_out(0, True)
Set multiple digital-output channels to be in the same state (P2.0-7)
>>> daq.digital_out('0:7', False, port=2)
Set the state of multiple digital-output channels (P1.2-4)
>>> daq.digital_out('2:4', [False, True, True])
"""
lines = self._generate_digital_lines(lines, port)
task = NIDAQ.Task()
self._tasks.append(task)
task.do_channels.add_do_chan(
lines,
line_grouping=LineGrouping.CHAN_PER_LINE
)
n = 1
num_channels = task.number_of_channels
if isinstance(state, bool):
if num_channels > 1:
state = [state] * num_channels
elif num_channels == 1:
n = len(state)
elif isinstance(state[0], (list, tuple)):
n = len(state[0]) # noqa: state[0] is a list[bool]
if timing is None:
timing = self.timing()
timing.samples_per_channel = n
self._maybe_set_timing_and_trigger(task, timing, trigger, 'digital-output')
self.logger.info(f'{self.alias!r} set {lines} to {state}')
written = task.write(state, auto_start=auto_start, timeout=timeout)
assert written == n
if wait:
try:
task.wait_until_done(timeout=timeout)
finally:
task.close()
self._tasks.remove(task)
return task
[docs]
def digital_out_read(self,
lines: int | str,
*,
port: int = 1) -> bool | list[bool]:
"""Read the state of digital-output channel(s).
Args:
lines: The line number(s) (e.g., line=1, line='0:7',
line='/Dev1/port0/line0:7,/Dev1/port1/line0:3').
port: The port number.
Returns:
Whether the requested digital-output channel(s) are HIGH or LOW.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Read the state of a single digital-output channel (P1.0)
>>> daq.digital_out_read(0)
True
Read the state of a single digital-output channel (P0.5)
>>> daq.digital_out_read(5, port=0)
False
Read the state of multiple digital-output channels (P1.0-7)
>>> daq.digital_out_read('0:7')
[False, True, True, False, True, False, False, False]
"""
with NIDAQ.Task() as task:
task.do_channels.add_do_chan(
self._generate_digital_lines(lines, port),
line_grouping=LineGrouping.CHAN_PER_LINE
)
return task.read()
[docs]
def edge_separation(self,
start: int,
stop: int,
*,
maximum: float = 1.0,
minimum: float = 100e-9,
nsamples: int = 10,
start_edge: int | str = 'RISING',
stop_edge: int | str = 'FALLING',
timeout: float = 10) -> Samples:
"""Get the duration, in seconds, between two edges.
Args:
start: The PFI terminal number to use for the start time, t=0.
stop: The PFI terminal number to use for the stop time, t=dt.
Can be same as `start` provided that `start_edge` and `stop_edge`
are different values.
maximum: The maximum time, in seconds, between the start-stop edges
that is expected.
minimum: The minimum time, in seconds, between the start-stop edges
that is expected.
nsamples: The number of start-stop samples to acquire.
start_edge: Specifies on which edge to start each measurement.
See :class:`~nidaqmx.constants.Edge` for allowed values.
stop_edge: Specifies on which edge to stop each measurement.
See :class:`~nidaqmx.constants.Edge` for allowed values.
timeout: The maximum number of seconds to wait for the task to finish.
Set to -1 to wait forever.
Returns:
The duration(s), in seconds, between the start-stop edges.
"""
first_edge = self.convert_to_enum(start_edge, Edge, to_upper=True)
second_edge = self.convert_to_enum(stop_edge, Edge, to_upper=True)
data = np.empty((nsamples,), dtype=float)
with NIDAQ.Task() as task:
channel = task.ci_channels.add_ci_two_edge_sep_chan(
f'/{self.DEV}/ctr0',
min_val=minimum,
max_val=maximum,
units=TimeUnits.SECONDS,
first_edge=first_edge,
second_edge=second_edge
)
channel.ci_two_edge_sep_first_term = f'/{self.DEV}/PFI{start}'
channel.ci_two_edge_sep_second_term = f'/{self.DEV}/PFI{stop}'
task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.CONTINUOUS,
samps_per_chan=2*nsamples # the buffer size
)
reader = CounterReader(task.in_stream)
reader.read_many_sample_double(
data,
number_of_samples_per_channel=nsamples,
timeout=timeout,
)
return Samples(data)
[docs]
def function_generator(self,
channel: int | str,
*,
amplitude: float = 1,
duty: float = 0.5,
frequency: float = 1000,
offset: float = 0,
nsamples: int = 1000,
phase: float = 0,
preview: bool = False,
symmetry: float = 1.0,
trigger: Trigger = None,
waveform: str = 'sine') -> Task | np.ndarray:
"""Generate a waveform.
Args:
channel: The analog-output channel number(s), e.g., channel=0,
channel='0:1'.
amplitude: The zero-to-peak amplitude of the waveform to generate
in volts. Zero and negative values are valid.
duty: The duty cycle of the square wave. Must be in the interval
[0, 1]. Only used if `waveform` is ``square``.
frequency: The frequency of the waveform to generate, in Hz.
offset: The voltage offset of the waveform to generate.
nsamples: The number of voltage samples per waveform period.
phase: The phase of the waveform, in degrees.
preview: Whether to return a :class:`~numpy.ndarray` of a single
period of the waveform voltages.
symmetry: The symmetry of the ramp. Corresponds to the ratio of
the rising portion of the ramp to the ramp period. For example,
a symmetry of 0.5 corresponds to a triangle wave. Must be in
the interval [0, 1]. Only used if `waveform` is ``ramp``.
trigger: The trigger settings to use. See :meth:`.trigger`.
waveform: Specifies the kind of waveform to generate. Can be:
sine, square, ramp, triangle, sawtooth.
Returns:
The analog-output task or a single period of the waveform if
`preview` is True.
"""
x0 = np.pi * phase / 180.0
x = np.linspace(x0, 2.0 * np.pi + x0, num=nsamples, endpoint=False)
match waveform.upper():
case 'SINE':
signal = np.sin(x)
case 'SQUARE':
signal = square(x, duty=duty)
case 'RAMP':
signal = sawtooth(x + np.pi/2.0, width=symmetry)
case 'TRIANGLE':
signal = sawtooth(x + np.pi/2.0, width=0.5)
case 'SAWTOOTH':
signal = sawtooth(x + np.pi, width=1.0)
case _:
raise ValueError(f'Unsupported waveform {waveform!r}')
voltages = amplitude * signal + offset
if preview:
return voltages
timing = self.timing(finite=False, rate=nsamples * frequency, rising=False)
return self.analog_out(channel, voltages, timing=timing, trigger=trigger, wait=False)
[docs]
def info(self) -> dict[str, int]:
"""Returns the driver information about the NIDAQ board."""
version = self.connection.version
return {
'driver_version_major': version.major_version,
'driver_version_minor': version.minor_version,
'driver_version_update': version.update_version,
}
[docs]
def pulse(self,
pfi: int,
duration: float,
*,
ctr: int = 1,
delay: float = 0,
npulses: int = 1,
state: bool = True,
timeout: float = -1,
wait: bool = True) -> Task:
"""Generate one (or more) digital pulse(s).
If `state` is True then the `pfi` terminal will output 0V
for `delay` seconds, generate `npulses` +5V pulse(s) (each with a width
of `duration` seconds) and then remain at 0V when the task is done.
If `state` is False then the `pfi` terminal will output +5V
for `delay` seconds, generate `npulses` 0V pulse(s) (each with a width
of `duration` seconds) and then remain at +5V when the task is done.
Args:
pfi: The PFI terminal number to output the pulse(s) from.
duration: The duration (width) of each pulse, in seconds.
ctr: The counter terminal number to use for timing.
delay: The number of seconds to wait before generating the first pulse.
npulses: The number of pulses to generate.
state: Whether to generate HIGH or LOW pulse(s).
timeout: The maximum number of seconds to wait for the task to finish.
Set to -1 to wait forever.
wait: Whether to wait for the task to finish. If enabled then also
closes the task when it is finished.
Returns:
The task.
Examples:
.. suppress-unresolved-reference-daq:
>>> daq = NIDAQ()
Generate a single HIGH pulse for 0.1 seconds from PFI2
>>> daq.pulse(2, 0.1)
"""
if state:
idle_state, state_str = Level.LOW, 'HIGH'
else:
idle_state, state_str = Level.HIGH, 'LOW'
task = NIDAQ.Task()
self._tasks.append(task)
co_channel = task.co_channels.add_co_pulse_chan_time(
f'/{self.DEV}/ctr{ctr}',
high_time=duration,
low_time=duration,
idle_state=idle_state,
initial_delay=delay,
)
co_channel.co_pulse_term = f'/{self.DEV}/PFI{pfi}'
if npulses > 1:
task.timing.cfg_implicit_timing(
sample_mode=AcquisitionType.FINITE,
samps_per_chan=npulses,
)
self.logger.info(f'{self.alias!r} generating {npulses} {state_str} '
f'pulse(s) [duration={duration}, delay={delay}]')
task.start()
if wait:
try:
task.wait_until_done(timeout=timeout)
finally:
task.close()
self._tasks.remove(task)
return task
[docs]
def storm(self, camera: int, sequence: dict) -> Task:
"""Create a task for STORM/PALM acquisition.
For example, for a 4-frame sequence controlling two lasers::
sequence = {
'port0/line0': [True, False, False, False],
'port0/line1': [False, True, True, True]
}
Args:
camera: The PFI terminal number that the camera's Fire signal
is connect to.
sequence: The keys are the digital-output terminals that turn the
laser pulses on/off and the values represent the state of the
lasers in each frame.
Returns:
The task.
"""
lines = ','.join(f'/{self.DEV}/{key}' for key in sequence)
data = [value for value in sequence.values()]
timing = self.timing(
rate=10000, # maximum expected rate of the camera's Fire signal
finite=False,
rising=False,
pfi=camera
)
timing.samples_per_channel = len(data[0])
return self.digital_out(lines, data, timing=timing, wait=False)
[docs]
def timing(self,
*,
finite: bool = True,
pfi: int = None,
rate: float = 1000,
rising: bool = True) -> Timing:
"""Configure and return the sample clock to add to a task.
Args:
finite: Whether to acquire/generate a continuous or a finite number
of samples.
pfi: The PFI terminal number to use as the external sample clock.
If not specified then uses the default onboard clock of the device.
rate: The sampling rate in Hz. If you specify an external sample clock
(i.e., a value for `pfi`) then set the `rate` to be the maximum
expected rate of the external clock.
rising: Whether to acquire/generate samples on the rising or falling
edge of the sample clock.
Returns:
The timing instance.
"""
source = '' if pfi is None else f'/{self.DEV}/PFI{pfi}'
return Timing(finite=finite, source=source, rate=rate, rising=rising)
[docs]
def trigger(self,
source: int | str,
*,
delay: float = 0,
hysteresis: float = 0,
level: float = None,
retriggerable: bool = False,
rising: bool = True) -> Trigger:
"""Configure and return a trigger to add to a task.
Args:
source: Either a PFI or an AI channel number or a
`terminal name <https://www.ni.com/docs/en-US/bundle/ni-daqmx/page/mxcncpts/termnames.html>`_
to use as the trigger source.
delay: The time (in seconds) between the trigger event and when to
acquire/generate samples. Can be < 0 to acquire/generate samples
before the trigger event (only if the NIDAQ task supports it).
hysteresis: A hysteresis level (in volts). Only applicable for an
analog trigger.
level: The voltage level to use for the trigger signal. Whether this
value is set decides whether the trigger source is from a digital
or an analog channel. If None then `channel` refers to a PFI
channel (a digital trigger), otherwise, `channel` refers to an
AI channel (an analog trigger).
retriggerable: Whether the task can be retriggered.
rising: Whether to use the rising or falling edge(slope) of the
digital(analog) trigger signal.
Returns:
The trigger instance.
"""
if not isinstance(source, str):
if level is None:
source = f'/{self.DEV}/PFI{source}'
else:
source = f'/{self.DEV}/APFI{source}'
if not source.startswith(f'/{self.DEV}/'):
source = f'/{self.DEV}/{source}'
return Trigger(source=source, delay=delay, hysteresis=hysteresis,
level=level, retriggerable=retriggerable, rising=rising)
[docs]
@staticmethod
def time_array(n: int | np.ndarray, dt: float) -> np.ndarray:
"""Create an array based on a sampling time.
Args:
n: The number of samples. If an array of voltage samples is
passed in, then the returned time array will have the
appropriate size.
dt: The sampling time.
Returns:
The array (e.g., [0, dt, 2*dt, 3*dt, ..., (n-1)*dt]).
"""
num = n.shape[-1] if isinstance(n, np.ndarray) else n
return np.linspace(0., dt*num, num=num, endpoint=False, dtype=float)
[docs]
@staticmethod
def wait_until_done(*tasks: Task, timeout: float = 10.0) -> None:
"""Wait until all tasks are done and then close each task.
Args:
tasks: The task(s) to wait for.
timeout: The number of seconds to wait for each task to finish.
Set to -1 to wait forever.
"""
for task in tasks:
task.wait_until_done(timeout=timeout)
task.close()
def _maybe_set_timing_and_trigger(self,
task: Task,
timing: Timing,
trigger: Trigger,
task_type: str) -> None:
"""(Maybe) Configure timing and triggering for a task."""
if timing.samples_per_channel > 1 or \
timing.sample_mode == AcquisitionType.CONTINUOUS or \
trigger is not None:
self.logger.info(f'{self.alias!r} set {timing} for the {task_type} task')
timing.add_to(task)
if trigger is not None:
self.logger.info(f'{self.alias!r} set {trigger} for the {task_type} task')
trigger.add_to(task)
def _generate_digital_lines(self, lines: int | str, port: int) -> str:
if isinstance(lines, str) and lines.startswith(f'/{self.DEV}'):
return lines
return f'/{self.DEV}/port{port}/line{lines}'