dummy_opus_interface

frog.hardware.plugins.spectrometer.dummy_opus_interface ¤

Provides a dummy EM27 device to interface with for testing.

Classes¤

DummyOPUSInterface(measure_duration=1.0) ¤

Bases: OPUSInterfaceBase

A mock version of the OPUS API for testing purposes.

Create a new DummyOPUSInterface.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| measure_duration | float | How long a single measurement takes (seconds) | 1.0 |

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def __init__(self, measure_duration: float = 1.0) -> None:
    """Create a new DummyOPUSInterface.

    Args:
        measure_duration: How long a single measurement takes (seconds)
    """
    super().__init__()

    self.state_machine = OPUSStateMachine(measure_duration)
    """An object representing the internal state of the device."""

    # Monitor state changes
    self.state_machine.add_observer(self)

    # Broadcast initial status
    self.on_enter_state(self.state_machine.current_state)
Attributes¤
state_machine = OPUSStateMachine(measure_duration) instance-attribute ¤

An object representing the internal state of the device.

Functions¤
close() ¤

Close the device.

If a measurement is running, cancel it.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def close(self) -> None:
    """Close the device.

    If a measurement is running, cancel it.
    """
    self.state_machine.measure_timer.stop()
    super().close()
on_enter_state(target) ¤

Broadcast state changes via pubsub.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def on_enter_state(self, target: State) -> None:
    """Broadcast state changes via pubsub."""
    self.send_status_message(target.value)
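
The observer wiring described above (the interface registers itself with `add_observer`, then receives an `on_enter_state` callback for each state entered) can be sketched without Qt or pubsub. This is a minimal illustration only: `TinyStateMachine`, `StatusRecorder`, `go_to` and the state names are hypothetical stand-ins, not FROG's classes.

```python
from enum import Enum


class State(Enum):
    """Hypothetical states; the real ones belong to OPUSStateMachine."""

    IDLE = "idle"
    CONNECTED = "connected"


class TinyStateMachine:
    """Minimal stand-in that notifies observers whenever a state is entered."""

    def __init__(self) -> None:
        self.current_state = State.IDLE
        self._observers = []

    def add_observer(self, observer) -> None:
        self._observers.append(observer)

    def go_to(self, target: State) -> None:
        self.current_state = target
        for observer in self._observers:
            observer.on_enter_state(target)


class StatusRecorder:
    """Stand-in for the interface: records statuses instead of using pubsub."""

    def __init__(self) -> None:
        self.messages = []
        self.state_machine = TinyStateMachine()
        # Monitor state changes
        self.state_machine.add_observer(self)
        # Report the initial state, mirroring the constructor shown earlier
        self.on_enter_state(self.state_machine.current_state)

    def on_enter_state(self, target: State) -> None:
        self.messages.append(target.value)


recorder = StatusRecorder()
recorder.state_machine.go_to(State.CONNECTED)
print(recorder.messages)  # ['idle', 'connected']
```

Reporting the initial state by hand matters because observers only hear about states entered after they were registered.
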
request_command(command) ¤

Execute the specified command on the device.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| command | str | The command to run | required |

Raises: OPUSError: If the command is unknown or the device is in the wrong state for this command

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def request_command(self, command: str) -> None:
    """Execute the specified command on the device.

    Args:
        command: The command to run
    Raises:
        OPUSError: If the device is in the wrong state for this command
    """
    if command not in self._COMMAND_ERRORS:
        OPUSErrorInfo.UNKNOWN_COMMAND.raise_exception()

    self._run_command(command)
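
The guard in `request_command` follows a validate-then-dispatch pattern, roughly as sketched below. The contents of `_COMMAND_ERRORS` are not shown on this page, so the mapping here (command name to an error message) is an assumption. `DeviceError` and `FakeInterface` are hypothetical names.

```python
class DeviceError(Exception):
    """Hypothetical stand-in for OPUSError."""


class FakeInterface:
    # Assumed shape: command name -> message used when the command is issued
    # in the wrong state. The real mapping is not shown on this page.
    _COMMAND_ERRORS = {
        "connect": "already connected",
        "start": "not connected",
    }

    def __init__(self) -> None:
        self.executed = []

    def request_command(self, command: str) -> None:
        # Reject anything that is not a known command before dispatching
        if command not in self._COMMAND_ERRORS:
            raise DeviceError(f"Unknown command: {command}")
        self._run_command(command)

    def _run_command(self, command: str) -> None:
        # The real method would drive the state machine; here we just record
        self.executed.append(command)


device = FakeInterface()
device.request_command("connect")

try:
    device.request_command("reboot")
except DeviceError as error:
    rejected = str(error)

print(device.executed, rejected)
```
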

OPUSErrorInfo ¤

Bases: Enum

Represents an error code and description for OPUS errors.

The codes and descriptions are taken from the manual.

Functions¤
raise_exception() ¤

Raise this error as an exception.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def raise_exception(self) -> None:
    """Raise this error as an exception."""
    raise OPUSError.from_response(*self.value)
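
As a rough sketch of this pattern, each enum member can hold a `(code, description)` tuple that `raise_exception` unpacks into an exception factory. The codes and descriptions below are invented for illustration and are not taken from the OPUS manual. `DeviceError` and `ErrorInfo` are hypothetical stand-ins for `OPUSError` and `OPUSErrorInfo`.

```python
from enum import Enum


class DeviceError(Exception):
    """Hypothetical stand-in for OPUSError."""

    def __init__(self, code: int, message: str) -> None:
        super().__init__(f"Error {code}: {message}")
        self.code = code

    @classmethod
    def from_response(cls, code: int, message: str) -> "DeviceError":
        return cls(code, message)


class ErrorInfo(Enum):
    """Each member's value is a (code, description) tuple (made-up values)."""

    NOT_CONNECTED = (1, "Device is not connected")
    UNKNOWN_COMMAND = (2, "Command not recognised")

    def raise_exception(self) -> None:
        # Unpack the (code, description) tuple into the factory method
        raise DeviceError.from_response(*self.value)


try:
    ErrorInfo.UNKNOWN_COMMAND.raise_exception()
except DeviceError as error:
    caught = error

print(caught)  # Error 2: Command not recognised
```

Keeping the code and description together on the enum member means callers only need to name the error, not repeat its details.
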

OPUSStateMachine(measure_duration) ¤

Bases: StateMachine

An FSM for keeping track of the internal state of the mock device.

Create a new OPUSStateMachine.

The state diagram looks like this: [image: OPUSStateMachine.png]

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| measure_duration | float | How long a single measurement takes (seconds) | required |

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def __init__(self, measure_duration: float) -> None:
    """Create a new OPUSStateMachine.

    The state diagram looks like this:

    ![](../../../../../OPUSStateMachine.png)

    Args:
        measure_duration: How long a single measurement takes (seconds)
    """
    self.measure_timer = QTimer()
    """Timer signalling the end of a measurement."""
    self.measure_timer.setInterval(round(measure_duration * 1000))
    self.measure_timer.setSingleShot(True)
    self.measure_timer.timeout.connect(self._on_measure_finished)

    super().__init__()
Attributes¤
measure_timer = QTimer() instance-attribute ¤

Timer signalling the end of a measurement.
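
The timer setup can be illustrated without a Qt event loop. The sketch below swaps in `threading.Timer` from the standard library, which also fires only once, in place of a single-shot `QTimer`. It is not how the mock device is implemented, and `to_interval_ms` is a hypothetical helper that repeats the seconds-to-milliseconds conversion from the constructor.

```python
import threading


def to_interval_ms(measure_duration: float) -> int:
    """Convert seconds to whole milliseconds, as the constructor does."""
    return round(measure_duration * 1000)


finished = threading.Event()

# threading.Timer fires once, loosely mirroring a single-shot QTimer
interval_ms = to_interval_ms(0.05)
timer = threading.Timer(interval_ms / 1000, finished.set)
timer.start()

fired = finished.wait(timeout=2.0)  # True once the callback has run
timer.cancel()  # No effect after firing; would cancel a pending timer
```

Because the interval is rounded to whole milliseconds, a sub-millisecond `measure_duration` such as `0.0004` becomes an interval of `0`.
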

Functions¤
connect() ¤

Connect to the device.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def connect(self) -> None:
    """Connect to the device."""
    self._start_connecting()
    self._finish_connecting()
on_enter_measuring() ¤

Start the measurement timer.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def on_enter_measuring(self) -> None:
    """Start the measurement timer."""
    self.measure_timer.start()
on_exit_measuring() ¤

Stop the measurement timer.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def on_exit_measuring(self) -> None:
    """Stop the measurement timer."""
    self.measure_timer.stop()
stop() ¤

Stop the current measurement.

Source code in frog/hardware/plugins/spectrometer/dummy_opus_interface.py
def stop(self) -> None:
    """Stop the current measurement."""
    self._cancel_measuring()
    self._reset_after_cancelling()
    logging.info("Cancelling current measurement")
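
Both `connect()` and `stop()` fire two transitions back to back, so the machine passes through an intermediate state. The state diagram is not reproduced on this page, so the state names and transition table below are assumptions chosen for illustration. `TinyFSM` and `TransitionError` are hypothetical, and a plain dictionary stands in for the `StateMachine` base class.

```python
class TransitionError(Exception):
    """Raised when an event is not allowed from the current state."""


class TinyFSM:
    # Hypothetical transition table: (current state, event) -> next state
    _TRANSITIONS = {
        ("idle", "start_connecting"): "connecting",
        ("connecting", "finish_connecting"): "connected",
        ("connected", "start_measuring"): "measuring",
        ("measuring", "cancel_measuring"): "cancelling",
        ("cancelling", "reset_after_cancelling"): "connected",
    }

    def __init__(self) -> None:
        self.state = "idle"
        self.history = [self.state]

    def _fire(self, event: str) -> None:
        key = (self.state, event)
        if key not in self._TRANSITIONS:
            raise TransitionError(f"{event!r} not allowed from {self.state!r}")
        self.state = self._TRANSITIONS[key]
        self.history.append(self.state)

    def connect(self) -> None:
        # Two transitions in a row, passing through an intermediate state
        self._fire("start_connecting")
        self._fire("finish_connecting")

    def start(self) -> None:
        self._fire("start_measuring")

    def stop(self) -> None:
        self._fire("cancel_measuring")
        self._fire("reset_after_cancelling")


fsm = TinyFSM()
fsm.connect()
fsm.start()
fsm.stop()
print(fsm.history)
# ['idle', 'connecting', 'connected', 'measuring', 'cancelling', 'connected']
```

Calling `stop()` when no measurement is running raises `TransitionError` in this sketch, which is the kind of wrong-state failure that `request_command` documents as an `OPUSError`.
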