Source code for bellatrix.lib.ec2_lib

"""
EC2 utilities wrapping boto methods

boto reference in: http://boto.cloudhackers.com/ec2_tut.html
"""

import datetime
import hashlib
import logging
import os
import sys
import time

import boto
from boto.s3.key import Key

import bellatrix
NAME = bellatrix.APP

[docs]class Ec2lib: def __init__(self, key, sec): self._key = key self._sec = sec self.ec2 = self.getEC2Connection() self.cw = self.getCloudWatchConnection() self._running_state = "running" self.ERR_CONNECTION_REFUSED = 65280 self.NAME = NAME self.DEFAULT_TIME_OUT = 300 #times self.DEFAULT_STEP = 3 #seconds
[docs] def getCloudWatchConnection(self): return boto.connect_cloudwatch(self._key, self._sec)
[docs] def getEC2Connection(self): return boto.connect_ec2(self._key, self._sec)
[docs] def getS3Connection(self): logging.debug("k:**%s** s:**%s**" % (self._key, self._sec)) return boto.connect_s3(self._key, self._sec)
[docs] def getCPUMetric(self, instance_name): hours_span = self.hours end = datetime.datetime.utcnow() start = end - datetime.timedelta(hours=hours_span) return self.cw.get_metric_statistics(self.PERIOD, start, end, self.metric, self.NAMESPACE, self.function, instance_name)
[docs] def getReservations(self): return self.ec2.get_all_instances()
[docs] def getInstance(self, instance): """ Given a instance string, returns an instance object """ for i in self.getInstances(): if i.id == instance: return i return None
[docs] def getAllRunningInstances(self): return self.getInstances()
[docs] def getAllInstances(self): return self.getInstances(None)
[docs] def getImage(self, ami): ami_object = self.ec2.get_image(ami) if not ami_object or isinstance(ami_object, str): raise Exception("We couldn't retrieve ami information from the ami code: %s """ """Either the ami doesn't exist or this account doesn't have permissions to access it. """ """If the latest, you can use bellatrix set_permissions""" % (ami)) return ami_object
[docs] def createImage(self, instance_id, name, description=None, no_reboot=False): """ Will create an AMI from the instance in the running or stopped state. :type instance_id: string :param instance_id: the ID of the instance to image. :type name: string :param name: The name of the new image :type description: string :param description: An optional human-readable string describing the contents and purpose of the AMI. :type no_reboot: bool :param no_reboot: An optional flag indicating that the bundling process should not attempt to shutdown the instance before bundling. If this flag is True, the responsibility of maintaining file system integrity is left to the owner of the instance. :rtype: string :return: The new image id """ name = name.replace(":","-")[:128] logging.info("burning instance: %s name: %s, description: %s" % (instance_id, name, description)) return self.ec2.create_image(instance_id, name, description, no_reboot)
[docs] def getAmiInfo(self, ami): ret = None try: ret = image = self.getImage(ami) logging.info("image info: %s" + image) except: logging.error("Error getting information for image:%s " % ami) return ret
[docs] def getEc2Instance(self, ami, key_name, security_group, instance_type, instance_initiated_shutdown_behavior=bellatrix.TERMINATE, new_size=None): image = self.getImage(ami) inst = self._startInstance(image, key_name, security_group, instance_type, self.NAME, instance_initiated_shutdown_behavior=instance_initiated_shutdown_behavior, new_size=new_size) return inst
[docs] def startInstance(self, ami, instance_type, key_name, security_groups, new_size=None): inst = self.getEc2Instance(ami, key_name, security_groups.split(), instance_type, bellatrix.TERMINATE, new_size) dns_name = self.getDNSName(inst) self.waitUntilInstanceIsReady(inst) return inst, dns_name
def _startInstance(self, image, key_name, security_group, instance_type, owner_name=os.path.basename(__file__), instance_initiated_shutdown_behavior=bellatrix.TERMINATE, new_size=None): """ starts an instance given an 'image' object :instance_initiated_shutdown_behavior: string. Valid values are stop | terminate :instance_type: string :param instance_type: The type of instance to run. Current choices are: m1.small | m1.large | m1.xlarge | c1.medium | c1.xlarge | m2.xlarge | m2.2xlarge | m2.4xlarge | cc1.4xlarge :owner_name: string. Just the entity that initiated the instance. You will see the name in the 'tag name'. This library by default. """ logging.info("starting image: %s key %s type %s shutdown_behavior %s new size %s" % (image.id, key_name, instance_type, instance_initiated_shutdown_behavior, new_size)) mapping = None if new_size != None: dev_sda1 = boto.ec2.blockdevicemapping.BlockDeviceType(delete_on_termination=True) dev_sda1.size = new_size bdm = boto.ec2.blockdevicemapping.BlockDeviceMapping() bdm['/dev/sda1'] = dev_sda1 mapping = bdm reservation = image.run(1, 1, key_name, security_group, instance_type=instance_type, instance_initiated_shutdown_behavior=instance_initiated_shutdown_behavior, block_device_map=mapping) logging.info("we got %d instance (should be only one)." % len(reservation.instances)) i = reservation.instances[0] self.tagInstance(i.id, "Name", owner_name + " started me") return i
[docs] def stopInstance(self, i): i.stop()
[docs] def tagInstance(self, instance, key, value, time_out=None, step=None): logging.info("tagging instance:%s key:%s value:%s" % (instance, key, value)) #todo: make the waiting structure a generic function time_out=time_out if time_out != None else self.DEFAULT_TIME_OUT step = step if step != None else self.DEFAULT_STEP while (time_out > 0): try: time_out -= step self.ec2.create_tags([instance], {key: value}) time.sleep(step) except: logging.info("tagging operation failed, but maybe the instance wasn't ready. We will keep trying %s more seconds" % (time_out)) else: logging.info("instance:%s was successfully tagged with: key:%s value:%s" % (instance, key, value)) break
[docs] def terminateInstance(self, i): i.terminate()
[docs] def getInstances(self, state=bellatrix.RUNNING): instances = [] dict_inst = {} logging.debug("getting all instances...") for img in self.getReservations(): for r in img.instances: if state == None or state == r.state_code: instances.append(r) logging.debug("appending instance:%s" % r) return instances
[docs] def destroyIdleInstances(self): #TODO: this won't work due to refactor on getInstances... logging.info("Killing running instances consistently under the %s%s cpu threshold that" \ " have run for at least %s hour/s" % (self.threshold, "%", self.hours)) instances = self.getInstances() logging.info("getting active instances...") for i in instances: try: instance_name = i[0] instance = i[1] metrics = self.getCPUMetric(instance_name) logging.info("getting metrics for instance: %s" % (str(instance_name))) kill_it = True for mt in metrics: val = mt[u'Maximum'] if val > self.threshold: kill_it = False logging.info("CPU use: %s" % (str(val) + "%")) if kill_it: if len(metrics) >= self.expected_samples: logging.info("killing %s... AWS ident:XXX%s" % (instance_name, self._key[len(self._key) - 3:])) if not self.dryrun: instance.stop() else: logging.info("Dry run mode, I am not feeling like killing anyone..." % instance_name) else: logging.info("This instance was not running enough time. Forgiving %s..." % instance_name) logging.info("Len metrics:%s expected samples:%s" % (len(metrics), self.expected_samples)) except: logging.exception("Error: ")
[docs] def setExceptions(self, elements): """AMI's and instances put into this list will be skipped in the shutdown process""" for e in elements: if e not in self.exceptions: self.exceptions.append(e)
[docs] def getDNSName(self, inst, TIME_OUT=300): """get DNS name for an instance. This operation could take some time as the startup for new instances is not immediate""" logging.info("getting the dns name for instance: " + inst.id + " time out is: " + str(TIME_OUT) + " seconds...") step = 3 while (not inst.dns_name and TIME_OUT > 0): TIME_OUT -= step inst.update() time.sleep(step) if not inst.dns_name: raise Exception("Sorry, but the instance never returned its address...") logging.info("DNS name for %s is %s" % (inst.id, inst.dns_name)) return inst.dns_name
[docs] def waitUntilInstanceIsReady(self, inst, TIME_OUT=300): logging.info("waiting until instance: " + inst.id + " is ready. Time out is: " + str(TIME_OUT) + " seconds...") step = 3 #todo: make this concurrent while (inst.state != self._running_state and TIME_OUT > 0): TIME_OUT -= step inst.update() time.sleep(step) if inst.state != self._running_state: raise Exception("Sorry, but the instance never got the: " + self._running_state + " state") logging.info("Instance %s is %s" % (inst.id, inst.state))
[docs] def waitForConnectionReady(self, inst, user, key, dns, TIME_OUT=300): logging.info("waiting until instance is ready to receive ssh connections. Instance: " + inst.id + " Time out is: " + str(TIME_OUT) + " seconds...") tmp_file = "tmp" cmd = "ssh -o StrictHostKeyChecking=no -i %s %s@%s '%s' > %s" % (key, user, dns, "echo CONNECTION READY", tmp_file) step = 3 result = self.ERR_CONNECTION_REFUSED while (result == self.ERR_CONNECTION_REFUSED and TIME_OUT > 0): TIME_OUT -= step time.sleep(step) logging.info("executing:%s " % cmd ) result = os.system(cmd) if result == self._running_state: raise Exception("Sorry, but the instance never got ready for SSH connections") logging.info("Instance %s is ready to receive ssh connections. %s" % (inst.id, open(tmp_file).read()))
[docs] def setPermissionsToAmis(self, amis, account_permissions, retry=True): """set account permissions to a set of ami's""" i=0 WAIT = 30 while i < len(amis): try: image = self.getImage(amis[i]) if image==None: logging.info("Image doesn't exist in this acount. Finishing execution.") break logging.info("image information:%s" % image) logging.info("setting execute permissions to %s for accounts:%s" % (amis[i],account_permissions)) res=image.set_launch_permissions(account_permissions) logging.info("operation result:%s " % res ) i += 1 except: logging.exception("Error setting permissions to ami:%s Please check whether " \ "it exists or your account has proper permissions to access it." % amis[i]) if retry: logging.info("retrying execution in %s seconds." % WAIT) time.sleep(WAIT)
[docs] def getSecurityGroups(self, groupnames=None): """get all security groups from this account""" #the following call should return something like: #[SecurityGroup:alfresco, SecurityGroup:apache, SecurityGroup:vnc, #SecurityGroup:appserver2, SecurityGroup:FTP, SecurityGroup:webserver, #SecurityGroup:default, SecurityGroup:test-1228851996] #>>> us_group = groups[0] #>>> us_group #SecurityGroup:alfresco #>>> us_group.rules #[IPPermissions:tcp(22-22), IPPermissions:tcp(80-80), IPPermissions:tcp(1445-1445)] return self.ec2.get_all_security_groups(groupnames)
[docs] def authorizeSecurityGroup(self, securityGroup, cidr_ip, from_port, security_group=None, to_port=None, ip_protocol='tcp'): to_port = (from_port if to_port == None else to_port) logging.info("authorizing... security group:%s, ip_protocol:%s, from_port:%s, to_port:%s cidr:%s sg:%s" % (securityGroup, ip_protocol, from_port, to_port, cidr_ip, security_group)) try: if cidr_ip != None: securityGroup.authorize(ip_protocol=ip_protocol, from_port=from_port, to_port=to_port, cidr_ip=cidr_ip) else: securityGroup.authorize(ip_protocol=ip_protocol, src_group=security_group, from_port=from_port, to_port=to_port, cidr_ip=cidr_ip) except: logging.exception("Exception applying authorization..")
[docs] def revokeAllSecurityGroupRules(self, sg): logging.info("revoking permissions from security group:%s " % sg) for r in sg.rules: for g in r.grants: #todo detect whether is a security group or an ip logging.info("revoking.. sg:%s, ip_protocol:%s, from_port:%s, to_port:%s, cidr:%s" % (sg, r.ip_protocol, r.from_port, r.to_port, g)) status = sg.revoke(r.ip_protocol, r.from_port, r.to_port, g) logging.info("status: %s" % status)
[docs] def percent_cb(self, complete, total): sys.stdout.write('.') sys.stdout.flush()
[docs] def uploadToBucket(self, bucket, file_to_upload, acl="public"): bucket = bucket k = Key(bucket) k.key = file_to_upload k.set_acl(acl) k.set_contents_from_filename(file_to_upload, cb=self.percent_cb, num_cb=10)
def _copy(self, key_str, path, bucket, acl, pretend, force_copy=False): """Perform the actual copy operation. This method is only called by L{uploadToS3} Function originally taken from here: http://www.gordontillman.info/computers/41-web-application-development/88-using-python-boto-library-with-s3 @param key_str: The string used to lookup the Key within the bucket @type key_str: string @param path: The path to the local file @type path: string @param bucket: The bucket we are currently working with @type bucket: boto.s3.bucket.Bucket @param acl: One of the supported ACL policies @type acl: string @param pretend: If True, we just log what we would do @type pretend: boolean @param force_copy: If True, do the copy even if we normally wouldn't @type force_copy: boolean """ logging.debug("key_str='%s', path='%s', acl='%s', pretend=%s, " "force_copy=%s", key_str, path, acl, pretend, force_copy) need_to_copy = True key = bucket.get_key(key_str) stat = os.stat(path) is_dir = os.path.isdir(path) if key: if is_dir: logging.info("'%s' exists - no need to create.", path) need_to_copy = False elif key.size == stat[6]: f = open(path) fd = f.read() f.close() m = hashlib.md5(fd) if '"%s"' % m.hexdigest() == key.etag: logging.info("'%s' - no need to copy. size (%d) " "and md5 (%s) match", path, key.size, key.etag) need_to_copy = False else: key = boto.s3.key.Key(bucket) key.key = key_str if need_to_copy or force_copy: if pretend: logging.info("Would copy '%s' to '%s' with ACL '%s'", path, key_str, acl) else: logging.info("Copying '%s' to '%s' with ACL '%s'", path, key_str, acl) key.set_metadata('mode', str(stat[0])) key.set_metadata('gid', str(stat[5])) key.set_metadata('uid', str(stat[4])) key.set_metadata('mtime', str(stat[8])) if is_dir: key.set_contents_from_string("", headers={'Content-Type': 'application/x-directory'}) else: key.set_contents_from_filename(path, cb=self.percent_cb, num_cb=10) key.set_acl(acl)
[docs] def uploadToS3(self, src, bucket, acl=None, key_prefix="", s3_conn=None, filter=None, pretend=None, starting_with=None): """Copy the file specified by src (or contained in src if it is a directory) to the specified S3 bucket, pre-pending the optional key_prefix to the relative path of each file within src. Function originally taken from here: http://www.gordontillman.info/computers/41-web-application-development/88-using-python-boto-library-with-s3 @param bucket: The name of the bucket we are working with. It must already exist. @type bucket: string @param key_prefix: This will be prepended to every source path that we copy. Can be empty. Often is. @type key_prefix: string @param src: This is the source file or directory. @type src: string @param s3_conn: This is an active S3 connection. @type s3_conn: boto.s3.connection.S3Connection @param filter: Only files with extensions listed in this filter will be candidates for copying. You can have an empty string in this list to copy files with NO file extension. @type filter: list of strings @param acl: One of the supported ACL policies. @type acl: string @param pretend: If true, we just log what we would do, but we don't do it. @type pretend: boolean @param starting_with: An option source file path. If specified, skips all the files preceeding it until this file is reached. @type starting_with: string @raise boto.exception.S3ResponseError: If you specify a non-existing bucket """ if starting_with: found_start = False else: found_start = True if not s3_conn: s3_conn = self.getS3Connection() logging.debug("s3_conn: " + str(s3_conn)) if not filter: filter = "" if not acl: acl = "public-read" if pretend is None: pretend = False logging.info("bucket=%s, key_prefix=%s, src=%s, filter=%s" ", acl=%s, pretend=%s", bucket, key_prefix, src,",".join(filter), acl, pretend) logging.debug("bucket list: " + str(s3_conn.get_all_buckets())) b = s3_conn.get_bucket(bucket) if os.path.isfile(src): paths = [('.', [], [src])] else: paths = os.walk(src) for dir in paths: dir_key_str = os.path.normpath( os.path.join(key_prefix, dir[0])).strip('/') dir_path = dir[0] if dir_key_str == ".": dir_created = True else: dir_created = False for file in dir[2]: if not filter or os.path.splitext(file)[1] in filter: path = os.path.normpath(os.path.join(dir[0], file)) key_str = os.path.normpath(os.path.join(key_prefix, dir[0], file)).strip('/') if not found_start: if path == starting_with: found_start = True else: continue try: logging.debug("dir_key_str='%s', dir_path='%s', " "key_str='%s', path='%s'", dir_key_str, dir_path, key_str, path) if not dir_created: self._copy(dir_key_str, dir_path, b, acl, pretend) dir_created = True self._copy(key_str, path, b, acl, pretend) except boto.exception.S3ResponseError, e: logging.warn("S3ResponseError '%s' while copying '%s'." " Will retry 1 time", str(e), path) self._copy(key_str, path, b, acl, pretend, True)