Source code for inet_nm.runner_helper
import subprocess
import sys
import inet_nm.check as chk
import inet_nm.config as cfg
def _is_command_available(cmd):
return_code = subprocess.call(
f"which {cmd}", shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.STDOUT
)
if return_code != 0:
print(f"{cmd} is not available! Please install first.")
sys.exit(1)
def _check_filtered_nodes(**kwargs):
nodes = chk.get_filtered_nodes(**kwargs)
if len(nodes) == 0:
print("No available nodes!")
sys.exit(1)
return nodes
[docs]
def node_env_vars(config: str):
env_cfg = cfg.EnvConfig(config)
env_cfg.check_file(writable=False)
env_info = env_cfg.load()
pattern_nodes = {}
for pattern in env_info.patterns:
pattern["config"] = config
env_key = pattern.pop("key")
env_val = pattern.pop("val")
matched_nodes = chk.get_filtered_nodes(all_nodes=True, **pattern)
for node in matched_nodes:
if node.uid not in pattern_nodes:
pattern_nodes[node.uid] = {}
pattern_nodes[node.uid][env_key] = env_val
for uid, env in env_info.nodes.items():
for key, val in env.items():
if uid not in pattern_nodes:
pattern_nodes[uid] = {}
pattern_nodes[uid][key] = val
env_info.nodes = pattern_nodes
return env_info
[docs]
def sanity_check(cmd, **kwargs):
"""Checks if the command is available and if there are any nodes available.
Args:
cmd (str): Command to check.
**kwargs: Keyword arguments to pass to check.get_filtered_nodes.
Returns:
List of NmNode objects.
"""
_is_command_available(cmd)
return _check_filtered_nodes(**kwargs)
[docs]
def do_nothing(signum, frame):
"""Does nothing."""
pass