Skip to content

device

frog.hardware.device ¤

Provides base classes for all types of devices.

The Device class is the top-level base class from which all devices ultimately inherit. Concrete classes for devices must not inherit directly from this class, but instead should inherit from a device base class.

Classes¤

AbstractDevice ¤

Bases: ABC

An abstract base class for devices.

Functions¤
__init_subclass__(parameters={}, async_open=None) ¤

Initialise a device class.

Parameters:

Name Type Description Default
parameters Mapping[str, str | tuple[str, Sequence]]

Extra device parameters that this class requires

{}
async_open bool | None

Whether the device should be opened in the background

None
Source code in frog/hardware/device.py
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
def __init_subclass__(
    cls,
    parameters: Mapping[str, str | tuple[str, Sequence]] = {},
    async_open: bool | None = None,
) -> None:
    """Initialise a device class.

    Args:
        parameters: Extra device parameters that this class requires
        async_open: Whether the device should be opened in the background
    """
    super().__init_subclass__()

    cls._add_parameters(parameters)
    cls._update_parameter_defaults()
    if async_open is not None:
        cls._device_async_open = async_open
close() abstractmethod ¤

Close the connection to the device.

Source code in frog/hardware/device.py
152
153
154
@abstractmethod
def close(self) -> None:
    """Close the connection to the device."""
get_device_base_type_info() classmethod ¤

Get information about the base type for this device type.

Source code in frog/hardware/device.py
161
162
163
164
@classmethod
def get_device_base_type_info(cls) -> DeviceBaseTypeInfo:
    """Get information about the base type for this device type."""
    return cls._device_base_type_info
get_device_parameters() classmethod ¤

Get the parameters for this device class.

Source code in frog/hardware/device.py
156
157
158
159
@classmethod
def get_device_parameters(cls) -> dict[str, DeviceParameter]:
    """Get the parameters for this device class."""
    return cls._device_parameters
get_device_type_info() classmethod ¤

Get information about this device type.

Source code in frog/hardware/device.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
@classmethod
def get_device_type_info(cls) -> DeviceTypeInfo:
    """Get information about this device type."""
    class_name_full = f"{cls.__module__}.{cls.__name__}"
    class_name = class_name_full.removeprefix(f"{_plugins_name}.")

    if len(class_name) == len(class_name_full):
        logging.warning(
            f"Plugin found outside of {_plugins_name}. This probably won't work."
        )

    return DeviceTypeInfo(
        class_name,
        cls._device_description,
        cls.get_device_parameters(),
    )
pubsub_broadcast(func, success_topic_suffix, *kwarg_names) ¤

Broadcast success or failure of function via pubsub.

If the function returns without error, the returned values are sent as arguments to the success_topic message.

Parameters:

Name Type Description Default
func Callable

The function to wrap

required
success_topic_suffix str

The topic name on which to broadcast function results

required
kwarg_names str

The names of each of the returned values

()
Source code in frog/hardware/device.py
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def pubsub_broadcast(
    self, func: Callable, success_topic_suffix: str, *kwarg_names: str
) -> Callable:
    """Broadcast success or failure of function via pubsub.

    If the function returns without error, the returned values are sent as arguments
    to the success_topic message.

    Args:
        func: The function to wrap
        success_topic_suffix: The topic name on which to broadcast function results
        kwarg_names: The names of each of the returned values
    """

    def wrapped(func, *args, **kwargs):
        try:
            result = func(*args, **kwargs)
        except Exception as error:
            self.send_error_message(error)
        else:
            # Convert result to a tuple of the right size
            if result is None:
                result = ()
            elif not isinstance(result, tuple):
                result = (result,)

            # Make sure we have the right number of return values
            assert len(result) == len(kwarg_names)

            # Send message with arguments
            self.send_message(
                success_topic_suffix, **dict(zip(kwarg_names, result))
            )

    return decorate(func, wrapped)
pubsub_errors(func) ¤

Catch exceptions and broadcast via pubsub.

Parameters:

Name Type Description Default
func Callable

The function to wrap

required
Source code in frog/hardware/device.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
def pubsub_errors(self, func: Callable) -> Callable:
    """Catch exceptions and broadcast via pubsub.

    Args:
        func: The function to wrap
    """

    def wrapped(func, *args, **kwargs):
        try:
            func(*args, **kwargs)
        except Exception as error:
            self.send_error_message(error)

    return decorate(func, wrapped)

