# Source code for cloudmesh_client.common.ConfigDict

from __future__ import print_function

import json
import os.path
from collections import OrderedDict
import sys
import yaml
import shutil

from cloudmesh_client.common.Shell import Shell
from cloudmesh_client.common.BaseConfigDict import BaseConfigDict
from cloudmesh_client.common.todo import TODO
from cloudmesh_client.common.util import path_expand
from import Console
from cloudmesh_client.common.util import backup_name

def custom_print(data_structure, indent, attribute_indent=4):
    """
    prints a (possibly nested) dict as an indented ``key: value`` outline.

    Every key starts on a new line, indented by
    ``attribute_indent * indent`` spaces. Nested dicts are printed
    recursively one level deeper; any other value is printed after its key
    on the same line. Nothing is returned, the output goes to stdout.

    :param data_structure: the dict (or OrderedDict) to be printed
    :type data_structure: dict
    :param indent: the nesting level at which the keys are printed
    :type indent: int
    :param attribute_indent: number of spaces used per nesting level
    :type attribute_indent: int
    """
    prefix = ' ' * attribute_indent * indent
    for key, value in data_structure.items():
        print("\n%s%s:" % (prefix, str(key)), end=' ')
        # OrderedDict is a dict subclass, so one isinstance check covers both.
        if isinstance(value, dict):
            # Pass attribute_indent on; previously nested levels silently
            # fell back to the default width of 4.
            custom_print(value, indent + 1, attribute_indent)
        else:
            print("%s" % (str(value)), end=' ')
def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds):
    """
    writes the dict into an ordered yaml.

    An ``OrderedDict`` is emitted as a plain yaml mapping whose entries
    are taken from ``data.items()``.

    :param data: The ordered dict
    :param stream: the stream handed on to ``yaml.dump``
    :param Dumper: the dumper such as yaml.SafeDumper
    :param kwds: further keyword arguments handed on to ``yaml.dump``
    :return: whatever ``yaml.dump`` returns for the given stream
    """
    mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    # The representer is attached to a throw-away subclass of the
    # requested Dumper rather than to the Dumper passed in.
    class OrderedDumper(Dumper):
        pass

    def represent_ordered(dumper, mapping):
        return dumper.represent_mapping(mapping_tag, mapping.items())

    OrderedDumper.add_representer(OrderedDict, represent_ordered)
    return yaml.dump(data, stream, OrderedDumper, **kwds)
# noinspection PyPep8Naming
def dprint(OD, mode='dict', s="", indent=' ' * 4, level=0):
    """
    renders a (nested) dict as source-like text and returns it as a string.

    With ``mode='dict'`` the entries are rendered as ``"key": value``
    and nested dicts are wrapped in ``{ }``; with any other mode they are
    rendered as ``("key", value)`` and nested dicts are wrapped in
    ``OrderedDict([ ])``. Values that ``float()`` accepts are inserted
    unquoted, everything else is wrapped in double quotes.

    :param OD: the dict or OrderedDict to render
    :param mode: 'dict' for dict syntax, anything else for OrderedDict syntax
    :param s: text the rendering is appended to
    :param indent: the string used for one level of indentation
    :param level: the nesting level of ``OD``
    :return: the rendered text; every entry ends with a newline
    :rtype: string
    """

    def is_number(value):
        # TypeError covers None, lists and similar values, which float()
        # rejects with TypeError rather than ValueError.
        try:
            float(value)
        except (TypeError, ValueError):
            return False
        return True

    def fstr(value):
        return value if is_number(value) else '"%s"' % value

    if mode != 'dict':
        kv_tpl, opening, closing = '("%s", %s)', 'OrderedDict([\n', '])'
    else:
        kv_tpl, opening, closing = '"%s": %s', '{\n', '}'

    pad = level * indent
    last = len(OD) - 1
    for i, (key, value) in enumerate(OD.items()):
        if isinstance(value, dict):
            # Children are rendered one level deeper; the closing bracket
            # lines up with the key that opened it.
            nested = dprint(value, mode=mode, indent=indent, level=level + 1)
            rendered = opening + nested + pad + closing
        else:
            rendered = fstr(value)
        s += pad + kv_tpl % (key, rendered)
        if i != last:
            s += ","
        s += "\n"
    return s
