device

frog.hardware.device ¤

Provides base classes for all types of devices.

The Device class is the top-level base class from which all devices ultimately inherit. Concrete classes for devices must not inherit directly from this class, but instead should inherit from a device base class.

Classes¤

AbstractDevice ¤

Bases: ABC

An abstract base class for devices.

Functions¤
__init_subclass__(parameters={}, async_open=None) ¤

Initialise a device class.

Parameters:

parameters (Mapping[str, str | tuple[str, Sequence]], default {}): Extra device parameters that this class requires
async_open (bool | None, default None): Whether the device should be opened in the background

Source code in src/frog/hardware/device.py
def __init_subclass__(
    cls,
    parameters: Mapping[str, str | tuple[str, Sequence]] = {},
    async_open: bool | None = None,
) -> None:
    """Initialise a device class.

    Args:
        parameters: Extra device parameters that this class requires
        async_open: Whether the device should be opened in the background
    """
    super().__init_subclass__()

    cls._add_parameters(parameters)
    cls._update_parameter_defaults()
    if async_open is not None:
        cls._device_async_open = async_open
close() abstractmethod ¤

Close the connection to the device.

Source code in src/frog/hardware/device.py
@abstractmethod
def close(self) -> None:
    """Close the connection to the device."""
get_device_base_type_info() classmethod ¤

Get information about the base type for this device type.

Source code in src/frog/hardware/device.py
@classmethod
def get_device_base_type_info(cls) -> DeviceBaseTypeInfo:
    """Get information about the base type for this device type."""
    return cls._device_base_type_info
get_device_parameters() classmethod ¤

Get the parameters for this device class.

Source code in src/frog/hardware/device.py
@classmethod
def get_device_parameters(cls) -> dict[str, DeviceParameter]:
    """Get the parameters for this device class."""
    return cls._device_parameters
get_device_type_info() classmethod ¤

Get information about this device type.

Source code in src/frog/hardware/device.py
@classmethod
def get_device_type_info(cls) -> DeviceTypeInfo:
    """Get information about this device type."""
    class_name_full = f"{cls.__module__}.{cls.__name__}"
    class_name = class_name_full.removeprefix(f"{_plugins_name}.")

    if len(class_name) == len(class_name_full):
        logging.warning(
            f"Plugin found outside of {_plugins_name}. This probably won't work."
        )

    return DeviceTypeInfo(
        class_name,
        cls._device_description,
        cls.get_device_parameters(),
    )
pubsub_broadcast(func, success_topic_suffix, *kwarg_names) ¤

Broadcast success or failure of function via pubsub.

If the function returns without error, the returned values are sent as arguments to the success_topic message.

Parameters:

func (Callable, required): The function to wrap
success_topic_suffix (str, required): The topic name on which to broadcast function results
kwarg_names (str, default ()): The names of each of the returned values

Source code in src/frog/hardware/device.py
def pubsub_broadcast(
    self, func: Callable, success_topic_suffix: str, *kwarg_names: str
) -> Callable:
    """Broadcast success or failure of function via pubsub.

    If the function returns without error, the returned values are sent as arguments
    to the success_topic message.

    Args:
        func: The function to wrap
        success_topic_suffix: The topic name on which to broadcast function results
        kwarg_names: The names of each of the returned values
    """

    def wrapped(func, *args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception as error:
            self.send_error_message(error)
        else:
            # Convert result to a tuple of the right size
            if result is None:
                result = ()
            elif not isinstance(result, tuple):
                result = (result,)

            # Make sure we have the right number of return values
            assert len(result) == len(kwarg_names)

            # Send message with arguments
            self.send_message(
                success_topic_suffix, **dict(zip(kwarg_names, result))
            )

    return decorate(func, wrapped)
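
The way return values are turned into message keyword arguments can be sketched without the pubsub or decorator dependencies. This is an illustration only: broadcast_sketch, the sent list, read_position and fail are hypothetical names, functools.wraps stands in for decorate, and a ValueError replaces the assert used above.

```python
from collections.abc import Callable
from functools import wraps
from typing import Any

# Hypothetical stand-in for the pubsub bus: records (topic, kwargs) pairs
sent: list[tuple[str, dict[str, Any]]] = []


def broadcast_sketch(
    func: Callable, success_topic: str, *kwarg_names: str
) -> Callable:
    """Wrap func so that its outcome is recorded as a message."""

    @wraps(func)
    def wrapped(*args: Any, **kwargs: Any) -> None:
        try:
            result = func(*args, **kwargs)
        except Exception as error:
            # Failure: record an error message instead of propagating
            sent.append(("error", {"error": error}))
            return

        # Normalise the result to a tuple, as in the method above
        if result is None:
            result = ()
        elif not isinstance(result, tuple):
            result = (result,)

        # One name is needed for each returned value
        if len(result) != len(kwarg_names):
            raise ValueError("Unexpected number of return values")

        sent.append((success_topic, dict(zip(kwarg_names, result))))

    return wrapped


def read_position() -> tuple[float, str]:
    return 12.5, "deg"


def fail() -> None:
    raise OSError("device unplugged")


broadcast_sketch(read_position, "position", "value", "units")()
broadcast_sketch(fail, "never_sent")()
```

After these two calls, sent holds one "position" message carrying value and units, followed by one "error" message carrying the OSError.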
pubsub_errors(func) ¤

Catch exceptions and broadcast via pubsub.

Parameters:

func (Callable, required): The function to wrap

Source code in src/frog/hardware/device.py
def pubsub_errors(self, func: Callable) -> Callable:
    """Catch exceptions and broadcast via pubsub.

    Args:
        func: The function to wrap
    """

    def wrapped(func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as error:
            self.send_error_message(error)

    return decorate(func, wrapped)

Device(name=None) ¤

Bases: AbstractDevice

A base class for device types.

This class is the base class for device base types and (indirectly) concrete device type classes. Unlike AbstractDevice, its __init_subclass__ method initialises subclasses differently depending on whether they are defined as device base types.

Create a new Device.

Parameters:

name (str | None, default None): A name to distinguish devices of the same type.

Source code in src/frog/hardware/device.py
412
def __init__(self, name: str | None = None) -> None:
    """Create a new Device.

    Args:
        name: A name to distinguish devices of the same type.
    """
    self.topic = f"device.{self._device_base_type_info.name}"
    """The name of the root pubsub topic on which this device will broadcast."""

    self.name = name
    """The (optional) name of this device to use in pubsub messages."""

    self._subscriptions: list[tuple[Callable, str]] = []
    """Store of wrapped functions which are subscribed to pubsub messages."""

    self._is_open = False
    """Whether the device has finished opening."""

    if not self._device_base_type_info.names_short:
        if name:
            raise RuntimeError(
                "Name provided for device which cannot accept names."
            )
        return

    if name not in self._device_base_type_info.names_short:
        raise RuntimeError("Invalid name given for device")

    self.topic += f".{name}"
Attributes¤
name = name instance-attribute ¤

The (optional) name of this device to use in pubsub messages.

topic = f'device.{self._device_base_type_info.name}' instance-attribute ¤

The name of the root pubsub topic on which this device will broadcast.

Functions¤
__init_subclass__(class_type=None, **kwargs) ¤

Initialise a device type class.

Parameters:

class_type (DeviceClassType | None, default None): Optionally override the default heuristic for determining whether this is a base type, device type or neither
**kwargs (Any, default {}): Class arguments for either base type or device type initialisation

Source code in src/frog/hardware/device.py
def __init_subclass__(
    cls, class_type: DeviceClassType | None = None, **kwargs: Any
) -> None:
    """Initialise a device type class.

    Args:
        class_type: Optionally override the default heuristic for determining
                    whether this is a base type, device type or neither
        **kwargs: Class arguments for either base type or device type initialisation
    """
    if class_type is None:
        class_type = cls._infer_device_class_type()

    match class_type:
        case DeviceClassType.BASE_TYPE:
            cls._init_base_type(**kwargs)
        case DeviceClassType.DEVICE_TYPE:
            cls._init_device_type(**kwargs)
        case DeviceClassType.IGNORE:
            super().__init_subclass__(**kwargs)
close() ¤

Close the device and clear any pubsub subscriptions.

Source code in src/frog/hardware/device.py
def close(self) -> None:
    """Close the device and clear any pubsub subscriptions."""
    for sub in self._subscriptions:
        pub.unsubscribe(*sub)
get_instance_ref() ¤

Get the DeviceInstanceRef corresponding to this device.

Source code in src/frog/hardware/device.py
def get_instance_ref(self) -> DeviceInstanceRef:
    """Get the DeviceInstanceRef corresponding to this device."""
    return DeviceInstanceRef(self._device_base_type_info.name, self.name)
send_error_message(error) ¤

Send an error message for this device.

Source code in src/frog/hardware/device.py
def send_error_message(self, error: Exception) -> None:
    """Send an error message for this device."""
    # Write to log
    traceback_str = "".join(traceback.format_exception(error))
    logging.error(f"Error with device {self.topic}: {traceback_str}")

    # Send pubsub message
    instance = self.get_instance_ref()
    pub.sendMessage(
        f"device.error.{instance!s}",
        instance=instance,
        error=error,
    )
send_message(topic_suffix, **kwargs) ¤

Send a pubsub message for this device.

Parameters:

topic_suffix (str, required): The part of the topic name after self.topic
**kwargs (Any, default {}): Extra arguments to include with pubsub message

Source code in src/frog/hardware/device.py
def send_message(self, topic_suffix: str, **kwargs: Any) -> None:
    """Send a pubsub message for this device.

    Args:
        topic_suffix: The part of the topic name after self.topic
        **kwargs: Extra arguments to include with pubsub message
    """
    pub.sendMessage(f"{self.topic}.{topic_suffix}", **kwargs)
signal_is_opened() ¤

Signal that the device is now open.

Source code in src/frog/hardware/device.py
def signal_is_opened(self) -> None:
    """Signal that the device is now open."""
    if self._is_open:
        raise RuntimeError("Device is already open")

    self._is_open = True

    # Subscribe to topics now that device is ready
    for args in self._subscriptions:
        pub.subscribe(*args)

    instance = self.get_instance_ref()
    class_name = self.get_device_type_info().class_name
    _, _, class_name_short = class_name.rpartition(".")
    logging.info(f"Opened device {instance!s}: {class_name_short}")

    # Signal that device is now open. The reason for the two different topics is
    # because we want to ensure that some listeners always run before others, in
    # case an error occurs and we have to undo the work.
    pub.sendMessage(
        f"device.after_opening.{instance!s}",
        instance=instance,
        class_name=class_name,
    )
    pub.sendMessage(f"device.opened.{instance!s}")
subscribe(func, topic_name_suffix, success_topic_suffix=None, *kwarg_names) ¤

Subscribe to a pubsub topic using the pubsub_* helper functions.

Errors will be broadcast on the topic "device.error.{THIS_INSTANCE}". If success_topic_suffix is provided, a message will also be sent on success (see pubsub_broadcast).

Parameters:

func (Callable, required): The function to subscribe to the topic
topic_name_suffix (str, required): The suffix of the topic to subscribe to
success_topic_suffix (str | None, default None): The topic name on which to broadcast function results
kwarg_names (str, default ()): The names of each of the returned values

Source code in src/frog/hardware/device.py
def subscribe(
    self,
    func: Callable,
    topic_name_suffix: str,
    success_topic_suffix: str | None = None,
    *kwarg_names: str,
) -> None:
    """Subscribe to a pubsub topic using the pubsub_* helper functions.

    Errors will be broadcast on the topic "device.error.{THIS_INSTANCE}". If
    success_topic_suffix is provided, a message will also be sent on success (see
    pubsub_broadcast).

    Args:
        func: The function to subscribe to the topic
        topic_name_suffix: The suffix of the topic to subscribe to
        success_topic_suffix: The topic name on which to broadcast function results
        kwarg_names: The names of each of the returned values
    """
    if success_topic_suffix:
        wrapped_func = self.pubsub_broadcast(
            func, success_topic_suffix, *kwarg_names
        )
    else:
        wrapped_func = self.pubsub_errors(func)

    topic_name = f"{self.topic}.{topic_name_suffix}"
    self._subscriptions.append((wrapped_func, topic_name))

    # If the device isn't ready, defer subscription so callers don't try to use it
    if self._is_open:
        pub.subscribe(wrapped_func, topic_name)
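
The deferral behaviour (record the subscription now, register it only once the device is open) can be sketched without pubsub. This is a simplified model, not the frog implementation: DeferredSubscriber, the listeners registry and on_move are hypothetical, and the error-handling wrappers are left out.

```python
from collections.abc import Callable

# Hypothetical stand-in for the pubsub registry: topic name -> listeners
listeners: dict[str, list[Callable]] = {}


class DeferredSubscriber:
    """Sketch of the deferral logic only; not the frog Device class."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self._subscriptions: list[tuple[Callable, str]] = []
        self._is_open = False

    def subscribe(self, func: Callable, topic_name_suffix: str) -> None:
        topic_name = f"{self.topic}.{topic_name_suffix}"
        # Always remember the subscription so it can be replayed or removed
        self._subscriptions.append((func, topic_name))
        # Only register immediately if the device has finished opening
        if self._is_open:
            listeners.setdefault(topic_name, []).append(func)

    def signal_is_opened(self) -> None:
        self._is_open = True
        # Replay every subscription that was deferred while opening
        for func, topic_name in self._subscriptions:
            listeners.setdefault(topic_name, []).append(func)

    def close(self) -> None:
        # Remove only the listeners that were actually registered
        for func, topic_name in self._subscriptions:
            if func in listeners.get(topic_name, []):
                listeners[topic_name].remove(func)


def on_move(target: float) -> None:
    print(f"Moving to {target}")


device = DeferredSubscriber("device.stepper_motor")
device.subscribe(on_move, "move.begin")
registered_while_closed = dict(listeners)  # nothing is registered yet

device.signal_is_opened()
registered_after_open = {topic: list(funcs) for topic, funcs in listeners.items()}
```

Deferring registration means no message can reach a handler while the device is still opening, which matters when opening happens in the background.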

DeviceClassType ¤

Bases: Enum

The type of a class inheriting directly or indirectly from Device.

Attributes¤
BASE_TYPE = 0 class-attribute instance-attribute ¤

A base device type (e.g. stepper motor)

DEVICE_TYPE = 1 class-attribute instance-attribute ¤

A device type (e.g. ST10 stepper motor controller)

IGNORE = 2 class-attribute instance-attribute ¤

An intermediate class type that should not be added to either registry

DeviceError ¤

Bases: Exception

Base class for errors raised by device plugins.

RetryFailedError ¤

Bases: DeviceError

Indicates that an operation failed after multiple retries.

Functions¤

get_device_types() ¤

Return info about device types grouped according to their base type.

Source code in src/frog/hardware/device.py
def get_device_types() -> dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]]:
    """Return info about device types grouped according to their base type."""
    # Ensure all base types and device types have been registered
    load_all_plugins()

    # Get the base type info and sort it alphabetically by description
    base_types_info = sorted(
        (t.get_device_base_type_info() for t in _base_types),
        key=lambda info: info.description,
    )

    # Preallocate dict with empty lists
    out: dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]] = {
        info: [] for info in base_types_info
    }

    # Get device type info and group by base type
    for device_type in _device_types:
        out[device_type.get_device_base_type_info()].append(
            device_type.get_device_type_info()
        )

    # Sort the device types by description
    for infos in out.values():
        infos.sort(key=lambda info: info.description)

    return out
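
The grouping and ordering of the returned dict can be shown with plain data. This is a sketch with hypothetical stand-ins: BaseInfo and TypeInfo are minimal frozen dataclasses (the real DeviceBaseTypeInfo and DeviceTypeInfo have more fields), group_types takes explicit inputs instead of reading the plugin registries, and the example entries are invented.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class BaseInfo:
    """Hypothetical, reduced stand-in for DeviceBaseTypeInfo."""

    name: str
    description: str


@dataclass(frozen=True)
class TypeInfo:
    """Hypothetical, reduced stand-in for DeviceTypeInfo."""

    class_name: str
    description: str


def group_types(
    base_infos: list[BaseInfo], pairs: list[tuple[BaseInfo, TypeInfo]]
) -> dict[BaseInfo, list[TypeInfo]]:
    # Dicts preserve insertion order, so sorting first fixes the key order
    ordered = sorted(base_infos, key=lambda info: info.description)
    out: dict[BaseInfo, list[TypeInfo]] = {info: [] for info in ordered}

    # Group each device type under its base type
    for base, info in pairs:
        out[base].append(info)

    # Sort each group by description
    for infos in out.values():
        infos.sort(key=lambda info: info.description)

    return out


stepper = BaseInfo("stepper_motor", "Stepper motor")
monitor = BaseInfo("temperature_monitor", "Temperature monitor")
st10 = TypeInfo("stepper_motor.st10.ST10", "ST10 controller")
dummy = TypeInfo("stepper_motor.dummy.DummyStepper", "Dummy stepper motor")

grouped = group_types([monitor, stepper], [(stepper, st10), (stepper, dummy)])
```

Base types with no registered device types still appear in the result with an empty list, because the dict is preallocated from the base types.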

retry_request(func, max_attempts, catch) ¤

Attempt to call func up to max_attempts times, retrying on caught exceptions.

The raised exception must be of type catch. If func raises an exception type other than catch, it will not be caught and the request will not be retried.

On each failed attempt a warning is logged. If all attempts are exhausted a RetryFailedError is raised.

Parameters:

func (Callable[[], T], required): The callable to invoke on each attempt.
max_attempts (int, required): Maximum number of attempts (must be >= 1).
catch (type[Exception], required): The exception type to catch and retry on.

Returns:

T: The return value of func on success.

Raises:

RetryFailedError: All attempts resulted in the catch error type being raised.

Source code in src/frog/hardware/device.py
def retry_request[T](
    func: Callable[[], T],
    max_attempts: int,
    catch: type[Exception],
) -> T:
    """Attempt to call func up to max_attempts times, retrying on caught exceptions.

    The raised exception must be of type `catch`. If `func` raises an exception type
    other than `catch`, it will not be caught and the request will not be retried.

    On each failed attempt a warning is logged. If all attempts are exhausted a
    RetryFailedError is raised.

    Args:
        func: The callable to invoke on each attempt.
        max_attempts: Maximum number of attempts (must be >= 1).
        catch: The exception type to catch and retry on.

    Returns:
        The return value of func on success.

    Raises:
        RetryFailedError: All attempts resulted in the `catch` error type being raised.
    """
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least one")

    last_error: Exception | None = None
    for attempt in range(max_attempts):
        try:
            return func()
        except catch as e:
            logging.warning(f"Attempt {attempt + 1}/{max_attempts} failed: {e!s}")
            last_error = e

    raise RetryFailedError(
        f"Maximum number of attempts (={max_attempts}) exceeded"
    ) from last_error
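
A possible usage pattern is sketched below. So that the example runs without frog installed, it includes a condensed local copy of retry_request and a local RetryFailedError, with a TypeVar in place of the type parameter syntax. The flaky_read function and its failure pattern are hypothetical.

```python
import logging
from collections.abc import Callable
from typing import TypeVar

T = TypeVar("T")


class RetryFailedError(Exception):
    """Local stand-in for the RetryFailedError documented above."""


def retry_request(
    func: Callable[[], T], max_attempts: int, catch: type[Exception]
) -> T:
    """Condensed copy of the function above, for illustration."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least one")

    last_error: Exception | None = None
    for attempt in range(max_attempts):
        try:
            return func()
        except catch as e:
            logging.warning(f"Attempt {attempt + 1}/{max_attempts} failed: {e!s}")
            last_error = e

    raise RetryFailedError(
        f"Maximum number of attempts (={max_attempts}) exceeded"
    ) from last_error


calls = {"count": 0}


def flaky_read() -> str:
    """Hypothetical device read that times out on the first two calls."""
    calls["count"] += 1
    if calls["count"] < 3:
        raise TimeoutError("no response from device")
    return "OK"


# Succeeds on the third attempt, so the remaining two attempts are never used
value = retry_request(flaky_read, max_attempts=5, catch=TimeoutError)
```

Since func takes no arguments, a call that needs arguments can be wrapped with a lambda or functools.partial. When all attempts fail, the last caught exception is available as the __cause__ of the RetryFailedError.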