Device(name=None) ¤

Bases: AbstractDevice

A base class for device types.

This class is the base class for device base types and (indirectly) concrete device type classes. Unlike AbstractDevice, it provides an init_subclass method to initialise the its subclasses differently depending on whether or not they are defined as device base types or not.

Create a new Device.

Parameters:

Name Type Description Default
name str | None

A name to distinguish devices of the same type.

None
Source code in frog/hardware/device.py
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
def __init__(self, name: str | None = None) -> None:
    """Create a new Device.

    Args:
        name: A name to distinguish devices of the same type.
    """
    self.topic = f"device.{self._device_base_type_info.name}"
    """The name of the root pubsub topic on which this device will broadcast."""

    self.name = name
    """The (optional) name of this device to use in pubsub messages."""

    self._subscriptions: list[tuple[Callable, str]] = []
    """Store of wrapped functions which are subscribed to pubsub messages."""

    self._is_open = False
    """Whether the device has finished opening."""

    if not self._device_base_type_info.names_short:
        if name:
            raise RuntimeError(
                "Name provided for device which cannot accept names."
            )
        return

    if name not in self._device_base_type_info.names_short:
        raise RuntimeError("Invalid name given for device")

    self.topic += f".{name}"
Attributes¤
name = name instance-attribute ¤

The (optional) name of this device to use in pubsub messages.

topic = f'device.{self._device_base_type_info.name}' instance-attribute ¤

The name of the root pubsub topic on which this device will broadcast.

Functions¤
__init_subclass__(class_type=None, **kwargs) ¤

Initialise a device type class.

Parameters:

Name Type Description Default
class_type DeviceClassType | None

Optionally override the default heuristic for determining whether this is a base type, device type or neither

None
**kwargs Any

Class arguments for either base type or device type initialisation

{}
Source code in frog/hardware/device.py
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
def __init_subclass__(
    cls, class_type: DeviceClassType | None = None, **kwargs: Any
) -> None:
    """Initialise a device type class.

    Args:
        class_type: Optionally override the default heuristic for determining
                    whether this is a base type, device type or neither
        **kwargs: Class arguments for either base type or device type initialisation
    """
    if class_type is None:
        class_type = cls._infer_device_class_type()

    match class_type:
        case DeviceClassType.BASE_TYPE:
            cls._init_base_type(**kwargs)
        case DeviceClassType.DEVICE_TYPE:
            cls._init_device_type(**kwargs)
        case DeviceClassType.IGNORE:
            super().__init_subclass__(**kwargs)
close() ¤

Close the device and clear any pubsub subscriptions.

Source code in frog/hardware/device.py
391
392
393
394
def close(self) -> None:
    """Close the device and clear any pubsub subscriptions."""
    for sub in self._subscriptions:
        pub.unsubscribe(*sub)
get_instance_ref() ¤

Get the DeviceInstanceRef corresponding to this device.

Source code in frog/hardware/device.py
396
397
398
def get_instance_ref(self) -> DeviceInstanceRef:
    """Get the DeviceInstanceRef corresponding to this device."""
    return DeviceInstanceRef(self._device_base_type_info.name, self.name)
send_error_message(error) ¤

Send an error message for this device.

Source code in frog/hardware/device.py
442
443
444
445
446
447
448
449
450
451
452
453
454
def send_error_message(self, error: Exception) -> None:
    """Send an error message for this device."""
    # Write to log
    traceback_str = "".join(traceback.format_exception(error))
    logging.error(f"Error with device {self.topic}: {traceback_str}")

    # Send pubsub message
    instance = self.get_instance_ref()
    pub.sendMessage(
        f"device.error.{instance!s}",
        instance=instance,
        error=error,
    )
send_message(topic_suffix, **kwargs) ¤

Send a pubsub message for this device.

Parameters:

Name Type Description Default
topic_suffix str

The part of the topic name after self.topic

required
**kwargs Any

Extra arguments to include with pubsub message

