Source code for inet_nm.runner_apps

"""
This module includes classes that are used to run applications on nodes.

Classes:
    NmShellRunner: Runs shell commands on nodes.
    NmTmuxPanedRunner: Runs tmux with paned windows.
    NmTmuxWindowedRunner: Runs tmux with individual windows for each node.
"""

import json
import os
import re
import subprocess
import time
from typing import Dict

from inet_nm._helpers import nm_extract_valid_jsons, nm_print
from inet_nm.data_types import NmNode
from inet_nm.runner_base import NmNodesRunner


[docs] class NmShellRunner(NmNodesRunner): """Runs shell commands on nodes. This class inherits from NmNodesRunner and overrides the func method to execute shell commands on nodes. Attributes: cmd: Command to execute on nodes. """ cmd = "echo $NM_IDX" SETUP_WAIT = 0.1 output_filter = None json_filter = False results = [] @staticmethod def _run_command(cmd, prefix, env, regex_str=None): def get_output(process): output = process.stdout.readline() if output: if regex_str is not None: matched = re.findall(regex_str, output.decode().strip()) for data in matched: nm_print(f"{prefix}{data}") else: nm_print(f"{prefix}{output.decode().strip()}") else: # Have a small sleep so we are not burning CPU waiting for output. time.sleep(0.1) return output process = subprocess.Popen( cmd, env=env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT ) while True: output = get_output(process) poll = process.poll() if poll is not None: output = True while output: output = get_output(process) break rc = process.returncode return rc @staticmethod def _run_command_json(cmd, uid, board, idx, env): # Run subprocess command to completion and capture output result = subprocess.run( cmd, env=env, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) data = nm_extract_valid_jsons(result.stdout.decode()) result_output = { "uid": uid, "board": board, "idx": idx, "data": data, "stdout": result.stdout.decode(), "result": result.returncode, } return result_output
[docs] def func(self, node: NmNode, idx: int, env: Dict[str, str]): """Execute shell commands on nodes. Args: node: Node to run the command on. idx: Index of the node. env: Environment variables for the command. """ time.sleep(idx * self.SETUP_WAIT) full_env = {**os.environ, **env} # Merge original and new environment variables full_env = { k: str(v) for k, v in full_env.items() } # Cast everything in env to a string prefix = f"NODE:{idx}" if node.board: prefix += f":BOARD:{node.board}" prefix += ": " # Since most use cases are with bash we will use that as the default shell. # This may change as soon as we have a use case that requires a different shell. # Note that the run command exits after one command is executed. # So if we want to loop with a default shell it will exit after the first loop. cmd = f"/bin/bash -c '{self.cmd}'" if self.output_filter is None: regex_str = None else: regex_str = re.compile(self.output_filter) if self.json_filter: res = NmShellRunner._run_command_json( cmd, node.uid, node.board, idx, env=full_env ) self.results.append(res) else: res = NmShellRunner._run_command( cmd, prefix=prefix, env=full_env, regex_str=regex_str ) if self.output_filter is None and not self.json_filter: self.results.append(f"RESULT:{prefix}{res}")
[docs] def post(self): """Run after the operations on nodes have completed. It prints the results of the commands executed on nodes. """ if self.json_filter: nm_print(json.dumps(self.results, indent=2, sort_keys=True)) else: for result in self.results: nm_print(result)
[docs] class NmTmuxBaseRunner(NmNodesRunner): """Base class for tmux runners. This class inherits from NmNodesRunner and sets up a tmux session. """ session_name: str = "default" cmd: str = None SETUP_WAIT = 0.2
[docs] def post(self): """Run after the operations on nodes have completed. It attaches to the tmux session and keeps checking if the session is still active. """ os.system(f"tmux attach -t {self.session_name}") while True: result = subprocess.run( f"tmux has-session -t {self.session_name}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) if result.returncode != 0: break
[docs] class NmTmuxPanedRunner(NmTmuxBaseRunner): """Run tmux with paned windows. This class inherits from NmTmuxBaseRunner and sets up a tmux session with panes. """
[docs] def func(self, node: NmNode, idx: int, env: Dict[str, str]): """Set up a tmux session with paned windows. Args: node: Node to run the command on. idx: Index of the node. env: Environment variables for the command. """ time.sleep(idx * self.SETUP_WAIT) e_args = "" for key, value in env.items(): esc = "\\$" if "${" in value: e_args += f" -e {key}={value.replace('$', esc)}" else: e_args += f" -e {key}={value}" if idx != 0: subprocess.run( f"tmux split-window {e_args} -t {self.session_name}", shell=True ) else: subprocess.run(f"tmux new-session -d -s {self.session_name}", shell=True) subprocess.run( f"tmux respawn-window -k {e_args} -t {self.session_name} -t 0.{idx}", shell=True, ) # Select pane subprocess.run(f"tmux select-pane -t {idx}", shell=True) # Execute command if self.cmd: subprocess.run( f"tmux send-keys -t {self.session_name}:0.{idx} '{self.cmd}' Enter", shell=True, ) # Evenly distribute panes subprocess.run(f"tmux select-layout -t {self.session_name} tiled", shell=True)
[docs] class NmTmuxWindowedRunner(NmTmuxBaseRunner): """Runs tmux with individual windows for each node. This class inherits from NmTmuxBaseRunner and sets up a tmux session with individual windows for each node. """
[docs] def func(self, node: NmNode, idx: int, env: Dict[str, str]): """Sets up a tmux session with individual windows for each node. Args: node: Node to run the command on. idx: Index of the node. env: Environment variables for the command. """ env = {k: str(v) for k, v in env.items()} # Cast everything in env to a string uid = env["NM_UID"] session_name = self.session_name time.sleep(idx * self.SETUP_WAIT) e_args = " -e " + " -e ".join([f"{key}={value}" for key, value in env.items()]) if idx != 0: subprocess.run( f"tmux new-window -t {session_name}:{idx} {e_args}", shell=True ) else: subprocess.run(f"tmux new-session -d -s {session_name}", shell=True) subprocess.run( f"tmux respawn-window -k {e_args} -t {session_name} -t 0.{idx}", shell=True, ) subprocess.run(f"tmux rename-window -t {session_name}:{idx} {uid}", shell=True) if self.cmd: subprocess.run( f"tmux send-keys -t {session_name}:{idx} '{self.cmd}' Enter", shell=True )