# Source code for inet_nm.cli_commission

import argparse
import sys

import inet_nm.commissioner as cmr
import inet_nm.config as cfg
from inet_nm.data_types import NmNode
from inet_nm.power_control import DEFAULT_MAX_ALLOWED_NODES, PowerControl
from inet_nm.usb_ctrl import TtyNotPresent, get_devices_from_tty


def _build_parser():
    """Build the argument parser for the commissioning CLI.

    Returns:
        An ``argparse.ArgumentParser`` with the ``--board``, ``--no-cache``,
        ``--mock-dev`` and ``--ignore`` options, plus whatever
        ``cfg.config_arg`` registers (presumably the ``config`` option that
        ``_main`` reads -- confirm in ``inet_nm.config``).
    """
    parser = argparse.ArgumentParser(description="Commission USB boards")
    cfg.config_arg(parser)
    parser.add_argument("-b", "--board", help="Name of the board to commission")
    parser.add_argument(
        "-n", "--no-cache", help="Ignore the cache", action="store_true"
    )
    parser.add_argument(
        "-m",
        "--mock-dev",
        help="Mock a device, useful for boards that do not have USB interfaces",
        action="store_true",
    )
    parser.add_argument(
        "-i",
        "--ignore",
        help="Device will always be ignored, used for ttys that are connected "
        "but not part of the inet-nm ecosystem.",
        action="store_true",
    )
    return parser


def _unique_by_uid(nodes: "list[NmNode]") -> "list[NmNode]":
    """Return ``nodes`` without repeated uids, keeping the first occurrence.

    The relative order of the surviving nodes is unchanged.
    """
    seen_uids = set()
    unique_nodes = []
    for node in nodes:
        if node.uid in seen_uids:
            continue
        seen_uids.add(node.uid)
        unique_nodes.append(node)
    return unique_nodes


def _main():
    """CLI to commission a USB board.

    Powers the saved nodes on chunk by chunk, collecting the devices reported
    by ``get_devices_from_tty`` after each chunk, then picks one node through
    ``cmr.select_available_node``.  The picked node is either flagged as
    ignored (``--ignore``) or assigned a board and, when the board is listed
    in the board info, its provided features.  The node is finally merged
    into the saved nodes and written back to the nodes config file.

    Exits with status 1 when ``cmr.select_available_node`` raises
    ``ValueError``.

    Raises:
        TtyNotPresent: Propagated from node selection unless ``--mock-dev``
            was given, in which case the mocked device is used instead.
    """
    args = _build_parser().parse_args()
    nodes_cfg = cfg.NodesConfig(args.config)
    bi_cfg = cfg.BoardInfoConfig(args.config)
    # Check early since we will need to write eventually
    nodes_cfg.check_file(writable=True)
    bi_cfg.check_file(writable=False)

    saved_nodes = nodes_cfg.load()
    nm_nodes = []
    with PowerControl(
        locations=cfg.LocationConfig(args.config).load(),
        nodes=saved_nodes,
        max_powered_devices=DEFAULT_MAX_ALLOWED_NODES,
    ) as pc:
        # Only a chunk of nodes is powered at a time, so devices are gathered
        # after every chunk until all chunks have been powered on.
        while not pc.power_on_complete:
            pc.power_on_chunk()
            if args.no_cache:
                nm_nodes.extend(get_devices_from_tty())
            else:
                nm_nodes.extend(get_devices_from_tty(saved_nodes))
            pc.power_off_unused()

        # The same node may be reported in more than one pass.
        nm_nodes = _unique_by_uid(nm_nodes)

        print(f"Found {len(saved_nodes)} saved nodes in {args.config}")
        if args.mock_dev:
            # Appended last so it can be retrieved with nm_nodes[-1] below.
            nm_nodes.append(cmr.mock_device())

        try:
            selected_node = cmr.select_available_node(nm_nodes)
        except ValueError:
            print("No available nodes found")
            sys.exit(1)
        except TtyNotPresent:
            if not args.mock_dev:
                raise
            # Fall back to the mocked device appended above.
            selected_node = nm_nodes[-1]

        if args.ignore:
            selected_node.ignore = True
        else:
            binfo = bi_cfg.load()
            selected_node.board = args.board or cmr.select_board(
                list(binfo.keys()), selected_node
            )
            # A board given with --board may be missing from the board info;
            # in that case the node's features are left untouched.
            if selected_node.board in binfo:
                selected_node.features_provided = binfo[selected_node.board]

        if cmr.is_uninitialized_sn(selected_node):
            uid = selected_node.uid
            # Normally we would not need to power this as it would only get
            # powered down if it was already in the nodes list... If the
            # no-cache is used then it may be powered down and we need to
            # power it up.
            # Slow but safe.
            pc.power_on_uid(uid)
            cmr.set_uninitialized_sn(selected_node)
            pc.power_off_uid(uid)

    nodes = cmr.add_node_to_nodes(saved_nodes, selected_node)
    nodes_cfg.save(nodes)
    print(f"Updated {nodes_cfg.file_path}")


def main():
    """CLI to commission a USB board.

    Runs ``_main`` and turns a ``KeyboardInterrupt`` (e.g. Ctrl+C) into a
    short message instead of a traceback.
    """
    try:
        _main()
    except KeyboardInterrupt:
        print("\nUser aborted...")


# Run the CLI only when this module is executed as a script.
if __name__ == "__main__":
    main()