Skip to content

opus_interface

frog.hardware.plugins.spectrometer.opus_interface ¤

Module containing code for sending commands to the OPUS program for the EM27.

Communication is based on a protocol using HTTP and HTML.

The OPUS program must be running on the computer at OPUS_IP for the commands to work. Note that this is a separate machine from the EM27!

Attributes¤

Classes¤

OPUSInterface(host=DEFAULT_OPUS_HOST, port=DEFAULT_OPUS_PORT, polling_interval=DEFAULT_OPUS_POLLING_INTERVAL, timeout=DEFAULT_EM27_HTTP_TIMEOUT) ¤

Bases: HTTPDevice, OPUSInterfaceBase

Interface for communicating with the OPUS program.

HTTP requests are handled on a background thread.

Create a new OPUSInterface.

Parameters:

Name Type Description Default
host str

The hostname or IP address on which to make requests

DEFAULT_OPUS_HOST
port int

The port on which to make requests

DEFAULT_OPUS_PORT
polling_interval float

Minimum polling interval for status

DEFAULT_OPUS_POLLING_INTERVAL
timeout float

The maximum time in seconds to wait for a response from the server

DEFAULT_EM27_HTTP_TIMEOUT
Source code in frog/hardware/plugins/spectrometer/opus_interface.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
def __init__(
    self,
    host: str = DEFAULT_OPUS_HOST,
    port: int = DEFAULT_OPUS_PORT,
    polling_interval: float = DEFAULT_OPUS_POLLING_INTERVAL,
    timeout: float = DEFAULT_EM27_HTTP_TIMEOUT,
) -> None:
    """Create a new OPUSInterface.

    Args:
        host: The hostname or IP address on which to make requests
        port: The port on which to make requests
        polling_interval: Minimum polling interval for status
        timeout: The maximum time in seconds to wait for a response from the server
    """
    HTTPDevice.__init__(self, timeout)
    OPUSInterfaceBase.__init__(self)

    self._url = f"http://{host}:{port}/opusrs"
    """URL to make requests."""

    self._status: SpectrometerStatus | None = None
    """The last known status of the spectrometer."""
    self._status_timer = QTimer()
    self._status_timer.timeout.connect(self._request_status)
    self._status_timer.setInterval(int(polling_interval * 1000))
    self._status_timer.setSingleShot(True)

    self._request_status()
Functions¤
close() ¤

Close the device.

Source code in frog/hardware/plugins/spectrometer/opus_interface.py
112
113
114
115
def close(self) -> None:
    """Close the device."""
    self._status_timer.stop()
    super().close()
handle_response(response) ¤

Process HTTP response from OPUS.

Source code in frog/hardware/plugins/spectrometer/opus_interface.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
def handle_response(self, response: str):
    """Process HTTP response from OPUS."""
    new_status = parse_response(response)

    # If the status has changed, notify listeners
    if new_status != self._status:
        # On first update, we need to signal that the device is now open
        if self._status is None:
            self.signal_is_opened()

        self._status = new_status
        self.send_status_message(new_status)

    # Poll the status again after a delay
    self._status_timer.start()
request_command(command) ¤

Request that OPUS run the specified command.

Parameters:

Name Type Description Default
command str

Name of command to run

required
Source code in frog/hardware/plugins/spectrometer/opus_interface.py
141
142
143
144
145
146
147
def request_command(self, command: str) -> None:
    """Request that OPUS run the specified command.

    Args:
        command: Name of command to run
    """
    self._make_opus_request(f"{COMMAND_FILENAME}?opusrs{command}")

Functions¤

parse_response(response) ¤

Parse OPUS's HTML response.

Source code in frog/hardware/plugins/spectrometer/opus_interface.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def parse_response(response: str) -> SpectrometerStatus:
    """Parse OPUS's HTML response."""
    status: SpectrometerStatus | None = None
    text: str | None = None
    errcode: int | None = None
    errtext: str = ""
    soup = BeautifulSoup(response, "html.parser")
    for td in soup.find_all("td"):
        if "id" not in td.attrs:
            continue

        id = td.attrs["id"]
        data = td.contents[0] if td.contents else ""
        if id == "STATUS":
            status = SpectrometerStatus(int(data))
        elif id == "TEXT":
            text = data
        elif id == "ERRCODE":
            errcode = int(data)
        elif id == "ERRTEXT":
            errtext = data
        else:
            logging.warning(f"Received unknown ID: {id}")

    if status is None or text is None:
        raise OPUSError("Required tags not found")
    if errcode is not None:
        raise OPUSError.from_response(errcode, errtext)

    return status