Source code for inet_nm.usb_ctrl

import os
from typing import List, Optional, Set

if os.getenv("INET_NM_FAKE_USB_PATH"):
    from inet_nm.fake_usb import Context
else:
    from pyudev import Context

from inet_nm.data_types import NmNode


class TtyNotPresent(Exception):
    """Signal that no TTY device could be located for a given NmNode."""
def get_connected_uids() -> List[str]:
    """Collect the UID of every USB device that exposes a TTY.

    Each device in the ``tty`` subsystem is inspected and those without a
    ``usb_device`` ancestor are ignored. One entry is produced per TTY, so a
    USB device exposing several TTYs appears several times.

    Returns:
        The UIDs computed by ``NmNode.calculate_uid`` from the model ID,
        vendor ID and short serial of each USB parent.
    """
    usb_parents = (
        tty.find_parent("usb", "usb_device")
        for tty in Context().list_devices(subsystem="tty")
    )
    return [
        NmNode.calculate_uid(
            usb.get("ID_MODEL_ID"),
            usb.get("ID_VENDOR_ID"),
            usb.get("ID_SERIAL_SHORT"),
        )
        for usb in usb_parents
        if usb is not None
    ]
def get_connected_id_paths() -> Set[str]:
    """Collect the ``ID_PATH`` of every USB device that exposes a TTY.

    TTY devices without a ``usb_device`` ancestor are ignored.

    Returns:
        A set holding the ``ID_PATH`` property of each USB parent found.
    """
    usb_parents = (
        tty.find_parent("usb", "usb_device")
        for tty in Context().list_devices(subsystem="tty")
    )
    return {usb.get("ID_PATH") for usb in usb_parents if usb is not None}
def _split_devpath(devpath):
    """Separate the last component of a devpath into hub and port.

    Only the text after the final ``/`` is considered. The port is whatever
    follows the last ``-`` or ``.`` in that component, and the hub is
    everything before that separator.

    Args:
        devpath: The devpath to split.

    Returns:
        A ``(hub, port)`` tuple of strings. When the last component holds
        neither ``-`` nor ``.``, the hub is empty and the port is the whole
        component.

    Example:
        >>> _split_devpath("usb1/1-1/1-1.3")
        ('1-1', '3')
        >>> _split_devpath("usb1/1-1/1-1.3.2-4")
        ('1-1.3.2', '4')
        >>> _split_devpath("usb5/5-2")
        ('5', '2')
    """
    leaf = devpath.rsplit("/", 1)[-1]
    # The port starts right after whichever separator occurs last.
    cut = max(leaf.rfind("-"), leaf.rfind("."))
    if cut < 0:
        # No separator at all: nothing is left over for the hub.
        return ("", leaf)
    return (leaf[:cut], leaf[cut + 1 :])
def get_uid_from_id_path(id_path: str) -> str:
    """Get the UID of the connected USB device found at an ID_PATH.

    Args:
        id_path: The ID_PATH of the connected USB device.

    Returns:
        The UID computed by ``NmNode.calculate_uid`` for the first USB
        parent of a TTY whose ``ID_PATH`` equals ``id_path``.

    Raises:
        Exception: If no connected device has the given ID_PATH.
    """
    context = Context()
    for device in context.list_devices(subsystem="tty"):
        parent = device.find_parent("usb", "usb_device")
        if parent is None:
            continue
        if parent.get("ID_PATH") == id_path:
            vendor_id = parent.get("ID_VENDOR_ID")
            model_id = parent.get("ID_MODEL_ID")
            serial_short = parent.get("ID_SERIAL_SHORT")
            return NmNode.calculate_uid(model_id, vendor_id, serial_short)
    # Without this the function silently returned None, contradicting both
    # the ``str`` return type and the sibling lookups, which raise.
    raise Exception(f"No device found at {id_path}, maybe not connected")
def get_usb_info_from_node(node: NmNode):
    """Look up where a node is plugged in on the USB bus.

    The node is matched against the USB parents of all TTY devices by
    vendor ID, model ID and short serial; the first match wins.

    Args:
        node: The node to read the USB information from.

    Returns:
        A tuple ``(id_path, hub, port)`` where ``hub`` and ``port`` come
        from splitting the matching device's ``DEVPATH``.

    Raises:
        Exception: If no connected device matches the node.
    """
    wanted = (node.vendor_id, node.product_id, node.serial)
    for tty in Context().list_devices(subsystem="tty"):
        usb = tty.find_parent("usb", "usb_device")
        if usb is None:
            continue
        found = (
            usb.get("ID_VENDOR_ID"),
            usb.get("ID_MODEL_ID"),
            usb.get("ID_SERIAL_SHORT"),
        )
        if found != wanted:
            continue
        hub, port = _split_devpath(usb.get("DEVPATH"))
        return (usb.get("ID_PATH"), hub, port)
    raise Exception("Node not found, maybe not connected")
