Source code for cloudmesh_client.platform.virtual_cluster.cluster

import itertools
import os.path
from pipes import quote
import time

from cloudmesh_client.api import Provider

from cloudmesh_client.cloud.iaas.CloudProvider import CloudProvider
from cloudmesh_client.cloud.network import Network
from cloudmesh_client.cloud.vm import Vm
from cloudmesh_client.db import CloudmeshDatabase, IntegrityError
from cloudmesh_client.db.general.model import CLUSTER
from cloudmesh_client.exc import ClusterNameClashException
from cloudmesh_client.common.ssh.authorized_keys import AuthorizedKeys
from cloudmesh_client.common.Shell import Subprocess, SubprocessError
from cloudmesh_client.common.util import tempdir
from cloudmesh_client.shell.console import Console

from cloudmesh_client.default import Default
from cloudmesh_client.cloud.image import Image


class Cluster(CLUSTER):  # list abstraction see other commands
    """A named group of VMs stored in the cloudmesh database.

    The row layout comes from the :class:`CLUSTER` model; this class adds
    the operations that boot, enumerate, detach and delete the nodes that
    belong to the cluster.  Iterating over a cluster yields its nodes.
    """

    cm = CloudmeshDatabase()

    def __init__(self, *args, **kwargs):
        """Fill in defaults, build the model row and insert it in the database.

        Any of ``cloud``, ``image``, ``flavor``, ``key`` and ``secgroup``
        missing from *kwargs* is taken from :class:`Default`.  A missing
        ``username`` is derived from the image via
        ``Image.guess_username``.

        :raises ClusterNameClashException: if the insert fails because a
            cluster with the same name already exists
        :raises IntegrityError: for any other integrity failure reported
            by the insert
        """
        # Use the table defined in the model, but we need to look up
        # the provider object dynamically
        kwargs.setdefault('cloud', Default.cloud)
        kwargs.setdefault('image', Default.image)
        # Only guess the login name when the caller did not supply one.
        if 'username' not in kwargs:
            kwargs['username'] = Image.guess_username(kwargs['image'])
        kwargs.setdefault('flavor', Default.flavor)
        kwargs.setdefault('key', Default.key)
        kwargs.setdefault('secgroup', Default.secgroup)

        super(Cluster, self).__init__(*args, **kwargs)
        self.provider = CloudProvider(self.cloud).provider.cloud_type

        # put this cluster in the database, the 'name' attribute must
        # be unique
        try:
            self.cm.insert(self)
        except IntegrityError as e:
            clash = 'UNIQUE constraint failed: {}.name'.format(
                self.__tablename__)
            # ``e.message`` only exists on Python 2 exceptions.
            message = getattr(e, 'message', None) or str(e)
            if clash in message:
                raise ClusterNameClashException(self.__tablename__,
                                                self.name)
            # Not a name clash: do not hide the failure, the row was
            # not inserted.
            raise

    @classmethod
    def from_name(cls, name):
        """Return the cluster stored under ``name``.

        The lookup uses ``.one()`` on the selection for ``name``.

        :param str name: the cluster name
        """
        return cls.cm.select(Cluster, name=name).one()

    def __iter__(self):
        """Iterate over the nodes returned by :meth:`list`."""
        return iter(self.list())

    def list(self):
        """List the nodes in the cluster.

        The type of the instance is determined by the provider.

        :returns: the nodes of the cluster
        :rtype: :class:`list` of instances
        """
        table = self.cm.vm_table_from_provider(self.provider)
        return self.cm.select(table, cluster=self.name).all()

    def delete(self, force=False):
        """Delete this cluster and all component nodes.

        :param bool force: passed through to ``Vm.delete`` for each node
        """
        for node in self:
            Vm.delete(servers=[node.name], force=force)
            self.cm.delete(kind="vm", provider=self.provider, name=node.name)
        self.cm.delete_(self.__class__, cm_id=self.cm_id)

    def create(self, sleeptime_s=5):
        """Boot all nodes in this cluster.

        Boots as many nodes as are missing to reach ``self.count``; does
        nothing when the cluster already has at least that many nodes.

        :param float sleeptime_s: sleep this number of seconds between
            polling VMs for ACTIVE status.  Currently unused by this
            method; kept so existing callers keep working.
        """
        missing = self.count - len(self.list())
        for _ in range(missing):
            self.add()

    def add(self):
        """Boots a new instance and adds it to this cluster.

        The instance is booted with this cluster's key, image, flavor,
        security group and username.  When ``self.assignFloatingIP`` is
        set, ``create_ip()`` is called on the new node.
        """
        provider = Provider.from_cloud(self.cloud)
        Console.info('Booting VM for cluster {}'.format(self.name))
        node = provider.boot(
            key=self.key,
            image=self.image,
            flavor=self.flavor,
            secgroup=self.secgroup,
            cluster=self.name,
            username=self.username,
        )
        if self.assignFloatingIP:
            node.create_ip()

    def remove(self, cm_id):
        """Removes a node from the cluster, but otherwise leaves it intact.

        The node's ``cluster`` column is set to ``None``.
        See :meth:`delete` to delete this cluster.

        :param int cm_id: the node id of the instance to remove
        """
        table = self.cm.vm_table_from_provider(self.provider)
        self.cm.update_(
            table,
            where={'cm_id': cm_id},
            values={'cluster': None},
        )

    def modify(self):
        """Modifies the cluster (not implemented)."""
        raise NotImplementedError()

    def terminate(self):
        """Terminates the cluster (not implemented)."""
        raise NotImplementedError()

    def suspend(self):
        """Suspends the cluster (not implemented)."""
        raise NotImplementedError()

    def resume(self):
        """Resumes the cluster (not implemented)."""
        raise NotImplementedError()

    def add_key(self, public_key):
        """Adds an ssh public key to the cluster (not implemented)."""
        raise NotImplementedError()

    def remove_key(self, public_key):
        """Removes an ssh public key from the cluster (not implemented)."""
        raise NotImplementedError()

    def enable_cross_ssh_login(self, useFloating=True, keytype='rsa',
                               bits=4096, comment='CM Cluster Cross-SSH'):
        """Enables each node to log into all other nodes of the cluster.

        For every node a fresh key pair is generated remotely with
        ``ssh-keygen``; the public key and the node's ``authorized_keys``
        are copied into a temporary directory.  Every collected public
        key is then added to every node's ``authorized_keys`` (a node's
        own key included) and the merged files are copied back.

        :param bool useFloating: connect to ``node.floating_ip`` when
            true, otherwise to ``node.static_ip``
        :param str keytype: key type passed to ``ssh-keygen -t``; also
            determines the file name ``.ssh/id_<keytype>``
        :param int bits: key size passed to ``ssh-keygen -b``
        :param str comment: key comment passed to ``ssh-keygen -C``
        """
        # Host keys are neither verified nor recorded for these calls.
        ssh = [
            'ssh',
            '-o', 'UserKnownHostsFile=/dev/null',
            '-o', 'StrictHostKeyChecking=no',
            '-l', self.username,
        ]

        # This command line is appended to ``ssh`` and therefore runs on
        # the remote side, hence the shell quoting of comment/passphrase.
        ssh_keygen = [
            'ssh-keygen',
            '-f', '.ssh/id_{}'.format(keytype),
            '-b', str(bits),
            '-t', keytype,
            '-C', quote(comment),
            '-N', quote(''),
        ]

        slurp = [
            'scp',
            '-o', 'UserKnownHostsFile=/dev/null',
            '-o', 'StrictHostKeyChecking=no',
        ]

        with tempdir() as workdir:

            def auth_keys_f(node):
                return os.path.join(workdir, node.name, 'authorized_keys')

            def pubkey_f(node):
                return os.path.join(workdir, node.name,
                                    'id_{}.pub'.format(keytype))

            def ip_f(node):
                return node.floating_ip if useFloating else node.static_ip

            # Query the node table once so that every phase below works
            # on the same set of nodes.
            nodes = self.list()

            # Phase 1: regenerate the key pair on each node and download
            # its public key together with its authorized_keys file.
            for node in nodes:
                outdir = os.path.join(workdir, node.name)
                os.makedirs(outdir)
                ip = ip_f(node)

                # cleanup
                rm = ssh + [ip] + [
                    'rm', '-f',
                    '.ssh/id_{}'.format(keytype),
                    '.ssh/id_{}.pub'.format(keytype),
                ]
                Subprocess(rm)

                genkey = ssh + [ip] + ssh_keygen
                Subprocess(genkey)

                pubkey = '{}@{}:.ssh/id_{}.pub'.format(node.username, ip,
                                                       keytype)
                auth_keys = '{}@{}:.ssh/authorized_keys'.format(node.username,
                                                                ip)
                scp = slurp + [pubkey] + [auth_keys] + [outdir]
                Subprocess(scp)

            # Phase 2: read every downloaded public key once, skipping
            # blank lines.
            pubkeys = []
            for node in nodes:
                with open(pubkey_f(node)) as fd:
                    stripped = (raw.strip() for raw in fd)
                    pubkeys.extend(entry for entry in stripped if entry)

            # Phase 3: add all collected keys to each node's local copy
            # of authorized_keys.
            for node in nodes:
                auth = AuthorizedKeys.from_authorized_keys(auth_keys_f(node))
                for entry in pubkeys:
                    auth.add(entry)

                # save new authorized_keys
                with open(auth_keys_f(node), 'w') as fd:
                    fd.write(auth.text())

            # Phase 4: upload the merged authorized_keys to each node.
            for node in nodes:
                path = auth_keys_f(node)
                remote = '{}@{}:.ssh/authorized_keys'.format(node.username,
                                                             ip_f(node))
                scp = slurp + [path] + [remote]
                Subprocess(scp)

    def disable_cross_ssh_login(self):
        """Not implemented."""
        raise NotImplementedError()

    def delete_key(self):
        """Not implemented."""
        raise NotImplementedError()
# """
# Parameter.expand
# cluster
# name
# node+
# (some object in models)
# label
# name
# ip+
# private key
# ssh authorized keys
# owner
# user
# type cluster
# type node
# """