Source code for inet_nm.check

"""
This module contains functions for checking the state of the boards.

This is meant for evaluating inventory.
"""
import argparse
from typing import Dict, List, Tuple

import inet_nm.config as cfg
from inet_nm.data_types import NmNode
from inet_nm.locking import get_locked_uids
from inet_nm.usb_ctrl import get_connected_uids


[docs] def get_nodes_with_state(nodes: List[NmNode], connected=True) -> List[NmNode]: """ Get a list of nodes that are connected or not connected. Args: nodes: A list of nodes to filter. connected: If True, return the nodes that are connected. If False, return the nodes that are not connected. Returns: A list of filtered nodes. """ selected_nodes = [] connected_uids = get_connected_uids() for node in nodes: if node.uid in connected_uids: if connected: selected_nodes.append(node) elif not connected: selected_nodes.append(node) return selected_nodes
[docs] def filter_used_nodes( nodes: List[NmNode], used_uids: List[str], remove=True ) -> List[NmNode]: """ Filter the nodes based on the provided used UIDs. Args: nodes: A list of nodes to filter. used_uids: A list of UIDs of nodes that are already used. remove: If True, remove the used nodes. If False, remove the unused nodes. Returns: List[NmNode]: A list of nodes that are not already used. """ if used_uids is None: used_uids = [] selected_nodes = [] for node in nodes: if node.uid not in used_uids and remove: selected_nodes.append(node) elif node.uid in used_uids and not remove: selected_nodes.append(node) return selected_nodes
[docs] def filter_nodes(nodes: List[NmNode], must_contain: List[str]) -> List[NmNode]: """ Filter the nodes based on the provided features. Args: nodes: A list of nodes to filter. must_contain: A list of features. Only the nodes that contain all of these features will be returned. Returns: A list of filtered nodes. """ selected_nodes = [] for node in nodes: if all([feature in node.features_provided for feature in must_contain]): selected_nodes.append(node) return selected_nodes
[docs] def nodes_to_boards(nodes: List[NmNode]) -> Dict[str, int]: """ Convert a list of nodes to a dictionary of board counts. Args: nodes: A list of nodes. Returns: A dictionary where the keys are the names of the boards and the values are the counts of those boards. """ boards = {} for node in nodes: if node.board not in boards: boards[node.board] = 0 boards[node.board] += 1 return boards
[docs] def get_all_features(nodes: List[NmNode]) -> List[str]: """ Get a list of all features provided by the nodes. Args: nodes: A list of nodes. Returns: A list of all features. """ features = [] for node in nodes: features.extend(node.features_provided) return set(features)
[docs] def eval_features(nodes: List[NmNode], features: set, eval_func): """ Evaluate features of nodes using a provided function. Args: nodes: A list of nodes to evaluate. features: A set of features to evaluate. eval_func: The function to use to evaluate the features. Returns: A list of nodes that meet the criteria of the evaluation function. Raises: ValueError: If the evaluation function cannot be evaluated. """ selected_nodes = [] try: for node in nodes: selected_features = { feature: feature in node.features_provided for feature in features } if eval(eval_func, selected_features): selected_nodes.append(node) except Exception as e: raise ValueError( f"Could not evaluate {eval_func} with {sorted(features)}" ) from e return selected_nodes
[docs] def skip_duplicate_boards(nodes: List[NmNode]) -> List[NmNode]: """ Remove duplicate boards from a list of nodes. Args: nodes: A list of nodes. Returns: A list of nodes without duplicates. """ selected_nodes = [] boards = set() for node in nodes: if node.board not in boards: selected_nodes.append(node) boards.add(node.board) return selected_nodes
[docs] def check_nodes( nodes: List[NmNode], all_nodes: bool = False, missing: bool = False, feat_filter: List[str] = None, feat_eval: str = None, used: bool = False, skip_dups: bool = False, only_used: bool = False, boards: List[str] = None, uids: List[str] = None, locked_nodes: List[str] = None, ) -> List[NmNode]: """ Get a filtered list of nodes based on the provided parameters. Args: nodes: A list of nodes to check. all_nodes: If True, all nodes will be returned. missing: If True, only the nodes that are missing will be returned. feat_filter: A list of features. Only the nodes that contain all of these features will be returned. feat_eval: The function to use to evaluate the features. used: If True, used nodes will also be returned. skip_dups: If True, duplicate nodes will be removed. only_used: If True, only the used nodes will be returned. boards: A list of boards to use. uids: A list of UIDs of nodes to use. locked_nodes: A list of UIDs of nodes that are locked. Returns: A list of filtered nodes. """ features = get_all_features(nodes) if not all_nodes: nodes = get_nodes_with_state(nodes, connected=not missing) if only_used: nodes = filter_used_nodes(nodes, locked_nodes, remove=False) elif not used: nodes = filter_used_nodes(nodes, locked_nodes, remove=True) if feat_filter: nodes = filter_nodes(nodes, feat_filter) if feat_eval: nodes = eval_features(nodes, features, feat_eval) if skip_dups: nodes = skip_duplicate_boards(nodes) if boards: nodes = [node for node in nodes if node.board in boards] if uids: nodes = [node for node in nodes if node.uid in uids] # filter out ignored nodes nodes = [node for node in nodes if not node.ignore] return nodes
[docs] def check_filter_args(parser: argparse.ArgumentParser): parser.add_argument( "-f", "--feat-filter", nargs="+", help="Filter all boards that don't provide these features", ) parser.add_argument( "-e", "--feat-eval", type=str, help="Evaluate features with this function" ) parser.add_argument( "-b", "--boards", nargs="+", help="Use only the list of boards that match these names", ) parser.add_argument( "-d", "--uids", nargs="+", help="Use only the list of boards that match these UIDs", )
[docs] def check_args(parser: argparse.ArgumentParser): """ Define the arguments for the argparse parser. Args: parser: The argparse parser. """ parser.add_argument( "-a", "--all-nodes", action="store_true", help="Show all boards, regardless of connection", ) parser.add_argument( "-m", "--missing", action="store_true", help="Show all missing boards" ) parser.add_argument( "-u", "--used", action="store_true", help="Show used boards as well" ) parser.add_argument( "-o", "--only-used", action="store_true", help="Show only the used boards" ) parser.add_argument( "-s", "--skip-dups", action="store_true", help="Skip duplicate boards" ) check_filter_args(parser)
[docs] def get_inventory_nodes( config: str, feat_filter: List[str] = None, feat_eval: str = None, boards: List[str] = None, uids: List[str] = None, ) -> Tuple[List[NmNode], List[NmNode], List[NmNode], List[NmNode]]: """ Get a list of nodes based on the provided parameters. Args: config: The configuration path for the nodes. feat_filter: A list of features. Only the nodes that contain all of these features will be returned. feat_eval: The function to use to evaluate the features. boards: A list of boards to use. uids: A list of UIDs of nodes to use. Returns: A tuple of lists of nodes. The first list is the available nodes, the second list is the used nodes, the third list is the missing nodes, the forth is the total node count. """ kwargs = locals() available = get_filtered_nodes(**kwargs) used = get_filtered_nodes(only_used=True, **kwargs) missing = get_filtered_nodes(missing=True, **kwargs) total = get_filtered_nodes(all_nodes=True, used=True, **kwargs) return available, used, missing, total
[docs] def get_filtered_nodes( config: str, all_nodes: bool = False, missing: bool = False, feat_filter: List[str] = None, feat_eval: str = None, used: bool = False, skip_dups: bool = False, only_used: bool = False, boards: List[str] = None, uids: List[str] = None, ) -> List[NmNode]: """ Get a list of nodes based on the provided parameters. Args: config: The configuration path for the nodes. all_nodes: If True, all nodes will be returned. missing: If True, only the nodes that are missing will be returned. feat_filter: A list of features. Only the nodes that contain all of these features will be returned. feat_eval: The function to use to evaluate the features. used: If True, used nodes will also be returned. skip_dups: If True, duplicate nodes will be removed. only_used: If True, only the used nodes will be returned. boards: A list of boards to use. uids: A list of UIDs of nodes to use. Returns: A list of filtered nodes. """ nodes = cfg.NodesConfig(config).load() locked_nodes = get_locked_uids() nodes = check_nodes( nodes, all_nodes, missing, feat_filter, feat_eval, used, skip_dups, only_used, boards, uids, locked_nodes, ) return nodes
[docs] def all_features_from_nodes(nodes: List[NmNode]) -> List[str]: """ Get a list of all features provided by the nodes. Args: nodes: A list of nodes. Returns: A list of all features. """ features = [] for node in nodes: features.extend(node.features_provided) return sorted(list(set(features)))