import itertools
import json
import os
import pathlib
import typing
import uuid
from collections import OrderedDict
from datetime import datetime
from pprint import pprint
from tqdm import tqdm
import shutil
import yaml
from nbconvert.exporters import PythonExporter
import humanize
import glob
from cloudmesh.common.util import readfile
from cloudmesh.rivanna.rivanna import Rivanna
from cloudmesh.common.FlatDict import FlatDict
from cloudmesh.common.Printer import Printer
from cloudmesh.common.Shell import Shell
from cloudmesh.common.console import Console
from cloudmesh.common.parameter import Parameter
from cloudmesh.common.util import banner, writefile
from cloudmesh.common.variables import Variables
from cloudmesh.common.dotdict import dotdict
PathLike = typing.Union[str, pathlib.Path]
DictOrList = typing.Union[dict, list]
OptPath = typing.Optional[PathLike]
OptStr = typing.Optional[str]
[docs]
class ExperimentExecutor:
# def config_read(self, path="./config.yaml"):
# print (path)
# return None
def __init__(self, verbose=False):
"""Initialize the ExperimentExecutor Object
Args:
verbose (bool): If true prints additional infromation when
ExperimentExecutor methods are called
"""
self.flat = FlatDict({}, sep=".")
self.data = dict()
self.permutations = list()
self.experiments = None
self.dryrun = False
self.verbose = False
self.execution_mode = "h"
self.input_dir = str(Shell.map_filename(".").path)
self.output_dir = str(Shell.map_filename(".").path)
self.os_variables = None
self.verbose = verbose
self.template_path = None
self.template_content = None
self.configuration_parameters = None
self.script_out = None
# self.gpu = None
self.copycode = None
[docs]
@staticmethod
def progress_to_dict(input_string):
cleaned_string = input_string.replace("# cloudmesh", "")
key_value_pairs = [item.split("=") for item in cleaned_string.split()]
result_dict = {key: value for key, value in key_value_pairs}
return result_dict
[docs]
@staticmethod
def count_word_occurrences(list_of_dicts):
word_count = {
'total': len(list_of_dicts),
'pending': 0
}
for d in list_of_dicts:
progress_value = d.get('progress')
# No progress recorded
if not progress_value or progress_value == "None":
word_count['pending'] += 1
continue
try:
# Expect formats like "RUNNING: blah blah"
if ":" in progress_value:
word, _ = progress_value.split(":", 1)
word = word.strip()
word_count[word] = word_count.get(word, 0) + 1
else:
# Weird format → treat as pending
word_count['pending'] += 1
except Exception:
word_count['pending'] += 1
return word_count
[docs]
def list(self, directory="project",
config="config.yaml",
log="*.out",
debug=False,
verbose=True,
table=True):
"""Lists all experiments
Returns:
None: prints the experiments
"""
banner(f"List Experiments in {directory}/*/{config}")
experiments = []
if verbose:
num_entries = sum(1 for entry in os.scandir(directory) if entry.is_dir())
progress_bar = tqdm(total=num_entries, desc="Processing", ncols=70) # Set ncols to 70
header = None
for entry in os.scandir(directory):
if entry.is_dir():
config_dir = f"{directory}/{entry.name}"
config_file = f"{config_dir}/{config}"
log_file = f"{config_dir}/{log}"
if debug:
print(config_file)
progress = None
try:
logfilename = glob.glob(log_file)[0]
log_content = readfile(logfilename).strip()
state = Shell.find_lines_with(log_content, "# cloudmesh status=")
line = state[-1].strip()
state = ExperimentExecutor.progress_to_dict(line)
status = state["status"]
msg = state["msg"]
progress = f"{status}: {msg}"
except Exception as e:
# print (e)
progress = None
content = yaml.safe_load(readfile(config_file))
content = content["experiment"]
# space = Shell.calculate_disk_space(config_dir)
content["space"] = Shell.calculate_disk_space(config_dir)
content["space"] = humanize.naturalsize(content["space"])
content["progress"] = str(progress)
experiments.append(content)
if header is None:
header = content.keys()
if verbose:
progress_bar.update(1)
if verbose:
progress_bar.close()
if table:
print(Printer.write(experiments, order=header))
summary = ExperimentExecutor.count_word_occurrences(experiments)
summary_str = ', '.join([f'{key}: {value}' for key, value in summary.items()])
print(summary_str)
import shutil
[docs]
def info(self, verbose=None):
"""Prints information about the ExperimentExecutor object for debugging purposes
Args:
verbose (bool): if True prints even more information
Returns:
None: None
"""
verbose = verbose or self.verbose
if not verbose:
return
for a in [
"dryrun",
"verbose",
"name",
"source",
"destination",
"attributes",
"gpu",
"config",
"config_files",
"directory",
"experiment",
"execution_mode",
"template",
"script_output",
"output_dir",
"input_dir",
"script_in",
"script_out",
"os_variables",
"experiments",
"copy"
]:
# noinspection PyBroadException
try:
result = getattr(self, a)
except: # noqa: E722
result = self.data.get(a)
print(f'{a:<12}: {result}')
print("permutations:")
result = getattr(self, "permutations")
pprint(result)
print("BEGIN FLAT")
pprint(self.flat)
print("END FLAT")
print()
print("BEGIN DATA")
pprint(self.data)
print("END DATA")
print()
print("BEGIN YAML")
spec = yaml.dump(self.data, indent=2)
print(spec)
print("END YAML")
print("BEGIN SPEC")
spec = self.spec_replace(spec)
print(spec)
print("END SPEC")
print("BEGIN PERMUTATION")
p = self.permutations
pprint(p)
print("END PERMUTATION")
# self.info()
#
# self.data = result
#
print("BEGIN DATA")
pprint(self.data)
print("END DATA")
banner("BEGIN TEMPLATE")
print(self.template_content)
banner("END TEMPLATE")
[docs]
@staticmethod
def update_with_directory(directory, filename):
"""prefix with the directory if the filename is not starting with . / ~
Args:
directory (str): the string value of the directory
filename (str): the filename
Returns:
str: directory/filename
"""
if directory is None:
return filename
elif not filename.startswith("/") and not filename.startswith(".") and not filename.startswith("~"):
return f"{directory}/{filename}"
else:
return filename
[docs]
def get_data(self, flat=False):
"""converts the data from the yaml file with the flatdict
Args:
flat (boolen): if set to true uses flatdict
Returns:
dict: result of flatdict without the seperator
"""
result = self.data
if flat:
from cloudmesh.common.FlatDict import FlatDict
result = FlatDict(self.data, sep=".")
del result["sep"]
return result
[docs]
def spec_replace(self, spec):
"""given a spec in yaml format, replaces all values in the yaml file that
are of the form "{a.b}" with the value of
a:
b: value
if it is defined in the yaml file
Args:
spec (str): yaml string
Returns:
str: replaced yaml file
"""
banner("SPECIFICATION")
print(spec)
data = FlatDict()
data.loads(spec)
banner("FLATDICT")
pprint(data.__dict__)
spec1 = str(data.__dict__)
print(str(spec1[1:-1]))
banner("MUNCH")
#
# should be replaced with flatdct aplied on config.yaml file
#
import re
import munch
variables = re.findall(r"\{\w.+\}", spec)
data = yaml.load(spec, Loader=yaml.SafeLoader)
m = munch.DefaultMunch.fromDict(data)
for o in range(0,4):
for i in range(0, len(variables)):
for variable in variables:
text = variable
variable = variable[1:-1]
# noinspection PyBroadException
try:
value = eval("m.{variable}".format(**locals()))
if "{" not in value:
spec = spec.replace(text, value)
except: # noqa: E722
value = variable
banner("END MUNCH")
return spec
[docs]
def update_from_os(self, variables):
"""LOads all variables from os.environ into self.data with os.name
Args:
variables ([str]): the name of the variables such as "HOME"
Returns:
dict: self.data with all variaples added with os.name: value
"""
if variables is not None:
if "os" not in self.data:
self.data["os"] = {}
for key in variables:
self.data["os"][key] = os.environ[key]
return self.data
[docs]
def load_source_template(self, script):
"""Registers and reads the template script in for processing
This method must be run at least once prior to generating the batch script output.
Args:
script (str): A string that is the path to the template
script.
Returns:
str: The text of the template file unaltered.
"""
self.template_path = script
self.template_content = readfile(script)
return self.template_content
[docs]
def update_from_dict(self, d):
"""Add a dict to self. data
Args:
d (dict): dictionary
Returns:
dict: self.data with updated dict
"""
self.data.update(d)
return self.data
[docs]
def update_from_attributes(self, attributes: str):
"""attributes are of the form "a=1,b=3"
Args:
attributes: A string to expand into key-value pairs
Returns:
dict: self.data with updated dict
"""
flatdict = Parameter.arguments_to_dict(attributes)
d = FlatDict(flatdict, sep=".")
d = d.unflatten()
del d["sep"]
self.update_from_dict(d)
return self.data
[docs]
def update_from_os_environ(self):
"""Updates the config file output to include OS environment variables
Returns:
dict: The current value of the data configuration variable
"""
self.update_from_dict(dict(os.environ))
return self.data
[docs]
def update_from_cm_variables(self, load=True):
"""Adds Cloudmesh variables to the class's data parameter as a flat dict.
Args:
load (bool): Toggles execution; if false, method does
nothing.
Returns:
dict: self.data with updated cloudmesh variables
"""
if load:
variables = Variables()
v = FlatDict({"cloudmesh": variables.dict()}, sep=".")
d = v.unflatten()
del d["sep"]
self.update_from_dict(d)
return self.data
@staticmethod
def _suffix(filename):
"""
Args:
filename (str): Returns the file suffix of a filename
Returns:
str: the suffix of the filename
"""
return pathlib.Path(filename).suffix
[docs]
def update_from_file(self, filename):
"""Updates the configuration self.data with the data within the passed file.
Args:
filename (str): The path to the configuration file (yaml,
json, py, ipynb)
Returns:
dict: self.data with updated cloudmesh variables from the
specified file
"""
if self.verbose:
print(f"Reading variables from {filename}")
suffix = self._suffix(filename).lower()
content = readfile(filename)
if suffix in [".json"]:
values = json.loads(content)
elif suffix in [".yml", ".yaml"]:
content = readfile(filename)
values = yaml.safe_load(content)
elif suffix in [".py"]:
modulename = filename.replace(".py", "").replace("/", "_").replace("build_", "")
from importlib.machinery import SourceFileLoader
mod = SourceFileLoader(modulename, filename).load_module()
values = {}
for name, value in vars(mod).items():
if not name.startswith("__"):
values[name] = value
elif suffix in [".ipynb"]:
py_name = filename.replace(".ipynb", ".py")
jupy = PythonExporter()
body, _ = jupy.from_filename(filename)
writefile(py_name, body)
# Shell.run(f"jupyter nbconvert --to python {filename}")
filename = py_name
modulename = filename.replace(".py", "").replace("/", "_").replace("build_", "")
from importlib.machinery import SourceFileLoader
mod = SourceFileLoader(modulename, filename).load_module()
values = {}
for name, value in vars(mod).items():
if not name.startswith("__"):
values[name] = value
else:
raise RuntimeError(f"Unsupported config type {suffix}")
self.update_from_dict(values)
# self.read_config_from_dict(regular_dict)
if values is not None and 'experiment' in values:
experiments = values['experiment']
for key, value in experiments.items():
print(key, value)
# noinspection PyBroadException,PyPep8
try:
experiments[key] = Parameter.expand(value)
except:
experiments[key] = [value]
self.permutations = self.permutation_generator(experiments)
return self.data
[docs]
def generate(self, script=None, variables=None, fences=("{", "}")):
"""Expands the script template given the passed configuration.
Args:
script (str): The string contents of the script file.
variables (dict): The variables to be replaced. If omitted,
uses the internal variables found.
fences ((str, str)): A 2-position tuple that encloses
template variables (start and end).
Returns:
tuple[str, dict]: The expanded script and a dict of replaced values
based on ``data``.
"""
replaced = {}
if variables is None:
variables = self.data
if script is None:
script = self.template_content
content = str(script)
flat = FlatDict(variables, sep=".")
for attribute in flat:
value = flat[attribute]
frame = fences[0] + attribute + fences[1]
if frame in content:
if self.verbose:
print(f"- Expanding {frame} with {value}")
replaced[attribute] = value
content = content.replace(frame, str(value))
return content, replaced
[docs]
@staticmethod
def permutation_generator(exp_dict):
"""Creates a cartesian product of a ``{key: list, ...}`` object.
Args:
exp_dict (dict): The dictionary to process.
Returns:
list[dict]: A list of dictionaries containing the resulting
cartesian product.
Example::
my_dict = {"key1": ["value1", "value2"], "key2": ["value3", "value4"]}
out = ExperimentExecutor.permutation_generator(my_dict)
# out == [
# {"key1": "value1", "key2": "value3"},
# {"key1": "value1", "key2": "value4"},
# {"key1": "value2", "key2": "value3"},
# {"key1": "value2", "key2": "value4"},
# ]
"""
keys, values = zip(*exp_dict.items())
return [dict(zip(keys, value)) for value in itertools.product(*values)]
[docs]
def generate_experiment_permutations(self, variable_str):
"""Generates experiment permutations based on the passed string and appends it to the current instance.
Args:
variable_str (str): A Parameter.expand string (such as
epoch=[1-3] x=[1,4] y=[10,11])
Returns:
list: list with permutations over the experiment variables
"""
experiments = OrderedDict()
entries = variable_str.split(' ')
for entry in entries:
k, v = entry.split("=")
experiments[k] = Parameter.expand(v)
self.permutations = self.permutation_generator(experiments)
return self.permutations
@staticmethod
def _generate_bootstrapping(permutation):
"""creates an identifier, a list of assignments, ad values.
Args:
permutation (list): the permutation list
Returns:
str, list, list: identifier, assignments, values
"""
values = list()
for attribute, value in permutation.items():
values.append(f"{attribute}_{value}")
assignments = list()
for attribute, value in permutation.items():
assignments.append(f"{attribute}={value}")
assignments = " ".join(assignments)
identifier = "_".join(values)
return identifier, assignments, values
def _generate_hierarchical_config(self):
"""Creates a hierarchical directory with configuration yaml files, and shell script
Returns:
dict: directory with configuration and yaml files
"""
"""Runs process to build out all templates in a hierarchical-style
Returns:
None.
Side Effects:
Writes two files for each established experiment, each in their own directory.
"""
if self.verbose:
print("Outputting Hierarchical Experiments")
configuration = dict()
self.script_variables = []
suffix = self._suffix(self.script_out)
directory = self.output_dir # .path.dirname(name)
for permutation in self.permutations:
identifier, assignments, values = self._generate_bootstrapping(permutation)
if self.verbose:
print(identifier, assignments, values)
spec = yaml.dump(self.data, indent=2)
spec = self.spec_replace(spec)
print (type(spec))
variables = yaml.safe_load(spec)
print ("VARIABLES")
pprint (variables)
print ("END VARIABLES")
name = os.path.basename(self.script_out)
script = f"{directory}/{identifier}/{name}"
config = f"{directory}/{identifier}/config.yaml"
variables.update({'experiment': permutation})
variables["ee"]["identifier"] = identifier
configuration[identifier] = {
"id": identifier,
"directory": f"{directory}/{identifier}",
"experiment": assignments,
"script": script,
"config": config,
"variables": variables,
"copycode": self.copycode
}
return configuration
[docs]
def generate_experiment_batch_scripts(self, out_mode=None, replace_all=True):
"""Utility method to genrerate either hierarchical or flat outputs; or debug.
NOte the falt mode is no longer supported
Args:
out_mode (string): The mode of operation. One of: "debug",
"flat", "hierarchical"
Returns:
None: generates the batch scripts
"""
mode = self.execution_mode if out_mode is None else out_mode.lower()
if mode.startswith("d"):
Console.warning("This is just debug mode")
print()
for permutation in self.permutations:
values = ""
for attribute, value in permutation.items():
values = values + f"{attribute}={value} "
script = f"{self.output_dir}/{self.script_out}{values}".replace("=", "_")
else:
configuration = None
if mode.startswith("h"):
configuration = self._generate_hierarchical_config()
else:
raise RuntimeError(f"Invalid generator mode {mode}")
if self.verbose:
banner("Script generation")
print(Printer.write(configuration, order=["id", "experiment", "script", "config", "directory"]))
self.configuration_parameters = configuration
self.generate_setup_from_configuration(configuration, replace_all)
[docs]
def generate_submit(self, name=None, job_type='slurm'):
"""Generates a list of commands based on the permutations for submission.
Args:
name (str): Name of the experiments.
job_type (str): Name of the job type used at submission such
as ee, slurm, jsrun, mpirun, sh, bash.
Returns:
None: Prepares the internal data for the experiments. If ``verbose``
is set, prints them.
"""
if ".json" not in name:
name = f"{name}.json"
if job_type == 'slurm':
cmd = 'sbatch'
elif job_type == 'lsf':
cmd = 'bsub'
else:
cmd = job_type
# else:
# raise RuntimeError(f"Unsupported submission type {type_}")
experiments = json.loads(readfile(name))
# print (experiments)
if experiments is None:
Console.error("please define the experiment parameters")
return ""
for entry in experiments:
if self.verbose:
print(f"# Generate {entry}")
experiment = experiments[entry]
parameters = experiment["experiment"]
directory = experiment["directory"]
script = os.path.basename(experiment["script"])
print(f"(cd {directory} && {cmd} {script}) # {parameters}")
[docs]
def generate_setup_from_configuration(self, configuration, replace_all=True):
"""generates a setup directory from the configuration parameters
Args:
configuration (dict): the configuration dict
Returns:
"""
for identifier in configuration:
experiment = configuration[identifier]
Shell.mkdir(experiment["directory"])
if self.verbose:
print()
Console.info(f"Setup experiment {identifier}")
print(f"- Making dir {experiment['directory']}")
print(f"- write file {experiment['config']}")
# Generate UUID for each perm
experiment["variables"]['ee']['uuid'] = str(uuid.uuid4())
#
# CREATE SLURM SBATCH PARAMETERS FORM A KEY based on experiment.card_name
#
host = experiment["variables"]["system"]["host"]
if "cardname" in experiment["variables"]["experiment"]:
key = experiment["variables"]["experiment"]["card_name"]
rivanna = Rivanna(host=host)
experiment["variables"]["slurm"] = {
"directive": rivanna.directive[host][key],
"sbatch": rivanna.create_slurm_directives(host=host, key=key).strip()
}
elif experiment["variables"]["experiment"]["directive"]:
directive = experiment["variables"]["experiment"]["directive"]
rivanna = Rivanna(host=host)
experiment["variables"]["slurm"] = {
"sbatch": rivanna.create_slurm_directives(host=host, key=directive).strip()
}
if "-" in directive:
experiment["variables"]["experiment"]["card_name"] = directive.split("-")[0]
else:
experiment["variables"]["experiment"]["card_name"] = directive
#
# END GENERATE SLURM SBATCH
#
writefile(experiment["config"], yaml.dump(experiment["variables"], indent=2))
content_config = readfile(experiment["config"])
try:
check = yaml.safe_load(content_config)
except Exception as e:
print(e)
Console.error("We had issues with our check for the config.yaml file")
content_script, replaced = self.generate(
self.template_content,
variables=experiment["variables"])
# if self.verbose:
# for attribute, value in replaced.items():
# print (f"- replaced {attribute}={value}")
writefile(experiment["script"], content_script)
if self.copycode is not None:
for code in self.copycode:
Shell.copy_source(source=code, destination=experiment["directory"])
try:
if replace_all:
c = FlatDict()
c.load(experiment["config"])
c.apply(experiment["script"])
c.apply(experiment["config"])
except Exception as e:
print (e)
raise ValueError
@property
def now(self):
"""The time of now in the format "%Y-m-%d"
Returns:
str: "%Y-m-%d"
"""
return datetime.now().strftime("%Y-m-%d")
[docs]
def save_experiment_configuration(self, name=None):
"""Saves the experiment configuration in a json file
Args:
name (str): name of the configuration file
Returns:
None: writes into the file with given name the json content
"""
if name is not None:
content = json.dumps(self.configuration_parameters, indent=2)
writefile(name, content)