Source code for inet_nm.runner_base
"""
Runs applications on nodes.
This module provides the NmNodesRunner class which handles running
operations on multiple nodes.
An operation is defined by a method `func` which is to be implemented
in the subclass.
"""
from threading import Thread
from typing import Dict, List
import inet_nm.locking as lk
from inet_nm._helpers import nm_print
from inet_nm.data_types import EnvConfigFormat, NmNode, NodeEnv
from inet_nm.filelock import FileLock
from inet_nm.usb_ctrl import get_ttys_from_nm_node
[docs]
class NmNodesRunner:
"""
Class to handle running operations on nodes.
The class manages locking/unlocking nodes, running operations on each node
concurrently in separate threads, and handles cleanup after operations
have completed.
The operations to be run are defined by a method `func` which is to
be implemented in the subclass.
"""
def __init__(
self,
nodes: List[NmNode],
default_timeout: int = None,
seq=False,
force=False,
extra_env: EnvConfigFormat = None,
):
"""
Initialize a new instance of NmNodesRunner.
Args:
nodes: A list of NmNode instances to be managed.
default_timeout: Default timeout value for file lock acquisition.
seq: If True, operations are run sequentially instead of concurrently.
force: If True, operations are run even if the node is locked.
extra_env: A dictionary of extra environment variables to be passed
to the operation function.
"""
self.nodes = nodes
self.default_timeout = default_timeout
self.seq = seq
self.force = force
self.extra_env = extra_env or EnvConfigFormat(shared={}, nodes={}, patterns=[])
self.lockable_nodes = [
(node, FileLock(lk.get_lock_path(node), timeout=default_timeout))
for node in nodes
]
self.locks = [lock for _, lock in self.lockable_nodes]
self._acquired = False
[docs]
def pre(self):
"""Override in the subclass if pre-operation steps are needed."""
pass
[docs]
def post(self):
"""Override in the subclass if post-operation steps are needed."""
pass
[docs]
def func(self, node: NmNode, idx: int, env: Dict[str, str]):
"""
Function to execute.
It must be implemented in the subclass.
Args:
node: The node to run the function on.
idx: The index of the node in the list of nodes.
env: A dictionary of environment variables.
"""
raise NotImplementedError("You must implement a func() method")
[docs]
def acquire(self, timeout: float = None):
"""
Acquire file locks for all nodes.
This method must be called before running operations on nodes.
Args:
timeout (float): Timeout value for file lock acquisition.
If None, default_timeout is used.
"""
if self.force:
return
for lock in self.locks:
lock.acquire(timeout=timeout or self.default_timeout)
self._acquired = True
[docs]
def release(self):
"""Release all acquired file locks."""
if self.force:
return
for lock in self.locks:
try:
lock.release()
except FileNotFoundError:
nm_print(f"File {lock.file_name} already unlocked.")
self._acquired = False
[docs]
def run(self):
"""
Run operations on all nodes.
This method spawns a new thread for each node and runs the
operation concurrently on all nodes.
The operation to run is defined by the `func` method.
"""
if not self._acquired and not self.force:
raise Exception("You must call acquire() before calling run()")
self.pre()
self.threads = []
for idx, node in enumerate(self.nodes):
ttys = get_ttys_from_nm_node(node)
if ttys:
nm_port = ttys[0]
else:
nm_port = "Unknown"
node_env = NodeEnv(
NM_IDX=idx,
NM_UID=node.uid,
NM_SERIAL=node.serial,
NM_BOARD=node.board,
NM_PORT=nm_port,
).to_dict()
# Inject multiple ttys values if available
for i, tty in enumerate(ttys):
node_env[f"NM_PORT_{i}"] = tty
node_env.update(self.extra_env.shared)
node_env.update(self.extra_env.nodes.get(node.uid, {}))
thread = Thread(target=self.func, args=(node, idx, node_env))
thread.start()
if self.seq:
thread.join()
else:
self.threads.append(thread)
for thread in self.threads:
thread.join()
self.post()
self.release()
def __enter__(self):
self.acquire()
return self
def __exit__(self, type, value, traceback):
if self._acquired:
self.release()
self._acquired = False