def get_id_path_from_node(node: NmNode) -> str:
    """Find the ID_PATH of the connected USB device matching a node.

    The node is matched against the USB parents of all TTY devices by
    vendor ID, model ID and short serial; the first match wins.

    Args:
        node: The node to get the ID_PATH for.

    Returns:
        The ``ID_PATH`` property of the matching USB device.

    Raises:
        Exception: If no connected device matches the node.
    """
    wanted = (node.vendor_id, node.product_id, node.serial)
    for tty in Context().list_devices(subsystem="tty"):
        usb = tty.find_parent("usb", "usb_device")
        if usb is None:
            continue
        identity = (
            usb.get("ID_VENDOR_ID"),
            usb.get("ID_MODEL_ID"),
            usb.get("ID_SERIAL_SHORT"),
        )
        if identity == wanted:
            return usb.get("ID_PATH")
    raise Exception(f"Node {node} not found, maybe not connected")
def get_devices_from_tty(saved_nodes: Optional[List[NmNode]] = None) -> List[NmNode]:
    """Retrieve connected TTY devices as a list of NmNode objects.

    Only TTYs whose device node contains ``ACM`` or ``USB`` are considered.
    TTYs without a device node or without a ``usb_device`` ancestor are
    skipped, as are devices whose UID already appears in ``saved_nodes``.

    Args:
        saved_nodes: List of previously saved NmNode objects. ``None`` is
            treated as an empty list.

    Returns:
        List of NmNode objects, one per matching TTY, built from the udev
        properties of the TTY's USB parent.
    """
    # Build the lookup once rather than once per TTY.
    # Assumes NmNode.uid is hashable (calculate_uid is annotated as str
    # elsewhere in this module) — TODO confirm.
    known_uids = {node.uid for node in saved_nodes or []}
    context = Context()
    nodes = []
    for device in context.list_devices(subsystem="tty"):
        # device_node is None for devices without a node; substring tests
        # on None would raise TypeError.
        device_node = device.device_node or ""
        if "ACM" not in device_node and "USB" not in device_node:
            continue
        usb_device = device.find_parent("usb", "usb_device")
        if usb_device is None:
            # No USB ancestor means there are no properties to read from.
            continue
        nm_node = NmNode(
            vendor_id=usb_device.get("ID_VENDOR_ID"),
            product_id=usb_device.get("ID_MODEL_ID"),
            serial=usb_device.get("ID_SERIAL_SHORT"),
            vendor=usb_device.get("ID_VENDOR_FROM_DATABASE"),
            model=usb_device.get("ID_MODEL_FROM_DATABASE"),
            driver=usb_device.get("DRIVER"),
        )
        if nm_node.uid in known_uids:
            continue
        nodes.append(nm_node)
    return nodes
def get_ttys_from_nm_node(nm_node: NmNode) -> List[str]:
    """List the device nodes of all TTYs that belong to an NmNode.

    A TTY belongs to the node when its USB parent has the same vendor ID,
    model ID and short serial as the node.

    Args:
        nm_node: An NmNode object.

    Returns:
        The device node of every matching TTY; empty when nothing matches.
    """
    wanted = (nm_node.vendor_id, nm_node.product_id, nm_node.serial)

    def _belongs_to_node(usb) -> bool:
        # TTYs without a USB ancestor never match.
        if usb is None:
            return False
        identity = (
            usb.get("ID_VENDOR_ID"),
            usb.get("ID_MODEL_ID"),
            usb.get("ID_SERIAL_SHORT"),
        )
        return identity == wanted

    return [
        tty.device_node
        for tty in Context().list_devices(subsystem="tty")
        if _belongs_to_node(tty.find_parent("usb", "usb_device"))
    ]
def get_tty_from_nm_node(nm_node: NmNode) -> str:
    """Return the first TTY device node that belongs to an NmNode.

    Args:
        nm_node: An NmNode object.

    Returns:
        The first device node reported by ``get_ttys_from_nm_node``.

    Raises:
        TtyNotPresent: If no TTY device could be found for the given NmNode.
    """
    candidates = get_ttys_from_nm_node(nm_node)
    if not candidates:
        raise TtyNotPresent(f"Could not find tty device for {nm_node}")
    return candidates[0]