class Config(object):
    """
    Helper class methods for checking, expanding and locating
    configuration files.
    """

    @classmethod
    def check_file_for_tabs(cls, filename, verbose=True):
        """identifies if the file contains tabs and returns True if it
        does. It also prints the location of the lines and columns. If
        verbose is set to False, the location is not printed.

        Line numbers are reported starting at 1, columns starting at 0.

        :param filename: the filename
        :type filename: str
        :param verbose: if True the location of each tab is printed
        :type verbose: bool
        :rtype: True if there are tabs in the file
        """
        filename = path_expand(filename)
        file_contains_tabs = False
        # NOTE(review): the read expression was garbled in the source
        # ('lines ="\n")'); reconstructed as f.read().split("\n").
        # open() replaces the Python-2-only file() builtin.
        with open(filename) as f:
            lines = f.read().split("\n")
        for line_no, line in enumerate(lines, start=1):
            if "\t" not in line:
                continue
            file_contains_tabs = True
            if verbose:
                location = [i for i, char in enumerate(line) if char == "\t"]
                print("Tab found in line", line_no,
                      "and column(s)", location)
        return file_contains_tabs

    @classmethod
    def path_expand(cls, path):
        """
        expands the path while replacing environment variables, ./, and ~/

        :param path: the path to be expanded
        :type path: string
        :return: the new path
        :rtype: string
        """
        current_dir = "." + os.path.sep
        if path.startswith(current_dir):
            # Join instead of a plain string replace: replacing "./" with
            # the cwd dropped the separator ("./x" became "<cwd>x").
            path = os.path.join(os.getcwd(), path[len(current_dir):])
        return os.path.expandvars(os.path.expanduser(path))

    @classmethod
    def find_file(cls, filename, load_order=None, verbose=False):
        """
        find the specified file in the list of directories that are given
        in the array load_order

        :param filename: the file name
        :type filename: str
        :param load_order: an array with path names in which the filename
                           is looked for. Defaults to . and ~/.cloudmesh
        :type load_order: list of str
        :param verbose: if True every candidate path is printed
        :type verbose: bool
        :return: file name if successful
        :rtype: string if the file exists or None otherwise
        """
        if load_order is None:
            load_order = [".", os.path.join("~", ".cloudmesh")]
        for path in load_order:
            name = cls.path_expand(os.path.join(path, filename))
            if verbose:
                print("try finding file", name)
            if os.path.isfile(name):
                if verbose:
                    print("Found File", name)
                return name
        return None
