Source code for inet_nm.power_control

import logging
import subprocess
from time import sleep
from typing import List, Optional

import inet_nm.config as cfg
import inet_nm.locking as lck
import inet_nm.usb_ctrl as ucl
from inet_nm.data_types import NmNode

DEFAULT_MAX_ALLOWED_NODES = 14


[docs] class PowerControl: DEFAULT_POWER_ON_WAIT = 10 DEFAULT_POWER_OFF_WAIT = 1 MAX_ALLOWED_NODES = 256 def __init__( self, locations, nodes: List[NmNode], max_powered_devices=None, config: Optional[str] = None, ): self.logging = logging.getLogger(__name__) self.id_path_to_node_uid = {} self.powered_locations = {} self.powered_id_paths = set() self.node_uids = {node.uid for node in nodes if not node.ignore} for id, loc in locations.items(): if loc["power_control"]: self.powered_locations[id] = loc self.powered_id_paths.add(id) self.max_powered_devices = max_powered_devices self._running = False self._power_on_procs = [] self._power_off_procs = [] self._powered_on = set() if config is not None: self._cache_config = cfg.LocationCache(config_dir=config).load() else: self._cache_config = None def _available(self, powered_devs) -> int: if self.max_powered_devices is not None: return self.max_powered_devices - len(powered_devs) return self.MAX_ALLOWED_NODES
[docs] def power_on_uid(self, uid: str): if uid not in self.id_path_to_node_uid.values(): raise ValueError( f"Node with uid {uid} not found, " "must have been collected during power iterations." ) for id_path, node_uid in self.id_path_to_node_uid.items(): if node_uid == uid: if id_path in self.powered_locations: self._power_on(id_path)
[docs] def power_off_uid(self, uid: str): if uid not in self.id_path_to_node_uid.values(): raise ValueError( f"Node with uid {uid} not found, " "must have been collected during power iterations." ) for id_path, node_uid in self.id_path_to_node_uid.items(): if node_uid == uid: if id_path in self.powered_locations: self._power_off(id_path)
def _power_off(self, id_path): self.logging.debug("Powering off %s", id_path) usb_info = self.powered_locations[id_path] self._power_off_procs.append( subprocess.Popen( [ "sudo", "uhubctl", "-l", usb_info["hub"], "-p", usb_info["port"], "-a", "off", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) ) def _power_on(self, id_path): self.logging.debug("Powering on %s", id_path) usb_info = self.powered_locations[id_path] self._power_on_procs.append( subprocess.Popen( [ "sudo", "uhubctl", "-l", usb_info["hub"], "-p", usb_info["port"], "-a", "on", ], stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, ) )
[docs] def power_on_chunk(self): self._running = True powered_devs = ucl.get_connected_id_paths() available = self._available(powered_devs) if available < 0: raise ValueError( f"More than {self.max_powered_devices} nodes are " f"already powered on, {-available} over." ) self.logging.debug( "%s nodes powered, %s of %s available", len(powered_devs), available, self.max_powered_devices or self.MAX_ALLOWED_NODES, ) for id_path in self.powered_locations: if id_path in powered_devs: self._powered_on.add(id_path) continue if id_path in self._powered_on: continue self._power_on(id_path) # it takes a while to actually show up as a tty device # so we just manually add it self._powered_on.add(id_path) powered_devs.add(id_path) if self._available(powered_devs) == 0: break self.wait_for_power_on()
@property def power_on_complete(self) -> bool: if not self._running: return False # check if all powered_id_paths are powered on if self._powered_on == self.powered_id_paths: self._running = False return True return False def _map_id_path_to_node_uid(self): powered_id_paths = ucl.get_connected_id_paths() for id_path in powered_id_paths: if id_path in self.id_path_to_node_uid: continue uid = ucl.get_uid_from_id_path(id_path) self.id_path_to_node_uid[id_path] = uid if self._cache_config is not None: for cache in self._cache_config: if cache["id_path"] in self.id_path_to_node_uid: continue self.id_path_to_node_uid[cache["id_path"]] = cache["node_uid"]
[docs] def power_off_unused(self) -> None: self.logging.debug("Powering off") # check locked devices from lockfiles locked_uids = lck.get_locked_uids() unused_uids = self.node_uids - set(locked_uids) for id_path, usb_info in self.powered_locations.items(): uid = ucl.get_uid_from_id_path(id_path) if uid is None: continue if uid in unused_uids: self._power_off(id_path) self.wait_for_power_off()
[docs] def wait_for_power_off(self): for proc in self._power_off_procs: proc.wait() if self._power_off_procs: sleep(self.DEFAULT_POWER_OFF_WAIT) self._power_off_procs = [] self.logging.debug("Finished powering off")
[docs] def wait_for_power_on(self, wait_time=None): for proc in self._power_on_procs: proc.wait() if self._power_on_procs: sleep(wait_time or self.DEFAULT_POWER_ON_WAIT) self._power_on_procs = [] self._map_id_path_to_node_uid() self.logging.debug("Finished powering on")
def __enter__(self): return self def __exit__(self, exc_type, value, traceback) -> None: """Release the lock when exiting the context.""" self.wait_for_power_on(wait_time=0) self.power_off_unused() self.wait_for_power_off()