Source code for cloudmesh_client.cloud.key

from __future__ import print_function
from cloudmesh_client.common.ConfigDict import ConfigDict

from cloudmesh_client.common.todo import TODO
# add imports for other cloud providers in future
from cloudmesh_client.shell.console import Console
from cloudmesh_client.cloud.ListResource import ListResource
from cloudmesh_client.common.Printer import Printer
from cloudmesh_client.db import CloudmeshDatabase
from cloudmesh_client.cloud.iaas.CloudProvider import CloudProvider
from cloudmesh_client.common.Error import Error
from uuid import UUID
from cloudmesh_client.common.dotdict import dotdict
from builtins import input
from pprint import pprint
from cloudmesh_client.common.ConfigDict import Config
from cloudmesh_client.default import Default
from cloudmesh_client.common.menu import menu_return_num
from cloudmesh_client.common.SSHkey import SSHkey
import os
from os.path import expanduser
import requests
import subprocess

# noinspection PyPep8Naming
[docs]class Key(ListResource): cm = CloudmeshDatabase()
[docs] @classmethod def info(cls, **kwargs): raise NotImplementedError()
[docs] @classmethod def get_from_dir(cls, directory=None, store=True): directory = directory or Config.path_expand("~/.ssh") files = [file for file in os.listdir(expanduser(Config.path_expand(directory))) if file.lower().endswith(".pub")] d = [] for file in files: location = Config.path_expand("{:}/{:}".format(directory, file)) sshkey = SSHkey(location).get() i = sshkey["comment"] if i is not None: i = i.replace("@", "_") i = i.replace("-", "_") i = i.replace(" ", "_") i = i.replace(".", "_") else: # use base name i = file.replace(".pub", "") sshkey["kind"] = "key" sshkey["source"] = 'file' if store: cls._add_from_sshkey( dict(sshkey), keyname=sshkey["name"], source=sshkey["source"], uri=sshkey["uri"]) else: d.append(dict(sshkey)) if not store: return d
[docs] @classmethod def get_from_git(cls, username, store=True): """ :param username: the github username :return: an array of public keys :rtype: list """ uri = 'https://github.com/{:}.keys'.format(username) content = requests.get(uri).text.strip("\n").split("\n") d = [] for key in range(0, len(content)): value = content[key] thekey = {} name = "{}_git_{}".format(username, key) thekey = { 'uri': uri, 'string': value, 'fingerprint': SSHkey._fingerprint(value), 'name': name, 'comment': name, 'cm_id': name, 'source': 'git', 'kind': 'key' } thekey["type"], thekey["key"], thekey["comment"] = SSHkey._parse(value) if thekey["comment"] is None: thekey["comment"] = name d.append(thekey) if store: try: cls.cm.add(thekey) except: Console.error("Key already in db", traceflag=False) if not store: return d
# noinspection PyProtectedMember,PyUnreachableCode,PyUnusedLocal
[docs] @classmethod def get_from_yaml(cls, filename=None, load_order=None, store=True): """ :param filename: name of the yaml file :return: a SSHKeyManager (dict of keys) """ config = None if filename is None: # default = Config.path_expand(os.path.join("~", ".cloudmesh", "cloudmesh.yaml")) # config = ConfigDict("cloudmesh.yaml") filename = "cloudmesh.yaml" config = ConfigDict(filename) elif load_order: config = ConfigDict(filename, load_order) else: Console.error("Wrong arguments") return config_keys = config["cloudmesh"]["keys"] default = config_keys["default"] keylist = config_keys["keylist"] uri = Config.path_expand(os.path.join("~", ".cloudmesh", filename)) d = [] for key in list(keylist.keys()): keyname = key value = keylist[key] if os.path.isfile(Config.path_expand(value)): path = Config.path_expand(value) if store: Key.add_from_path(path, keyname) else: d.append(Key.add_from_path(path, keyname, store=False)) else: keytype, string, comment = SSHkey._parse(value) thekey = { 'uri': 'yaml://{}'.format(uri), 'string': value, 'fingerprint': SSHkey._fingerprint(value), 'name': keyname, 'comment': comment, 'source': 'git', 'kind': 'key' } thekey["type"], thekey["key"], thekey["comment"] = SSHkey._parse(value) if thekey["comment"] is None: thekey["comment"] = keyname if store: try: cls.cm.add(thekey) except: Console.error("Key already in db", traceflag=False) else: d.append(thekey) if not store: return d """ take a look into original cloudmesh code, its possible to either specify a key or a filename the original one is able to figure this out and do the rightthing. We may want to add this logic to the SSHkey class, so we can initialize either via filename or key string. It would than figure out the right thing cloudmesh: keys: idrsa: ~/.ssh/id_rsa.pub cloudmesh: ... keys: default: name of the key keylist: keyname: ~/.ssh/id_rsa.pub keyname: ssh rsa hajfhjldahlfjhdlsak ..... comment github-x: github """
[docs] @classmethod def get_from_cloud(cls, cloud, live=False, format="table"): """ This method lists all keys of the cloud :param cloud: the cloud name :return: a SSHKeyManager (dict of keys) """ try: return CloudProvider(cloud).provider.list_key(cloud) except Exception as ex: Console.error(ex.message)
@classmethod def _add_from_sshkey(cls, sshkey, keyname=None, user=None, source=None, uri=None): user = user or cls.cm.user if keyname is None: try: keyname = sshkey['name'] except: pass if keyname is None: print("ERROR: keyname is None") thekey = { "kind": "key", "name": keyname, "uri": sshkey['uri'], "source": sshkey['source'], "fingerprint": sshkey['fingerprint'], "comment": sshkey['comment'], "value": sshkey['string'], "category": "general", "user": user} cls.cm.add(thekey)
[docs] @classmethod def add_key_to_cloud(cls, user, keyname, cloud): """ :param user: :param keyname: :param cloud: :param name_on_cloud: """ key = cls.cm.find(kind="key", name=keyname, scope="first") if key is None: Console.error("Key with the name {:} not found in database.".format(keyname)) return try: if cloud is not None: print("Adding key {:} to cloud {:}".format(keyname, cloud)) cloud_provider = CloudProvider(cloud).provider cloud_provider.add_key_to_cloud(keyname, key["value"]) except Exception as e: Console.error("problem uploading key {} to cloud {}: {}".format(keyname, cloud, e.message), traceflag=False)
[docs] @classmethod def list(cls, category=None, live=False, output="table"): "this does not work only returns all ceys in the db" (order, header) = CloudProvider(category).get_attributes("key") d = cls.cm.find(kind="key", scope="all", output=output) return Printer.write(d, order=order, header=header, output=output)
[docs] @classmethod def list_on_cloud(cls, cloud, live=False, format="table"): """ This method lists all keys of the cloud :param cloud: the cloud name """ try: keys = CloudProvider(cloud).provider.list_key(cloud) for key in keys: keys[key]["category"] = cloud if keys is None or keys is []: return None (order, header) = CloudProvider(cloud).get_attributes("key") return Printer.write(keys, order=order, header=header, output=format) except Exception as ex: Console.error(ex.message)
[docs] @classmethod def run_command(cls, cmd): """ Runs a command in a shell, returns the result""" p = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE) return p.stdout.read()
[docs] @classmethod def add_azure_key_to_db(cls, key_name, key_path, certificate_path, pfx_path): """ Adds the public key to the existing database model and adds the certificate, key and fingerprint into the azure key database model. :param key_name: Key name to be added :param key_path: Public key path :param certificate_path: Certificate file path(PEM file) :param pfx_path: PKCS encoded certificate path :return: """ pprint("add_azure_key_to_db") # Add to the current DB cls.add_from_path(key_path, key_name, source="ssh", uri="file://" + key_path) # Add certificate to the new DB fingerprint_cmd = "openssl x509 -in "+certificate_path+" -sha1 -noout -fingerprint | sed s/://g" # print("fingerprint_cmd:", fingerprint_cmd) fingerprint = cls.run_command(fingerprint_cmd) fingerprint = fingerprint.split('=')[1] fingerprint = fingerprint.rstrip('\n') # pprint("Certificate Fingerprint="+fingerprint) key_azure_obj = { "kind": "key_azure", "name": key_name, "fingerprint": fingerprint, "certificate": certificate_path, "key_path": key_path, "pfx_path": pfx_path} cls.cm.add(key_azure_obj) Console.info("Azure key added.ok.")
[docs] @classmethod def clear(cls, **kwargs): raise NotImplementedError()
[docs] @classmethod def refresh(cls, **kwargs): raise NotImplementedError()
[docs] @classmethod def delete(cls, name=None, cloud=None): if cloud is not None and name is not None: result = CloudProvider(cloud).provider.delete_key_from_cloud(name) elif cloud is not None and name is None: # # get a list of keys from cloud # loop over all keys and use the provider delete from cloud to delete that key Console.error("delete all keys from cloud not yet implemented") if name is None: cls.cm.delete(kind="key", provider="general") else: cls.cm.delete(name=name, kind="key", provider="general")
[docs] @classmethod def all(cls, output="dict"): return cls.cm.find(kind="key", scope="all", output=output)
[docs] @classmethod def find(cls, name=None, output="dict"): return cls.get(name=name, output=output)
[docs] @classmethod def get(cls, name=None, output="dict"): """ Finds the key on the database by name :param name: name of the key to be found :return: Query object of the search """ if name is None: return cls.cm.find(kind="key", output=output) else: return cls.cm.find(kind="key", name=name, output=output, scope="first")
[docs] @classmethod def set_default(cls, name): Default.set_key(name)
# deprecated use Default.key
[docs] @classmethod def get_default(cls): return Default.key
[docs] @classmethod def delete_from_cloud(cls, name, cloud=None): pass
@classmethod def _delete_from_db(cls, name=None): if name is None: cls.cm.delete(kind='key') else: cls.cm.delete(kind='key', name=name)
[docs] @classmethod def select(cls): options = [] d = cls.get(output='dict') for i in d: line = '{}: {}'.format(d[i]['name'], d[i]['fingerprint']) options.append(line) num = menu_return_num('KEYS', options) if num != 'q': return options[num] return num
# # ADD #
[docs] @classmethod def add_from_path(cls, path, keyname=None, user=None, source=None, uri=None, store=True): """ Adds the key to the database based on the path :param keyname: name of the key or path to the key :return: """ user = user or cls.cm.user sshkey = SSHkey(Config.path_expand(path)) if store: cls._add_from_sshkey(sshkey.__key__, keyname, user, source=source, uri=uri) else: return sshkey.__key__