Source code for cloudmesh_client.common.BaseConfigDict

"""Some simple yaml file reader"""

from __future__ import print_function

import ast
import json
import os
import stat
import sys
from collections import OrderedDict
from pprint import pprint
from string import Template

import simplejson
import yaml

from cloudmesh_client.common.Error import Error
from cloudmesh_client.common.util import backup_name, path_expand
from cloudmesh_client.locations import config_file
from cloudmesh_client.logger import LOGGER
from cloudmesh_client.shell.console import Console

log = LOGGER(__file__)
package_dir = os.path.dirname(os.path.abspath(__file__))
attribute_indent = 4


[docs]def check_file_for_tabs(filename, verbose=True): """identifies if the file contains tabs and returns True if it does. It also prints the location of the lines and columns. If verbose is set to False, the location is not printed. :param filename: the filename :rtype: True if there are tabs in the file """ file_contains_tabs = False with open(filename) as f: lines = f.read().split("\n") line_no = 1 for line in lines: if "\t" in line: file_contains_tabs = True location = [ i for i in range(len(line)) if line.startswith('\t', i)] if verbose: Console.error("Tab found in line {} and column(s) {}".format(line_no, str(location).replace("[","").replace("]", "")), traceflag=False) line_no += 1 return file_contains_tabs
# http://stackoverflow.com/questions/5121931/in-python-how-can-you-load-yaml-mappings-as-ordereddicts
[docs]def ordered_load(stream, Loader=yaml.Loader, object_pairs_hook=OrderedDict): """ Loads an ordered dict into a yaml while preserving the order :param stream: the name of the stream :param Loader: the yam loader (such as yaml.SafeLoader) :param object_pairs_hook: the ordered dict """ class OrderedLoader(Loader): pass def construct_mapping(loader, node): loader.flatten_mapping(node) return object_pairs_hook(loader.construct_pairs(node)) OrderedLoader.add_constructor( yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, construct_mapping) return yaml.load(stream, OrderedLoader)
# usage example: # ordered_load(stream, yaml.SafeLoader)
[docs]def ordered_dump(data, stream=None, Dumper=yaml.Dumper, **kwds): """ writes the dict into an ordered yaml. :param data: The ordered dict :param stream: the stream :param Dumper: the dumper such as yaml.SafeDumper """ class OrderedDumper(Dumper): pass def _dict_representer(dumper, data): return dumper.represent_mapping( yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG, data.items()) OrderedDumper.add_representer(OrderedDict, _dict_representer) return yaml.dump(data, stream, OrderedDumper, **kwds)
# usage: # ordered_dump(data, Dumper=yaml.SafeDumper)
[docs]def read_yaml_config(filename, check=True, osreplace=True, exit=True): """ reads in a yaml file from the specified filename. If check is set to true the code will fail if the file does not exist. However if it is set to false and the file does not exist, None is returned. :param filename: the file name :param check: if True fails if the file does not exist, if False and the file does not exist return will be None """ location = filename if location is not None: location = path_expand(location) if not os.path.exists(location) and not check: return None if check and os.path.exists(location): # test for tab in yaml file if check_file_for_tabs(location): log.error("The file {0} contains tabs. yaml " "Files are not allowed to contain tabs".format(location)) sys.exit() result = None try: if osreplace: result = open(location, 'r').read() t = Template(result) result = t.substitute(os.environ) # data = yaml.safe_load(result) data = ordered_load(result, yaml.SafeLoader) else: f = open(location, "r") # data = yaml.safe_load(f) data = ordered_load(result, yaml.SafeLoader) f.close() return data except Exception as e: log.error( "The file {0} fails with a yaml read error".format(filename)) Error.traceback(e) sys.exit() else: log.error("The file {0} does not exist.".format(filename)) if exit: sys.exit() return None
[docs]class OrderedJsonEncoder(simplejson.JSONEncoder): indent = attribute_indent
[docs] def encode(self, o, depth=0): if isinstance(o, OrderedDict): return "{" + ",\n ".join([self.encode(k) + ":" + self.encode(v, depth + 1) for (k, v) in o.items()]) + "}\n" else: return simplejson.JSONEncoder.encode(self, o)
[docs]def custom_print(data_structure, indent): for key, value in data_structure.items(): print("\n%s%s:" % (' ' * attribute_indent * indent, str(key)), end=' ') if isinstance(value, OrderedDict): custom_print(value, indent + 1) elif isinstance(value, dict): custom_print(value, indent + 1) else: print("%s" % (str(value)), end=' ')
[docs]class BaseConfigDict(OrderedDict): """ A class to obtain an OrderedDict from a yaml file. """ def _set_filename(self, filename): """ Sets the filename to be used. :param filename: the filename """ self['filename'] = filename self['location'] = path_expand(self["filename"]) def __init__(self, *args, **kwargs): """ The initalization method """ OrderedDict.__init__(self, *args, **kwargs) if 'filename' in kwargs: self._set_filename(kwargs['filename']) else: log.error("filename not specified") # sys.exit() if os.path.isfile(self['location']): self.load(self['location']) # print ("ATTRIBUTE", attribute) for attribute in ['prefix']: if attribute in kwargs: self[attribute] = kwargs[attribute] else: self[attribute] = None self._update_meta() def _update_meta(self): """ internal function to define the metadata regarding filename, location, and prefix. """ for v in ["filename", "location", "prefix"]: if "meta" not in self: self["meta"] = {} self["meta"][v] = self[v] del self[v]
[docs] def read(self, filename): """ Loads the information in the yaml file. It is the same as load and is used for compatibility reasons. :param filename: the name of the yaml file """ self.load(filename)
[docs] def load(self, filename): """ Loads the yaml file with the given filename. :param filename: the name of the yaml file """ self._set_filename(filename) if os.path.isfile(self['location']): # d = OrderedDict(read_yaml_config(self['location'], check=True)) d = read_yaml_config(self['location'], check=True) with open(self['location']) as myfile: document = myfile.read() x = yaml.load(document) try: self.update(d) except: print("ERROR: can not find", self["location"]) sys.exit() else: print("Error while reading and updating the configuration file {:}".format(filename))
[docs] def make_a_copy(self, location=None): """ Creates a backup of the file specified in the location. The backup filename appends a .bak.NO where number is a number that is not yet used in the backup directory. TODO: This function should be moved to another file maybe XShell :param location: the location of the file to be backed up """ import shutil dest = backup_name(location) shutil.copyfile(location, dest)
[docs] def write(self, filename=None, output="dict", attribute_indent=attribute_indent): """ This method writes the dict into various outout formats. This includes a dict, json, and yaml :param filename: the file in which the dict is written :param output: is a string that is either "dict", "json", "yaml" :param attribute_indent: character indentation of nested attributes in """ if filename is not None: location = path_expand(filename) else: location = self['meta']['location'] # with open('data.yml', 'w') as outfile: # outfile.write( yaml.dump(data, default_flow_style=True) ) # Make a backup self.make_a_copy(location) f = os.open(location, os.O_CREAT | os.O_TRUNC | os.O_WRONLY, stat.S_IRUSR | stat.S_IWUSR) if output == "json": os.write(f, self.json()) elif output in ['yml', 'yaml']: # d = dict(self) # os.write(f, yaml.dump(d, default_flow_style=False)) os.write(f, ordered_dump(OrderedDict(self), Dumper=yaml.SafeDumper, default_flow_style=False, indent=attribute_indent)) elif output == "print": os.write(f, custom_print(self, attribute_indent)) else: os.write(f, self.dump()) os.close(f)
[docs] def error_keys_not_found(self, keys): """ Check if the requested keys are found in the dict. :param keys: keys to be looked for """ try: log.error("Filename: {0}".format(self['meta']['location'])) except: log.error("Filename: {0}".format(self['location'])) log.error("Key '{0}' does not exist".format('.'.join(keys))) indent = "" last_index = len(keys) - 1 for i, k in enumerate(keys): if i == last_index: log.error(indent + k + ": <- this value is missing") else: log.error(indent + k + ":") indent += " "
def __str__(self): """ returns the json output of the dict. """ return self.json()
[docs] def json(self): """ returns the json output of the dict. """ return json.dumps(self, indent=attribute_indent)
[docs] def yaml(self): """ returns the yaml output of the dict. """ return ordered_dump(OrderedDict(self), Dumper=yaml.SafeDumper, default_flow_style=False)
[docs] def dump(self): """ returns the json output of the dict. """ orderedPrinter = OrderedJsonEncoder() return orderedPrinter.encode(self)
[docs] def pprint(self): """ uses pprint to print the dict """ print(custom_print(self, attribute_indent))
""" def __getitem__(self, *mykeys): try: item = self.get(mykeys[0]) except: self._notify_of_error(mykeys) sys.exit() return item """
[docs] def get(self, *keys): """ returns the dict of the information as read from the yaml file. To access the file safely, you can use the keys in the order of the access. Example: get("provisioner","policy") will return the value of config["provisioner"]["policy"] from the yaml file if it does not exists an error will be printing that the value does not exists. Alternatively you can use the . notation e.g. get("provisioner.policy") """ if keys is None: return self if "." in keys[0]: keys = keys[0].split('.') element = self for v in keys: try: element = element[v] except KeyError: self.error_keys_not_found(keys) # sys.exit() return element
[docs] def set(self, value, *keys): """ Sets the dict of the information as read from the yaml file. To access the file safely, you can use the keys in the order of the access. Example: set("{'project':{'fg82':[i0-i10]}}", "provisioner","policy") will set the value of config["provisioner"]["policy"] in the yaml file if it does not exists an error will be printing that the value does not exists. Alternatively you can use the . notation e.g. set("{'project':{'fg82':[i0-i10]}}", "provisioner.policy") """ element = self if keys is None: return self if '.' in keys[0]: keys = keys[0].split(".") nested_str = ''.join(["['{0}']".format(x) for x in keys]) # Safely evaluate an expression to see if it is one of the Python # literal structures: strings, numbers, tuples, lists, dicts, booleans, # and None. Quoted string will be used if it is none of these types. try: ast.literal_eval(str(value)) converted = str(value) except ValueError: converted = "'" + str(value) + "'" exec("self" + nested_str + "=" + converted) return element
def _update(self, keys, value): """Updates the selected key with the value Args: keys (str): key names e.g. cloudmesh.server.loglevel value (str): value to set """ return self.set(value, keys)
[docs] def attribute(self, keys): """ TODO: document this method :param keys: """ if self['meta']['prefix'] is None: k = keys else: k = self['meta']['prefix'] + "." + keys return self.get(k)
if __name__ == "__main__": config = ConfigDict({"a": "1", "b": {"c": 3}}, prefix="cloudmesh.debug", filename="./etc/cloudmesh_debug.yaml") print("PPRINT") print(70 * "=") pprint(config) print("PRINT") print(70 * "=") print(config.pprint()) print(config.json()) print(70 * "=") print("A =", config["a"]) config.write(config_file("/d.yaml"), output="dict") config.write(config_file("/j.yaml"), output="json") config.write(config_file("/y.yaml"), output="yaml") # this does not work # config.write(config_file("/print.yaml"), output="print") print("mongo.path GET =", config.get("cloudmesh.server.mongo.path")) print("mongo.path ATTRIBUTE =", config.attribute("mongo.path")) print("get A =", config.get("a")) print("wrong mongo.path ATTRIBUTE =", config.attribute("mongo.path.wrong")) print("wrong mongo.path GET =", config.get("cloudmesh.server.mongo.path.wrong")) # print config["dummy"] # config["x"] = "2" # print config["x"] # print config.x