Source code for inet_nm.commissioner
"""This module commissions USB boards."""
import random
import time
from typing import List, Optional
import pyudev
try:
from cp210x import cp210x
except ImportError:
cp210x = None
import inet_nm._helpers as hlp
from inet_nm._helpers import nm_prompt_choice, nm_prompt_confirm, nm_prompt_input
from inet_nm.data_types import NmNode
[docs]
class TtyNotPresent(Exception):
"""Exception to be raised when a TTY device is not found for a given NmNode."""
[docs]
def check_and_set_uninitialized_sn(node: NmNode, sns: List = None):
"""
Check if a given NmNode has an uninitialized serial number and prompt the user
to set it.
Args:
node: An NmNode object.
sns: List of serial numbers to check against.
"""
if cp210x is None:
return
sns = sns or ["0001"]
if node.serial not in sns:
return
pid_vid_sn = {
"idVendor": int(node.vendor_id, 16),
"idProduct": int(node.product_id, 16),
}
for usbdev in cp210x.Cp210xProgrammer.list_devices([pid_vid_sn]):
# We only take the first one that matches
if usbdev.serial_number != node.serial:
continue
dev = cp210x.Cp210xProgrammer(usbdev)
random.seed()
sn = f"INET-NM-{random.randint(0, 10**20)}".zfill(20)
hlp.nm_print(f"Found uninitialized serial number ({node.serial})")
sn = hlp.nm_prompt_default_input("Enter serial number", sn)
dev.set_values({"serial_number": sn})
# It seems there is some issues with reset.
# Without a reset the device SN does not change properly.
# With the reset some exceptions get thrown but the SN is set.
# This is a quick fix.
time.sleep(1)
try:
def _junk(x):
pass
cp210x.Cp210xProgrammer.__del__ = _junk
dev.reset()
except Exception:
pass
node.serial = sn
node.uid = node.calculate_uid(node.product_id, node.vendor_id, node.serial)
hlp.nm_print(f"Wrote {sn} to serial number in EEPROM.")
return
[docs]
def get_devices_from_tty(saved_nodes: Optional[List[NmNode]] = None) -> List[NmNode]:
"""
Retrieve connected TTY devices as a list of NmNode objects.
Args:
saved_nodes: List of previously saved NmNode objects.
Returns:
List of NmNode objects representing connected TTY devices.
Raises:
TtyNotPresent: If a TTY device could not be found for a given NmNode.
"""
saved_nodes = saved_nodes or []
context = pyudev.Context()
nodes = []
for device in context.list_devices(subsystem="tty"):
if "ACM" in device.device_node or "USB" in device.device_node:
usb_device = device.find_parent("usb", "usb_device")
nm_node = NmNode(
vendor_id=usb_device.get("ID_VENDOR_ID"),
product_id=usb_device.get("ID_MODEL_ID"),
serial=usb_device.get("ID_SERIAL_SHORT"),
vendor=usb_device.get("ID_VENDOR_FROM_DATABASE"),
model=usb_device.get("ID_MODEL_FROM_DATABASE"),
driver=usb_device.get("DRIVER"),
)
if nm_node.uid in [node.uid for node in saved_nodes]:
continue
nodes.append(nm_node)
return nodes
[docs]
def get_tty_from_nm_node(nm_node: NmNode) -> str:
"""
Retrieve the TTY device node string for a given NmNode.
Args:
nm_node: An NmNode object.
Returns:
The TTY device node string.
Raises:
TtyNotPresent: If a TTY device could not be found for the given NmNode.
"""
ttys = get_ttys_from_nm_node(nm_node)
if ttys:
return ttys[0]
raise TtyNotPresent(f"Could not find tty device for {nm_node}")
[docs]
def get_ttys_from_nm_node(nm_node: NmNode) -> List[str]:
"""
Retrieve the TTY device node string for a given NmNode.
Args:
nm_node: An NmNode object.
Returns:
The TTY device node string.
Raises:
TtyNotPresent: If a TTY device could not be found for the given NmNode.
"""
context = pyudev.Context()
vendor_id = nm_node.vendor_id
model_id = nm_node.product_id
serial_short = nm_node.serial
ttys = []
for device in context.list_devices(subsystem="tty"):
parent = device.find_parent("usb", "usb_device")
if parent is None:
continue
if (
parent.get("ID_VENDOR_ID") == vendor_id
and parent.get("ID_MODEL_ID") == model_id
and parent.get("ID_SERIAL_SHORT") == serial_short
):
ttys.append(device.device_node)
return ttys
[docs]
def select_available_node(nodes: List[NmNode]) -> NmNode:
"""
Prompt the user to select an available node from a list of NmNode objects.
Args:
nodes: List of available NmNode objects.
Returns:
The selected NmNode.
"""
uids = list()
def _nice_node(node: NmNode):
nonlocal uids
ttys = get_ttys_from_nm_node(node)
if ttys:
tty = ttys[uids.count(node.uid)]
else:
tty = "Unknown"
msg = f"{tty} {node.vendor}"
if node.model:
msg += f" {node.model}"
msg += f" {node.serial}"
uids.append(node.uid)
return msg
return nm_prompt_choice("Select the node", nodes, _nice_node)
[docs]
def add_node_to_nodes(nodes: List[NmNode], node: NmNode) -> List[NmNode]:
"""
Add a new NmNode to a list of existing NmNode objects.
Replaces any existing NmNode with the same UID.
Args:
nodes: List of existing NmNode objects.
node: New NmNode to be added.
Returns:
Updated list of NmNode objects.
"""
nodes = (
[node if n.uid == node.uid else n for n in nodes]
if any(n.uid == node.uid for n in nodes)
else nodes + [node]
)
return nodes
[docs]
def remove_node_to_nodes(nodes: List[NmNode], node: NmNode) -> List[NmNode]:
"""
Remove an NmNode from a list of existing NmNode objects.
Args:
nodes: List of existing NmNode objects.
node: NmNode to be removed.
Returns:
Updated list of NmNode objects.
"""
nodes = [n for n in nodes if n.uid != node.uid]
return nodes
[docs]
def select_board(board_names: List[str], node: NmNode) -> str:
"""
Prompts the user to select a board for a given NmNode.
Args:
board_names: List of available board names.
node: NmNode for which to select a board.
Returns:
The selected board name.
"""
msg = f"Select board name for {node.vendor}"
if node.model:
msg += f" {node.model}"
while True:
board = nm_prompt_input(msg, board_names)
if board in board_names:
return board
if nm_prompt_confirm(
f"Board {board} not in board list, continue?", default=False
):
return board
[docs]
def mock_device():
"""Mock a device, useful for boards that do not have USB interfaces"""
sn = f"{random.randint(0, 10**20)}".zfill(20)
sn = hlp.nm_prompt_default_input("Enter serial number", sn)
return NmNode(
vendor_id="1234",
product_id="5678",
serial=sn,
vendor="generic_vendor",
driver="mock",
mock=True,
)