class ConfigDict(object):
    """
    Dictionary-like access to a yaml configuration file.

    All instances share their state (``self.__dict__`` is bound to one
    class-level dict), so the file is loaded once and reused unless
    ``reload`` is requested. Keys may be given in dot notation
    (``"a.b.c"``) to address nested entries.

    NOTE(review): the scraped source had lost several tokens in this
    class. Every place that reads or writes ``self.data`` /
    ``ConfigDict.data`` was reconstructed from context and should be
    confirmed against the upstream file.
    """
    __shared_state = {}
    versions = ['4.1']
    data = {}
    filename = None

    def __init__(self,
                 filename,
                 load_order=None,
                 verbose=False,
                 etc=False,
                 reload=False):
        """
        Creates a dictionary from a yaml configuration file
        while using the filename to load it in the specified load_order.
        The load order is an array of paths in which the file is searched.
        By default the load order is set to . and ~/.cloudmesh

        :param filename: the filename
        :type filename: string
        :param load_order: an array with path names in which the filename
                           is looked for.
        :type load_order: list of strings
        :param verbose: if True every candidate path is printed
        :param etc: if True the directory of the cloudmesh_client.etc
                    package replaces the load order
        :param reload: if True the file is loaded even if data is cached
        :return: an instance of ConfigDict
        :rtype: ConfigDict
        :raises ValueError: if the file is in none of the directories
        """
        self.__dict__ = self.__shared_state
        # NOTE(review): reconstructed; the left operand was stripped.
        if ConfigDict.data != {} and not reload:
            return

        self.data = None
        if etc:
            import cloudmesh_client.etc
            load_order = [os.path.dirname(cloudmesh_client.etc.__file__)]

        if load_order is None:
            self.load_order = [".", os.path.join("~", ".cloudmesh")]
        else:
            self.load_order = load_order

        for path in self.load_order:
            name = Config.path_expand(os.path.join(path, filename))
            if verbose:
                print("try Loading ConfigDict", name)
            if os.path.isfile(name):
                if verbose:
                    print("Loading ConfigDict", name)
                self.load(name)
                ConfigDict.filename = name
                # NOTE(review): reconstructed from a garbled '= ' fragment.
                ConfigDict.data = self.data
                return

        raise ValueError(
            "Could not find file {:} in {:}".format(filename,
                                                    self.load_order))

    def load(self, filename):
        """
        loads the configuration from the yaml filename and terminates the
        program with exit code 1 if the version found under meta.version
        is missing or not listed in ``versions``.

        :param filename: the file to load
        :type filename: string
        """
        self.data = BaseConfigDict(filename=Config.path_expand(filename))
        try:
            version = str(self.data["meta"]["version"])
            if version not in self.versions:
                Console.error("The yaml file version must be {}".format(
                    ', '.join(self.versions)))
                sys.exit(1)
        except Exception as e:
            Console.error(
                "Your yaml file ~/.cloudmesh/cloudmesh.yaml is not up to date.",
                traceflag=False)
            # str(e) instead of e.message, which only exists on Python 2.
            Console.error(str(e), traceflag=False)
            sys.exit(1)

    def write(self, filename=None, output="dict", attribute_indent=4):
        """
        This method writes the dict into various output formats. This
        includes a dict, json, and yaml. A backup of the target file is
        made first, and the file is created readable/writable for the
        owner only.

        :param filename: the file in which the dict is written; defaults
                         to the entry meta.location of the configuration
        :param output: is a string that is either "dict", "json", "yaml"
                       (or "yml"), or "print"
        :param attribute_indent: character indentation of nested attributes
        """
        import io
        import stat

        if filename is not None:
            location = path_expand(filename)
        else:
            location = self['meta']['location']

        # Render first, so a failing renderer cannot leave behind a
        # truncated file.
        # NOTE(review): the original passed ``self`` to the renderers and
        # called ``self.json()`` / ``self.dump()``; ConfigDict is not a
        # mapping and defines neither, so the loaded data is used instead.
        if output == "json":
            payload = self.json
        elif output in ['yml', 'yaml']:
            payload = ordered_dump(OrderedDict(self.data),
                                   Dumper=yaml.SafeDumper,
                                   default_flow_style=False,
                                   indent=attribute_indent)
        elif output == "print":
            # custom_print writes to stdout and returns None, so its
            # output is captured to obtain the text.
            # NOTE(review): attribute_indent is passed in the position of
            # the nesting level, as in the original call - confirm intent.
            buffer = io.StringIO()
            stdout, sys.stdout = sys.stdout, buffer
            try:
                custom_print(self.data, attribute_indent)
            finally:
                sys.stdout = stdout
            payload = buffer.getvalue()
        else:
            # NOTE(review): assumes BaseConfigDict provides dump() - confirm.
            payload = self.data.dump()

        # os.write needs bytes.
        if not isinstance(payload, bytes):
            payload = payload.encode("utf-8")

        # Make a backup
        self.make_a_copy(location)

        f = os.open(location,
                    os.O_CREAT | os.O_TRUNC | os.O_WRONLY,
                    stat.S_IRUSR | stat.S_IWUSR)
        try:
            os.write(f, payload)
        finally:
            os.close(f)

    def make_a_copy(self, location=None):
        """
        Creates a backup of the file specified in the location. The backup
        filename is obtained from ``backup_name(location)``.

        TODO: This function should be moved to another file maybe XShell

        :param location: the location of the file to be backed up
        """
        dest = backup_name(location)
        shutil.copyfile(location, dest)

    def save(self, filename=None):
        """
        saves the configuration in the given filename,
        if it is none the filename at load time is used.

        :param filename: the file name
        :type filename: string
        """
        # NOTE(review): reconstructed; assumes the data object offers yaml().
        content = self.data.yaml()
        # The filename argument used to be ignored.
        target = filename if filename is not None else ConfigDict.filename
        with open(Config.path_expand(target), 'w') as f:
            f.write(content)

    def __setitem__(self, item, value):
        """
        sets an item with the given value while using . formatted keys

            set('a.b.c", value)

        :param item: the key, optionally in dot notation
        :type item: string
        :param value: the value to store
        """
        keys = item.split(".")
        element = self.data
        # Walk to the parent of the last key and assign there; rebinding a
        # local name, as the code did before, never stored the value.
        for key in keys[:-1]:
            element = element[key]
        element[keys[-1]] = value

    def __getitem__(self, item):
        """
        gets an item form the dict. The key is . separated
        use it as follows get("a.b.c")

        :param item: the key, optionally in dot notation
        :type item: string
        :return: the value found at that key
        """
        element = self.data
        for key in item.split("."):
            element = element[key]
        return element

    def __str__(self):
        """
        returns the dict in yaml format

        :return: returns the yaml output of the dict
        :rtype: string
        """
        # NOTE(review): reconstructed; assumes the data object offers yaml().
        return self.data.yaml()

    @property
    def yaml(self):
        """
        returns the dict in yaml format

        :return: returns the yaml output of the dict
        :rtype: string:
        """
        # NOTE(review): reconstructed; assumes the data object offers yaml().
        return self.data.yaml()

    def info(self):
        """
        prints out the dict type and its content
        """
        print(type(self.data))
        print(self.data)

    @property
    def json(self, start=None):
        """
        returns the dict in json format

        :param start: start key in dot notation. As this is a property,
                      attribute access always leaves it at None.
        :return: json string version
        :rtype: string
        """
        data = self.data if start is None else self[start]
        return json.dumps(data, indent=4)

    @classmethod
    def check(cls, filename):
        """
        checks the filename if it is syntactically correct and does not
        include tabs. Not implemented yet.

        :param filename: the file to check
        :type filename: string
        """
        TODO.implement()

    @classmethod
    def getUser(cls, cloud):
        """
        returns the username stored in the credentials of the given cloud
        in cloudmesh.yaml. Only clouds with cm_type "openstack" are
        supported.

        :param cloud: the name of the cloud
        :return: the value of OS_USERNAME, or None if it could not be
                 determined (an error is reported on the console)
        """
        try:
            d = ConfigDict("cloudmesh.yaml")

            #
            # bug: cloud is none when adding a group
            #
            config = d["cloudmesh"]["clouds"][cloud]
            credentials = config["credentials"]
            cloud_type = config["cm_type"]

            if cloud_type == "openstack":
                return credentials["OS_USERNAME"]
            raise ValueError("getUser for this cloud type not yet "
                             "supported: {}".format(cloud))
        except Exception:
            # Best effort: report the problem and hand back None.
            Console.error("problem getting user")
            return None
# noinspection PyPep8Naming
def Username():
    """
    returns the user name stored under cloudmesh.profile.user in
    cloudmesh.yaml.

    :return: the user name
    :raises RuntimeError: if the profile has no "user" entry
    """
    profile = ConfigDict("cloudmesh.yaml")["cloudmesh"]["profile"]
    if "user" in profile:
        return profile["user"]
    raise RuntimeError("Profile username is not set in yaml file.")
def main():
    """
    small manual demo: loads cloudmesh.yaml and prints parts of it in
    several formats.
    """
    d = ConfigDict("cloudmesh.yaml")
    print(d, end='')
    print(d["meta"])
    print(d["meta.kind"])
    print(d["meta"]["kind"])

    # this does not yet work
    # NOTE(review): the target of this assignment was garbled in the
    # source; reconstructed as d.data - confirm against upstream.
    d.data["cloudmesh"]["profile"]["firstname"] = 'ABC'
    print(d)
    import os
    os.system("cat cmd3.yaml")

    print(d.json)
    print(d.filename)
    print("YAML")
    print(d.yaml)


if __name__ == "__main__":
    main()