from __future__ import print_function
import cmd
import datetime
import os
import string
import sys
import textwrap
from cloudmesh_client.common.ConfigDict import ConfigDict
from cloudmesh_client.common.Error import Error
from cloudmesh_client.common.Shell import Shell
from cloudmesh_client.common.util import path_expand
from cloudmesh_client.shell.console import Console
from cloudmesh_client.var import Var
# noinspection PyPep8
import cloudmesh_client
from cloudmesh_client.default import Default
from cloudmesh_client.common.util import get_python
from cloudmesh_client.common.util import check_python
import cloudmesh_client
from cloudmesh_client.common.Printer import Printer
from cloudmesh_client.shell.command import command
from cloudmesh_client.shell.command import PluginCommand
from cloudmesh_client.common.ssh_config import ssh_config
import cloudmesh_client.etc
from cloudmesh_client.cloud.secgroup import SecGroup
from cloudmesh_client.cloud.key import Key
import cloudmesh_client.shell.plugins
from cloudmesh_client.common.StopWatch import StopWatch
from cloudmesh_client.db import CloudmeshDatabase
from cloudmesh_client import setup_yaml
import importlib
cm = CloudmeshDatabase()
[docs]class CloudmeshContext(object):
def __init__(self, **kwargs):
self.__dict__ = kwargs
PluginCommandClasses = type(
'CommandProxyClass',
tuple(PluginCommand.__subclasses__()),
{})
# print (type(PluginCommand.__subclasses__()))
# print (PluginCommand.__subclasses__())
"""
# not yet implemented
class ConsoleClasses(object):
def __init__(self, *command_classes):
classes = []
for c in command_classes:
classes.append(PluginCommand.__subclasses__())
print (classes)
PluginCommandClasses = type(
'CommandProxyClass',
tuple(classes),
{})
return PluginCommandClasses
"""
# console = ConsoleFactory(PluginCommand)https://www.rackmountsales.com/kvm-switches/kvm-switch-8-port-combo-usb-ps-2-kvm-switch-sun-imac-compatible-part-kvm-s8.html
# noinspection PyBroadException,PyPep8Naming
[docs]class CloudmeshConsole(cmd.Cmd, PluginCommandClasses):
# class CloudmeshConsole(cmd.Cmd,
# ConsoleClasses(PluginCommand)):
[docs] def precmd(self, line):
StopWatch.start("command")
return line
[docs] def postcmd(self, stop, line):
StopWatch.stop("command")
if Default.timer:
print("Timer: {:.4f}s ({})".format(StopWatch.get("command"),
line.strip()))
return stop
[docs] def onecmd(self, line):
"""Interpret the argument as though it had been typed in response
to the prompt.
This may be overridden, but should not normally need to be;
see the precmd() and postcmd() methods for useful execution hooks.
The return value is a flag indicating whether interpretation of
commands by the interpreter should stop.
"""
line = self.replace_vars(line)
if line is None:
return ""
if line.startswith("!"):
line.replace("!", "! ")
line = self.var_replacer(line)
if line != "hist" and line:
self._hist += [line.strip()]
if line.startswith("!") or line.startswith("shell"):
self.do_shell_exec(line[1:])
return ""
cmd, arg, line = self.parseline(line)
if not line:
return self.emptyline()
if os.path.isfile(line):
self.do_exec(line)
return ""
if line.startswith('#') \
or line.startswith('//') \
or line.startswith('/*'):
print(line)
return self.emptyline()
if cmd is None:
return self.default(line)
self.lastcmd = line
if line == 'EOF':
self.lastcmd = ''
if cmd == '':
return self.default(line)
else:
try:
func = getattr(self, 'do_' + cmd)
except AttributeError:
return self.default(line)
return func(arg)
[docs] def register_topics(self):
topics = {}
for command in PluginCommand.__subclasses__():
tmp = command.topics.copy()
topics.update(tmp)
for name in topics:
self.register_command_topic(topics[name], name)
for name in ["q", "EOF", "man", "version", "help", "history",
"pause", "quit", "var"]:
self.register_command_topic("shell", name)
def __init__(self, context):
cmd.Cmd.__init__(self)
self.variables = {}
self.command_topics = {}
self.register_topics()
self.context = context
# TODO get loglevel from DB or yaml file, if not defined set to ERROR
self.loglevel = "DEBUG"
self._hist = []
if self.context.debug:
print("init CloudmeshConsole")
self.prompt = 'cm> '
self.doc_header = "Documented commands (type help <command>):"
self.banner = textwrap.dedent("""
+=======================================================+
. ____ _ _ _ .
. / ___| | ___ _ _ __| |_ __ ___ ___ ___| |__ .
. | | | |/ _ \| | | |/ _` | '_ ` _ \ / _ \/ __| '_ \ .
. | |___| | (_) | |_| | (_| | | | | | | __/\__ \ | | | .
. \____|_|\___/ \__,_|\__,_|_| |_| |_|\___||___/_| |_| .
+=======================================================+
Cloudmesh Shell
""")
# KeyCommands.__init__(self, context)
#
# set default cloud and default group if they do not exist
# use the first cloud in cloudmesh.yaml as default
#
Console.set_debug(Default.debug)
filename = path_expand("~/.cloudmesh/cloudmesh.yaml")
# moved to import cloudmesh_client
# create_cloudmesh_yaml(filename)
setup_yaml()
# Initialize Logging
# LogUtil.initialize_logging()
# sys,exit(1)
# ##################
# DEFAULTS
#
#
# SET DEFAULT CLOUD
#
value = Default.get(name='cloud', category='general')
if value is None:
config = ConfigDict(filename=filename)["cloudmesh"]
if 'active' in config:
cloud = config["active"][0]
else:
clouds = config["clouds"]
cloud = list(clouds.keys())[0]
Default.set('cloud', cloud, category='general')
#
# NOT SURE WHAT THIS IS FOR
#
value = Default.get(name='default', category='general')
if value is None:
Default.set('default', 'default', category='general')
#
# SET DEFAULT CLUSTER
#
'''
cluster = ConfigDict(filename="cloudmesh.yaml")["cloudmesh"]["active"][0]
value = Default.get(name='cluster', category='general')
if value is None:
try:
hosts = ssh_config().names()
if hosts is not None:
cluster = hosts[0]
except:
pass # use the hardcoded cluster
else:
cluster = value
Default.set('cluster', cluster, category='general')
'''
#
# SET DEFAULT GROUP
#
group = Default.group
if group is None:
Default.set_group("default")
#
# LOAD DEFAULTS FROM YAML
#
Default.load("cloudmesh.yaml")
try:
d = Key.get_from_dir("~/.ssh", store=False)
except Exception as e:
Console.error(e.message)
#
# SET DEFAULT TIMER
#
on = Default.timer
#
# SET DEFUALT SECGROUP
#
#
# SET DEFAULT REFRESH
#
# r = Default.refresh
# print ("REFRESH", r)
# if r is None:
# Default.set_refresh("on")
#
# SET DEFAULT USER
#
user = Default.user
if user is None:
user = ConfigDict(filename=filename)["cloudmesh"]["profile"]["user"]
Default.set_user(user)
r = Default.secgroup
if r is None:
SecGroup.reset_defaults()
'''
#secgroup = "{}-default".format(Default.user)
secgroup = "default"
Default.set_secgroup(secgroup)
SecGroup.add_rule_to_db(group=secgroup, name="ssh",from_port="22",to_port="22",protocol="tcp", cidr="0.0.0.0/0")
SecGroup.add_rule_to_db(group=secgroup, name="http",from_port="80",to_port="80",protocol="tcp", cidr="0.0.0.0/0")
SecGroup.add_rule_to_db(group=secgroup, name="https", from_port="443", to_port="443", protocol="tcp", cidr="0.0.0.0/0")
SecGroup.add_rule_to_db(group=secgroup, name="ping", from_port="0", to_port="0", protocol="icmp", cidr="0.0.0.0/0")
'''
"""
try:
sshm = SSHKeyManager()
m = sshm.get_from_yaml(
load_order="~/.cloudmesh/cloudmesh.yaml")
d = dict(m.__keys__)
sshdb = SSHKeyDBManager()
for keyname in m.__keys__:
filename = m[keyname]["path"]
try:
sshdb.add(filename,
keyname,
source="yaml",
uri="file://" + filename)
except Exception as e:
pass
except Exception as e:
Console.error("Problem adding keys from yaml file")
"""
for c in CloudmeshConsole.__bases__[1:]:
# noinspection PyArgumentList
c.__init__(self, context)
[docs] def preloop(self):
"""adds the banner to the preloop"""
if self.context.splash:
lines = textwrap.dedent(self.banner).split("\n")
for line in lines:
# Console.cprint("BLUE", "", line)
print(line)
# noinspection PyUnusedLocal
[docs] def do_EOF(self, args):
"""
::
Usage:
EOF
Description:
Command to the shell to terminate reading a script.
"""
return True
# noinspection PyUnusedLocal
[docs] def do_quit(self, args):
"""
::
Usage:
quit
Description:
Action to be performed whne quit is typed
"""
return True
do_q = do_quit
[docs] def emptyline(self):
return
[docs] def load_instancemethod(self, location):
module_name, class_name = location.rsplit(".", 1)
f = getattr(importlib.import_module(module_name), class_name)
setattr(self, f.__name__, f)
# noinspection PyUnusedLocal
[docs] def do_context(self, args):
"""
::
Usage:
context
Description:
Lists the context variables and their values
"""
"""
:param args:
:return:
"""
print(self.context.__dict__)
# noinspection PyUnusedLocal
@command
def do_version(self, args, arguments):
"""
Usage:
version [--format=FORMAT] [--check=CHECK]
Options:
--format=FORMAT the format to print the versions in [default: table]
--check=CHECK boolean tp conduct an additional check [default: True]
Description:
Prints out the version number
"""
python_version, pip_version = get_python()
try:
git_hash_version = Shell.execute('git', 'log -1 --format=%h', traceflag=False, witherror=False)
except:
git_hash_version = 'N/A'
versions = {
"cloudmesh_client": {
"name": "cloudmesh_client",
"version": str(cloudmesh_client.__version__)
},
# "cloudmesh_base": {
# "name": "cloudmesh_base",
# "version": str(cloudmesh_base.__version__)
# },
"python": {
"name": "python",
"version": str(python_version)
},
"pip": {
"name": "pip",
"version": str(pip_version)
},
"git": {
"name": "git hash",
"version": str(git_hash_version)
}
}
print(Printer.write(versions, output=arguments["--format"],
order=["name", "version"]))
if arguments["--check"] in ["True"]:
check_python()
[docs] def register_command_topic(self, topic, command_name):
try:
tmp = self.command_topics[topic]
except:
self.command_topics[topic] = []
self.command_topics[topic].append(command_name)
[docs] def do_help(self, arg):
"""
::
Usage:
help
help COMMAND
Description:
List available commands with "help" or detailed help with
"help COMMAND"."""
if arg:
try:
func = getattr(self, 'help_' + arg)
except AttributeError:
try:
doc = getattr(self, 'do_' + arg).__doc__
if doc:
self.stdout.write("%s\n" % str(doc))
return
except AttributeError:
pass
self.stdout.write("%s\n" % str(self.nohelp % (arg,)))
return
func()
else:
names = self.get_names()
cmds_doc = []
cmds_undoc = []
help_page = {}
for name in names:
if name[:5] == 'help_':
help_page[name[5:]] = 1
names.sort()
# There can be duplicates if routines overridden
prevname = ''
for name in names:
if name[:3] == 'do_':
if name == prevname:
continue
prevname = name
cmd = name[3:]
if cmd in help_page:
cmds_doc.append(cmd)
del help_page[cmd]
elif getattr(self, name).__doc__:
cmds_doc.append(cmd)
else:
cmds_undoc.append(cmd)
self.stdout.write("%s\n" % str(self.doc_leader))
self.print_topics(self.doc_header, cmds_doc, 15, 80)
self.print_topics(self.misc_header, list(help_page.keys()), 15, 80)
self.print_topics(self.undoc_header, cmds_undoc, 15, 80)
for topic in self.command_topics:
topic_cmds = sorted(self.command_topics[topic], key=str.lower)
self.print_topics(string.capwords(topic + " commands"),
topic_cmds, 15, 80)
[docs] def help_help(self):
"""
::
Usage:
help NAME
Prints out the help message for a given function
"""
print(textwrap.dedent(self.help_help.__doc__))
[docs] def do_exec(self, filename):
"""
::
Usage:
exec FILENAME
executes the commands in the file. See also the script command.
Arguments:
FILENAME The name of the file
"""
if not filename:
Console.error("the command requires a filename as parameter")
return
if os.path.exists(filename):
with open(filename, "r") as f:
for line in f:
if self.context.echo:
Console.ok("cm> {:}".format(str(line)))
self.precmd(line)
stop = self.onecmd(line)
self.postcmd(stop, line)
else:
Console.error('file "{:}" does not exist.'.format(filename))
sys.exit()
# noinspection PyUnusedLocal
[docs] def do_shell_exec(self, args):
# just ignore arguments and pass on args
command = path_expand(args)
try:
os.system(command)
except Exception as e:
Console.error(e.msg, traceflag=False)
# noinspection PyUnusedLocal
# BUG: this does not for some reason execute the arguments
@command
def do_shell(self, args, arguments):
"""
Usage:
shell ARGUMENTS...
Description:
Executes a shell command
"""
# just ignore arguments and pass on args
command = path_expand(args)
try:
os.system(command)
except Exception as e:
Console.error(e, traceflag=False)
#
# VAR
#
[docs] def update_time(self):
time = datetime.datetime.now().strftime("%H:%M:%S")
date = datetime.datetime.now().strftime("%Y-%m-%d")
self.variables['time'] = time
self.variables['date'] = date
[docs] def var_finder(self, line, c='$'):
line = line.replace('$', ' $').strip()
words = line.replace('-', ' ').replace('_', ' ').split(" ")
variables = []
for word in words:
if word.startswith('$'):
variables.append(word)
vars = {
"normal": [],
"os": [],
"dot": []
}
for word in variables:
word = word.replace('$', "")
if word.startswith('os.'):
vars["os"].append(word)
elif '.' in word:
vars["dot"].append(word)
else:
vars["normal"].append(word)
return vars
[docs] def var_replacer(self, line, c='$'):
vars = self.var_finder(line, c=c)
for v in vars["normal"]:
value = str(Var.get(v))
line = line.replace(c + v, value)
# replace in line the variable $v with value
for v in vars["os"]:
name = v.replace('os.', '')
if name in os.environ:
value = os.environ[name]
line = line.replace(c + v, value)
else:
Console.error("can not find environment variable {}".format(
v))
if c + v in line:
value = os.environ(v)
# replace in line the variable $v with value
for v in vars["dot"]:
try:
config = ConfigDict("cloudmesh.yaml")
print(config["cloudmesh.profile"])
value = config[v]
line = line.replace(c + v, value)
except Exception as e:
Console.error("can not find variable {} in cloudmesh.yaml".format(value))
return line
[docs] def replace_vars(self, line):
self.update_time()
newline = line
variables = Var.list(output='dict')
if len(variables) is not None:
for v in variables:
v = str(v)
name = str(variables[v]["name"])
value = str(variables[v]["value"])
newline = newline.replace("$" + name, value)
# for v in os.environ:
# newline = newline.replace("$" + v.name, os.environ[v])
newline = path_expand(newline)
return newline
'''
def _add_variable(self, name, value):
self.variables[name] = value
# self._list_variables()
def _delete_variable(self, name):
self._list_variables()
del self.variables[name]
# self._list_variables()
def _list_variables(self):
Console.ok(10 * "-")
Console.ok('Variables')
Console.ok(10 * "-")
for v in self.variables:
Console.ok("{:} = {:}".format(v, self.variables[v]))
'''
@command
def do_var(self, arg, arguments):
"""
Usage:
var list
var delete NAMES
var NAME=VALUE
var NAME
Arguments:
NAME Name of the variable
NAMES Names of the variable separated by spaces
VALUE VALUE to be assigned
special vars date and time are defined
"""
if arguments['list'] or arg == '' or arg is None:
# self._list_variables()
print(Var.list())
return ""
elif arguments['NAME=VALUE'] and "=" in arguments["NAME=VALUE"]:
(variable, value) = arg.split('=', 1)
if value == "time" or value == "now":
value = datetime.datetime.now().strftime("%H:%M:%S")
elif value == "date":
value = datetime.datetime.now().strftime("%Y-%m-%d")
elif value.startswith("default."):
name = value.replace("default.", "")
value = Default.get(name=name, category="general")
elif "." in value:
try:
config = ConfigDict("cloudmesh.yaml")
value = config[value]
except Exception as e:
Console.error("can not find variable {} in cloudmesh.yaml".format(value))
value = None
# self._add_variable(variable, value)
Var.set(variable, value)
return ""
elif arguments['NAME=VALUE'] and "=" not in arguments["NAME=VALUE"]:
try:
v = arguments['NAME=VALUE']
# Console.ok(str(self.variables[v]))
Console.ok(str(Var.get(v)))
except:
Console.error('variable {:} not defined'.format(arguments['NAME=VALUE']))
elif arg.startswith('delete'):
variable = arg.split(' ')[1]
Var.delete(variable)
# self._delete_variable(variable)
return ""
@command
def do_history(self, args, arguments):
"""
Usage:
history
history list
history last
history ID
"""
try:
if arguments["list"] or args == "":
print("LIST")
h = 0
for line in self._hist:
print("{}: {}".format(h, self._hist[h]))
h += 1
return ""
elif arguments["last"]:
h = len(self._hist)
if h > 1:
command = self._hist[h - 2]
self.precmd(command)
stop = self.onecmd(command)
self.postcmd(stop, command)
return ""
elif arguments["ID"]:
h = int(arguments["ID"])
if h in range(0, len(self._hist)):
print("{}".format(self._hist[h]))
if not args.startswith("history"):
command = self._hist[h]
self.precmd(command)
stop = self.onecmd(command)
self.postcmd(stop, command)
return ""
except:
Console.error("could not execute the last command")
do_h = do_history
[docs]def simple():
context = CloudmeshContext(debug=False,
splash=True)
con = CloudmeshConsole(context)
con.cmdloop()
# noinspection PyBroadException
[docs]def main():
"""cm.
Usage:
cm --help
cm [--echo] [--debug] [--nosplash] [-i] [COMMAND ...]
Arguments:
COMMAND A command to be executed
Options:
--file=SCRIPT -f SCRIPT Executes the script
-i After start keep the shell interactive,
otherwise quit [default: False]
--nosplash do not show the banner [default: False]
"""
#
# TODO: There is a bug in opensatck that does not send a Deprecatuon Class
# for the warning, so we ignore all warnings for now :(
#
import warnings
warnings.filterwarnings("ignore")
def manual():
print(main.__doc__)
args = sys.argv[1:]
arguments = {
'--echo': '--echo' in args,
'--help': '--help' in args,
'--debug': '--debug' in args,
'--nosplash': '--nosplash' in args,
'-i': '-i' in args}
echo = arguments["--echo"]
if arguments['--help']:
manual()
sys.exit()
for a in args:
if a in arguments:
args.remove(a)
arguments['COMMAND'] = [' '.join(args)]
commands = arguments["COMMAND"]
if len(commands) > 0:
if ".cm" in commands[0]:
arguments["SCRIPT"] = commands[0]
commands = commands[1:]
else:
arguments["SCRIPT"] = None
arguments["COMMAND"] = ' '.join(commands)
if arguments["COMMAND"] == '':
arguments["COMMAND"] = None
# noinspection PySimplifyBooleanCheck
if arguments['COMMAND'] == []:
arguments['COMMAND'] = None
splash = not arguments['--nosplash']
debug = arguments['--debug']
interactive = arguments['-i']
script = arguments["SCRIPT"]
command = arguments["COMMAND"]
context = CloudmeshContext(
interactive=interactive,
debug=debug,
echo=echo,
splash=splash)
cmd = CloudmeshConsole(context)
if script is not None:
cmd.do_exec(script)
try:
if echo:
print("cm>", command)
if command is not None:
cmd.precmd(command)
stop = cmd.onecmd(command)
cmd.postcmd(stop, command)
except Exception as e:
print("ERROR: executing command '{0}'".format(command))
print(70 * "=")
print(e)
print(70 * "=")
Error.traceback()
if interactive or (command is None and script is None):
cmd.cmdloop()
if __name__ == "__main__":
main()
# simple()