{}
Source code in frog/hardware/device.py
433
434
435
436
437
438
439
440
def send_message(self, topic_suffix: str, **kwargs: Any) -> None:
    """Send a pubsub message for this device.

    Args:
        topic_suffix: The part of the topic name after self.topic
        **kwargs: Extra arguments to include with pubsub message
    """
    pub.sendMessage(f"{self.topic}.{topic_suffix}", **kwargs)
signal_is_opened() ¤

Signal that the device is now open.

Source code in frog/hardware/device.py
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
def signal_is_opened(self) -> None:
    """Signal that the device is now open."""
    if self._is_open:
        raise RuntimeError("Device is already open")

    self._is_open = True

    # Subscribe to topics now that device is ready
    for args in self._subscriptions:
        pub.subscribe(*args)

    instance = self.get_instance_ref()
    class_name = self.get_device_type_info().class_name
    _, _, class_name_short = class_name.rpartition(".")
    logging.info(f"Opened device {instance!s}: {class_name_short}")

    # Signal that device is now open. The reason for the two different topics is
    # because we want to ensure that some listeners always run before others, in
    # case an error occurs and we have to undo the work.
    pub.sendMessage(
        f"device.after_opening.{instance!s}",
        instance=instance,
        class_name=class_name,
    )
    pub.sendMessage(f"device.opened.{instance!s}")
subscribe(func, topic_name_suffix, success_topic_suffix=None, *kwarg_names) ¤

Subscribe to a pubsub topic using the pubsub_* helper functions.

Errors will be broadcast with the message "device.error.{THIS_INSTANCE}". If success_topic_suffix is provided, a message will also be sent on success (see pubsub_broadcast).

Parameters:

Name Type Description Default
func Callable

Function to subscribe to

required
topic_name_suffix str

The suffix of the topic to subscribe to

required
success_topic_suffix str | None

The topic name on which to broadcast function results

None
kwarg_names str

The names of each of the returned values

()
Source code in frog/hardware/device.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def subscribe(
    self,
    func: Callable,
    topic_name_suffix: str,
    success_topic_suffix: str | None = None,
    *kwarg_names: str,
) -> None:
    """Subscribe to a pubsub topic using the pubsub_* helper functions.

    Errors will be broadcast with the message "device.error.{THIS_INSTANCE}". If
    success_topic_suffix is provided, a message will also be sent on success (see
    pubsub_broadcast).

    Args:
        func: Function to subscribe to
        topic_name_suffix: The suffix of the topic to subscribe to
        success_topic_suffix: The topic name on which to broadcast function results
        kwarg_names: The names of each of the returned values
    """
    if success_topic_suffix:
        wrapped_func = self.pubsub_broadcast(
            func, success_topic_suffix, *kwarg_names
        )
    else:
        wrapped_func = self.pubsub_errors(func)

    topic_name = f"{self.topic}.{topic_name_suffix}"
    self._subscriptions.append((wrapped_func, topic_name))

    # If the device isn't ready, defer subscription so callers don't try to use it
    if self._is_open:
        pub.subscribe(wrapped_func, topic_name)

DeviceClassType ¤

Bases: Enum

The type of a class inheriting directly or indirectly from Device.

Attributes¤
BASE_TYPE = 0 class-attribute instance-attribute ¤

A base device type (e.g. stepper motor)

DEVICE_TYPE = 1 class-attribute instance-attribute ¤

A device type (e.g. ST10 stepper motor controller)

IGNORE = 2 class-attribute instance-attribute ¤

An intermediate class type that should not be added to either registry

Functions¤

get_device_types() ¤

Return info about device types grouped according to their base type.

Source code in frog/hardware/device.py
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
def get_device_types() -> dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]]:
    """Return info about device types grouped according to their base type."""
    # Ensure all base types and device types have been registered
    load_all_plugins()

    # Get the base type info and sort it alphabetically by description
    base_types_info = sorted(
        (t.get_device_base_type_info() for t in _base_types),
        key=lambda info: info.description,
    )

    # Preallocate dict with empty lists
    out: dict[DeviceBaseTypeInfo, list[DeviceTypeInfo]] = {
        info: [] for info in base_types_info
    }

    # Get device type info and group by base type
    for device_type in _device_types:
        out[device_type.get_device_base_type_info()].append(
            device_type.get_device_type_info()
        )

    # Sort the device types by name
    for infos in out.values():
        infos.sort(key=lambda info: info.description)

    return out