# Source code for cloudmesh.sbatch.sbatch

import itertools
import json
import os
import pathlib
import typing
import uuid
from collections import OrderedDict
from datetime import datetime
from pprint import pprint

import yaml
from nbconvert.exporters import PythonExporter

from cloudmesh.rivanna.rivanna import Rivanna
from cloudmesh.common.FlatDict import FlatDict
from cloudmesh.common.Printer import Printer
from cloudmesh.common.Shell import Shell
from cloudmesh.common.console import Console
from cloudmesh.common.parameter import Parameter
from cloudmesh.common.util import banner, readfile, writefile
from cloudmesh.common.variables import Variables

# Type aliases for annotations in this module.
# PathLike: a filesystem path given either as a string or a pathlib.Path.
PathLike = typing.Union[str, pathlib.Path]
# DictOrList: a value that is either a dict or a list.
DictOrList = typing.Union[dict, list]
# OptPath: a PathLike that may also be None.
OptPath = typing.Optional[PathLike]
# OptStr: a string that may also be None.
OptStr = typing.Optional[str]


class SBatch:
    """
    Generates per-experiment batch scripts and configuration files from a
    template script and a configuration dictionary.

    The configuration is accumulated in ``self.data`` through the
    ``update_from_*`` methods; experiment permutations are kept in
    ``self.permutations``; the template text is kept in
    ``self.template_content``.
    """

    def __init__(self, verbose=False):
        """
        Initialize the SBatch Object

        :param verbose: If true prints additional information when SBatch
                        methods are called
        :type verbose: bool
        """
        self.flat = FlatDict({}, sep=".")
        self.data = dict()
        self.permutations = list()
        self.experiments = None
        self.dryrun = False
        self.verbose = verbose
        self.execution_mode = "h"
        # input and output directory both default to the mapped current directory
        current_dir = str(Shell.map_filename(".").path)
        self.input_dir = current_dir
        self.output_dir = current_dir
        self.os_variables = None
        self.template_path = None
        self.template_content = None
        self.configuration_parameters = None
        self.script_out = None
        self.script_variables = []
        self.copycode = None

    def info(self, verbose=None):
        """
        Prints information about the SBatch object for debugging purposes

        :param verbose: if True prints the information even if the object
                        itself is not set to verbose
        :type verbose: bool
        :return: None
        :rtype: None
        """
        verbose = verbose or self.verbose
        if not verbose:
            return

        for a in [
            "dryrun",
            "verbose",
            "name",
            "source",
            "destination",
            "attributes",
            "gpu",
            "config",
            "config_files",
            "directory",
            "experiment",
            "execution_mode",
            "template",
            "script_output",
            "output_dir",
            "input_dir",
            "script_in",
            "script_out",
            "os_variables",
            "experiments",
            "copy"
        ]:
            # names that are not attributes of the object are looked up in
            # the configuration data instead
            try:
                result = getattr(self, a)
            except AttributeError:
                result = self.data.get(a)
            print(f'{a:<12}: {result}')

        print("permutations:")
        pprint(self.permutations)

        print("BEGIN FLAT")
        pprint(self.flat)
        print("END FLAT")
        print()

        print("BEGIN DATA")
        pprint(self.data)
        print("END DATA")
        print()

        print("BEGIN YAML")
        spec = yaml.dump(self.data, indent=2)
        print(spec)
        print("END YAML")

        print("BEGIN SPEC")
        spec = self.spec_replace(spec)
        print(spec)
        print("END SPEC")

        print("BEGIN PERMUTATION")
        pprint(self.permutations)
        print("END PERMUTATION")

        print("BEGIN DATA")
        pprint(self.data)
        print("END DATA")

        banner("BEGIN TEMPLATE")
        print(self.template_content)
        banner("END TEMPLATE")

    @staticmethod
    def update_with_directory(directory, filename):
        """
        prefix with the directory if the filename is not starting with . / ~

        :param directory: the string value of the directory
        :type directory: str
        :param filename: the filename
        :type filename: str
        :return: directory/filename
        :rtype: str
        """
        if directory is None or filename.startswith(("/", ".", "~")):
            return filename
        return f"{directory}/{filename}"

    def get_data(self, flat=False):
        """
        Returns the configuration data, optionally converted with FlatDict

        :param flat: if set to true uses FlatDict
        :type flat: bool
        :return: self.data, or its FlatDict form with the "sep" entry removed
        :rtype: dict
        """
        result = self.data
        if flat:
            result = FlatDict(self.data, sep=".")
            del result["sep"]
        return result

    def spec_replace(self, spec):
        """
        given a spec in yaml format, replaces all values in the yaml file that
        are of the form "{a.b}" with the value of

        a:
           b: value

        if it is defined in the yaml file

        Only string values that do not themselves contain "{" are
        substituted; everything else is left untouched.

        :param spec: yaml string
        :type spec: str
        :return: replaced yaml file
        :rtype: str
        """
        banner("SPECIFICATION")
        print(spec)

        data = FlatDict()
        data.loads(spec)

        banner("FLATDICT")
        pprint(data.__dict__)
        spec1 = str(data.__dict__)
        print(str(spec1[1:-1]))

        banner("MUNCH")

        #
        # should be replaced with flatdict applied on config.yaml file
        #
        import re
        import munch

        # NOTE(review): the pattern is greedy, so a line holding two
        # references such as "{a.b}/{c.d}" is matched as one span and is not
        # resolved here.
        variables = re.findall(r"\{\w.+\}", spec)
        data = yaml.load(spec, Loader=yaml.SafeLoader)
        m = munch.DefaultMunch.fromDict(data)

        # The lookup object m never changes, so a pass that leaves spec
        # untouched means that all further passes are no-ops. The upper bound
        # is the number of passes the nested loops performed previously.
        for _ in range(4 * len(variables)):
            before = spec
            for text in variables:
                variable = text[1:-1]
                # noinspection PyBroadException
                try:
                    # SECURITY: eval executes text taken from the
                    # specification; only use with trusted configuration.
                    value = eval(f"m.{variable}")
                    if "{" not in value:
                        spec = spec.replace(text, value)
                except Exception:  # unresolved or non-string values are skipped
                    pass
            if spec == before:
                break

        banner("END MUNCH")
        return spec

    def update_from_os(self, variables):
        """
        Loads the named variables from os.environ into self.data["os"]

        :param variables: the name of the variables such as "HOME"
        :type variables: [str]
        :return: self.data with all variables added with os.name: value
        :rtype: dict
        :raises KeyError: if a named variable is not defined in os.environ
        """
        if variables is not None:
            # keep entries added by earlier calls
            if "os" not in self.data:
                self.data["os"] = {}
            for key in variables:
                self.data["os"][key] = os.environ[key]
        return self.data

    def load_source_template(self, script):
        """
        Registers and reads the template script in for processing

        This method must be run at least once prior to generating the batch
        script output.

        :param script: A string that is the path to the template script.
        :type script: str
        :return: The text of the template file unaltered.
        :rtype: str
        """
        self.template_path = script
        self.template_content = readfile(script)
        return self.template_content

    def update_from_dict(self, d):
        """
        Add a dict to self.data

        :param d: dictionary
        :type d: dict
        :return: self.data with updated dict
        :rtype: dict
        """
        self.data.update(d)
        return self.data

    def update_from_attributes(self, attributes: str):
        """
        attributes are of the form "a=1,b=3"

        :param attributes: A string to expand into key-value pairs
        :type attributes: str
        :return: self.data with updated dict
        :rtype: dict
        """
        flatdict = Parameter.arguments_to_dict(attributes)

        d = FlatDict(flatdict, sep=".")
        d = d.unflatten()
        del d["sep"]

        self.update_from_dict(d)
        return self.data

    def update_from_os_environ(self):
        """
        Merges all OS environment variables as top level keys into self.data

        :return: The current value of the data configuration variable
        :rtype: dict
        """
        self.update_from_dict(dict(os.environ))
        return self.data

    def update_from_cm_variables(self, load=True):
        """
        Adds Cloudmesh variables to the class's data parameter under the key
        "cloudmesh".

        :param load: Toggles execution; if false, method does nothing.
        :type load: bool
        :return: self.data with updated cloudmesh variables
        :rtype: dict
        """
        if load:
            variables = Variables()
            v = FlatDict({"cloudmesh": variables.dict()}, sep=".")
            d = v.unflatten()
            del d["sep"]
            self.update_from_dict(d)
        return self.data

    @staticmethod
    def _suffix(filename):
        """
        Returns the file suffix of a filename

        :param filename: the filename
        :type filename: str
        :return: the suffix of the filename including the leading dot
        :rtype: str
        """
        return pathlib.Path(filename).suffix

    @staticmethod
    def _load_python_variables(filename):
        """
        Executes a python file as a module and collects its variables

        :param filename: path to the python file
        :type filename: str
        :return: all module level names that do not start with "__"
        :rtype: dict
        """
        import importlib.util
        import sys

        modulename = filename.replace(".py", "").replace("/", "_").replace("build_", "")
        module_spec = importlib.util.spec_from_file_location(modulename, filename)
        mod = importlib.util.module_from_spec(module_spec)
        # register the module like the previously used load_module() did
        sys.modules[modulename] = mod
        module_spec.loader.exec_module(mod)
        return {name: value
                for name, value in vars(mod).items()
                if not name.startswith("__")}

    def update_from_file(self, filename):
        """
        Updates the configuration self.data with the data within the passed
        file. If the file defines an "experiment" entry, its values are
        expanded and self.permutations is regenerated from them.

        :param filename: The path to the configuration file
                         (yaml, json, py, ipynb)
        :type filename: str
        :return: self.data with updated cloudmesh variables from the
                 specified file
        :rtype: dict
        :raises RuntimeError: if the file suffix is not supported
        """
        if self.verbose:
            print(f"Reading variables from {filename}")

        suffix = self._suffix(filename).lower()

        if suffix == ".json":
            values = json.loads(readfile(filename))
        elif suffix in (".yml", ".yaml"):
            values = yaml.safe_load(readfile(filename))
        elif suffix == ".py":
            values = self._load_python_variables(filename)
        elif suffix == ".ipynb":
            # convert the notebook to a python file next to it and load that
            py_name = filename.replace(".ipynb", ".py")
            body, _ = PythonExporter().from_filename(filename)
            writefile(py_name, body)
            values = self._load_python_variables(py_name)
        else:
            raise RuntimeError(f"Unsupported config type {suffix}")

        # an empty yaml or json document is parsed as None; nothing to merge
        if values is None:
            return self.data

        self.update_from_dict(values)

        if 'experiment' in values:
            experiments = values['experiment']
            for key, value in experiments.items():
                print(key, value)
                # values that can not be expanded become a single choice
                # noinspection PyBroadException
                try:
                    experiments[key] = Parameter.expand(value)
                except Exception:
                    experiments[key] = [value]
            self.permutations = self.permutation_generator(experiments)

        return self.data

    def generate(self, script=None, variables=None, fences=("{", "}")):
        """
        Expands the script template given the passed configuration.

        :param script: The string contents of the script file; if omitted
                       the loaded template content is used.
        :type script: str
        :param variables: the variables to be replaced, if omitted uses the
                          internal variables found
        :type variables: dict
        :param fences: A 2 position tuple, that encloses template variables
                       (start and end).
        :type fences: (str,str)
        :return: The script that has expanded its values based on `data`,
                 and a dict of the attributes that were replaced.
        :rtype: (str, dict)
        """
        replaced = {}

        if variables is None:
            variables = self.data
        if script is None:
            script = self.template_content
        content = str(script)
        flat = FlatDict(variables, sep=".")

        for attribute in flat:
            value = flat[attribute]
            frame = fences[0] + attribute + fences[1]
            if frame in content:
                if self.verbose:
                    print(f"- Expanding {frame} with {value}")
                replaced[attribute] = value
                content = content.replace(frame, str(value))
        return content, replaced

    @staticmethod
    def permutation_generator(exp_dict):
        """
        Creates a cartesian product of a {key: list, ...} object.

        :param exp_dict: The dictionary to process
        :type exp_dict: dict
        :return: A list of dictionaries containing the resulting cartesian
                 product; an empty list if exp_dict is empty.
        :rtype: list

        For example
            my_dict = {"key1": ["value1", "value2"], "key2": ["value3", "value4"]}
            out = permutation_generator(my_dict)
            out
            # [{"key1": "value1", "key2": "value3"},
            #  {"key1": "value1", "key2": "value4"},
            #  {"key1": "value2", "key2": "value3"},
            #  {"key1": "value2", "key2": "value4"}]
        """
        # zip(*...) can not be unpacked into keys and values for an empty dict
        if not exp_dict:
            return []
        keys, values = zip(*exp_dict.items())
        return [dict(zip(keys, value)) for value in itertools.product(*values)]

    def generate_experiment_permutations(self, variable_str):
        """
        Generates experiment permutations based on the passed string and
        stores them in the current instance.

        :param variable_str: A Parameter.expand string
                             (such as epoch=[1-3] x=[1,4] y=[10,11])
        :type variable_str: str
        :return: list with permutations over the experiment variables
        :rtype: list
        """
        experiments = OrderedDict()
        # split() tolerates repeated whitespace; only the first "=" separates
        # the name from its value
        for entry in variable_str.split():
            k, v = entry.split("=", 1)
            experiments[k] = Parameter.expand(v)
        self.permutations = self.permutation_generator(experiments)
        return self.permutations

    @staticmethod
    def _generate_bootstrapping(permutation):
        """
        creates an identifier, a string of assignments, and a list of values.

        :param permutation: one permutation mapping attribute to value
        :type permutation: dict
        :return: identifier ("a_1_b_2"), assignments ("a=1 b=2"),
                 values (["a_1", "b_2"])
        :rtype: str, str, list
        """
        values = [f"{attribute}_{value}" for attribute, value in permutation.items()]
        assignments = " ".join(
            f"{attribute}={value}" for attribute, value in permutation.items())
        identifier = "_".join(values)
        return identifier, assignments, values

    def _generate_hierarchical_config(self):
        """
        Creates the configuration for a hierarchical directory layout in
        which each permutation has its own directory with a config.yaml file
        and a script. No files are written by this method.

        :return: dict that maps each permutation identifier to its id,
                 directory, experiment assignments, script path, config path,
                 variables and copycode
        :rtype: dict
        """
        if self.verbose:
            print("Outputting Hierarchical Experiments")
        configuration = dict()
        self.script_variables = []
        directory = self.output_dir
        for permutation in self.permutations:
            identifier, assignments, values = self._generate_bootstrapping(permutation)

            if self.verbose:
                print(identifier, assignments, values)

            # a fresh copy of the variables is needed per permutation as it
            # is modified below
            spec = yaml.dump(self.data, indent=2)
            spec = self.spec_replace(spec)

            print (type(spec))
            variables = yaml.safe_load(spec)

            print ("VARIABLES")
            pprint (variables)
            print ("END VARIABLES")

            name = os.path.basename(self.script_out)
            script = f"{directory}/{identifier}/{name}"
            config = f"{directory}/{identifier}/config.yaml"

            variables.update({'experiment': permutation})
            variables.setdefault("sbatch", {})["identifier"] = identifier

            configuration[identifier] = {
                "id": identifier,
                "directory": f"{directory}/{identifier}",
                "experiment": assignments,
                "script": script,
                "config": config,
                "variables": variables,
                "copycode": self.copycode
            }
        return configuration

    def generate_experiment_batch_scripts(self, out_mode=None, replace_all=True):
        """
        Utility method to generate hierarchical outputs; or debug.
        Note the flat mode is no longer supported.

        :param out_mode: The mode of operation. One of: "debug",
                         "hierarchical". If omitted self.execution_mode is
                         used.
        :type out_mode: string
        :param replace_all: passed on to generate_setup_from_configuration
        :type replace_all: bool
        :return: generates the batch scripts
        :rtype: None
        :raises RuntimeError: if the mode is neither debug nor hierarchical
        """
        mode = self.execution_mode if out_mode is None else out_mode.lower()
        if mode.startswith("d"):
            # debug mode does not generate anything
            Console.warning("This is just debug mode")
            print()
            return

        if not mode.startswith("h"):
            raise RuntimeError(f"Invalid generator mode {mode}")

        configuration = self._generate_hierarchical_config()
        if self.verbose:
            banner("Script generation")
        # NOTE(review): assumed to be printed regardless of verbose - confirm
        print(Printer.write(configuration,
                            order=["id", "experiment", "script", "config", "directory"]))

        self.configuration_parameters = configuration
        self.generate_setup_from_configuration(configuration, replace_all)

    def generate_submit(self, name=None, job_type='slurm'):
        """
        Prints a list of commands based on the saved experiment
        configuration for submission

        :param name: Name of the experiments file; ".json" is appended if
                     the name does not contain it
        :type name: str
        :param job_type: name of the job type used at submission such as
                         sbatch, slurm, jsrun, mpirun, sh, bash
        :type job_type: str
        :return: "" if no experiments are defined, None otherwise
        :rtype: str or None
        """
        if name is None:
            Console.error("please define the experiment parameters")
            return ""

        if ".json" not in name:
            name = f"{name}.json"

        # known job types map to their submit command, any other job type
        # is used as command itself
        cmd = {'slurm': 'sbatch', 'lsf': 'bsub'}.get(job_type, job_type)

        experiments = json.loads(readfile(name))

        if experiments is None:
            Console.error("please define the experiment parameters")
            return ""

        for entry in experiments:
            if self.verbose:
                print(f"# Generate {entry}")
            experiment = experiments[entry]
            parameters = experiment["experiment"]
            directory = experiment["directory"]
            script = os.path.basename(experiment["script"])
            print(f"{parameters} cd {directory} && {cmd} {script}")

    def generate_setup_from_configuration(self, configuration, replace_all=True):
        """
        generates a setup directory from the configuration parameters. For
        each experiment the directory is created, the config.yaml and the
        expanded script are written, and the code listed in self.copycode is
        copied.

        :param configuration: the configuration dict
        :type configuration: dict
        :param replace_all: if True, the written config is applied with
                            FlatDict to the script and the config file
        :type replace_all: bool
        :return: None
        :rtype: None
        :raises ValueError: if applying the config to the generated files fails
        """
        for identifier in configuration:
            experiment = configuration[identifier]
            Shell.mkdir(experiment["directory"])
            if self.verbose:
                print()
                Console.info(f"Setup experiment {identifier}")
                print(f"- Making dir {experiment['directory']}")
                print(f"- write file {experiment['config']}")

            # Generate UUID for each perm
            experiment["variables"].setdefault('sbatch', {})['uuid'] = str(uuid.uuid4())

            #
            # CREATE SLURM SBATCH PARAMETERS FROM A KEY based on experiment.card_name
            #
            host = experiment["variables"]["system"]["host"]
            key = experiment["variables"]["experiment"]["card_name"]
            rivanna = Rivanna(host=host)
            experiment["variables"]["slurm"] = {
                "directive": rivanna.directive[host][key],
                "sbatch": rivanna.create_slurm_directives(host=host, key=key).strip()
            }
            #
            # END GENERATE SLURM SBATCH
            #

            writefile(experiment["config"], yaml.dump(experiment["variables"], indent=2))
            content_config = readfile(experiment["config"])
            # the written file is parsed again as a check; a failure is
            # reported but does not stop the generation
            try:
                yaml.safe_load(content_config)
            except Exception as e:
                print(e)
                Console.error("We had issues with our check for the config.yaml file")

            content_script, replaced = self.generate(
                self.template_content,
                variables=experiment["variables"])

            writefile(experiment["script"], content_script)

            if self.copycode is not None:
                for code in self.copycode:
                    Shell.copy_source(source=code, destination=experiment["directory"])

            if replace_all:
                try:
                    c = FlatDict()
                    c.load(experiment["config"])
                    c.apply(experiment["script"])
                    c.apply(experiment["config"])
                except Exception as e:
                    print (e)
                    raise ValueError(
                        f"could not apply {experiment['config']} "
                        f"to the files of experiment {identifier}") from e

    @property
    def now(self):
        """
        The time of now in the format "%Y-%m-%d"

        :return: the current date such as "2023-01-31"
        :rtype: str
        """
        return datetime.now().strftime("%Y-%m-%d")

    def save_experiment_configuration(self, name=None):
        """
        Saves the experiment configuration in a json file

        :param name: name of the configuration file; if None nothing is
                     written
        :type name: str
        :return: writes into the file with given name the json content
        :rtype: None
        """
        if name is not None:
            content = json.dumps(self.configuration_parameters, indent=2)
            writefile(name, content)