Ok, let's see if we can make this script a little better. To do that we are going to exploit co-routines using the Python 3 asyncio library. I'm using the async and await keywords. These allow the code to start all the work it can, whenever it wants to, and then, as each "awaited" task pauses for I/O, hand control of the process to another task. We've already split the work into as many processes as we have on our host computer; now we will keep all of them as busy as possible.
I'm using the asyncio.wait(...) function because we'll probably want to run this as a Lambda, and we'll want to stop the work before our allotted time runs out, report the results we do have, and note that we ran out of time. That way our future selves can either increase the maximum allotted time or treat it as an error.
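Here is a minimal, self-contained sketch of that timeout pattern, with asyncio.sleep standing in for the real AWS calls (the delays and timeout value are made up for illustration):

```python
import asyncio

async def fetch(result, delay):
    # Stand-in for an I/O-bound task such as an AWS API call.
    await asyncio.sleep(delay)
    return result

async def main(timeout):
    # Illustrative delays: two fast tasks, two that will blow the time budget.
    delays = [0.0, 0.05, 1.0, 1.0]
    tasks = [asyncio.create_task(fetch(i, d)) for i, d in enumerate(delays)]
    # Stop waiting once our allotted time runs out.
    done, pending = await asyncio.wait(tasks, timeout=timeout)
    for task in pending:
        task.cancel()  # give up on the slow tasks
    # Report the partial results we do have, plus how many ran out of time.
    return sum(task.result() for task in done), len(pending)

total, timed_out = asyncio.run(main(timeout=0.4))
print("partial total: %s, timed out: %s" % (total, timed_out))
```

A Lambda version would derive the timeout from context.get_remaining_time_in_millis() minus a safety margin, and report `timed_out > 0` back to the caller.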
'''
Sample code to show how to use Multiprocessing and Co-routines.
'''
import asyncio
import time
from multiprocessing import Pipe, Process, cpu_count

import boto3


class VolumesParallel:
    """Finds total volume size for all EC2 instances"""

    def __init__(self):
        self.ec2 = boto3.resource('ec2')

    @staticmethod
    async def instance_volumes(instance):
        """
        Finds total size of the EBS volumes attached
        to an EC2 instance
        """
        instance_total = 0
        for volume in instance.volumes.all():
            instance_total += volume.size
        return instance_total

    async def total_size(self, instances):
        """
        Create the co-routine tasks and await the result.
        """
        instance_total = 0
        tasks = [asyncio.create_task(self.instance_volumes(instance))
                 for instance in instances]
        done, pending = await asyncio.wait(tasks)
        for task in done:
            instance_total += task.result()
        for task in pending:
            task.cancel()
        return instance_total

    def parallel_worker(self, instances, conn):
        """
        Finds the total size of this list of instances
        """
        # total_size is a co-routine, so each worker runs its own event loop
        instance_total = asyncio.run(self.total_size(instances))
        conn.send([instance_total])
        conn.close()

    def get_all_ec2_instances(self):
        """
        Get all EC2 instances
        """
        # materialize the collection so we can len() and slice it
        instances = list(self.ec2.instances.all())
        return instances

    def parallel_total_size(self, all_instances):
        """
        Lists all EC2 instances in the default region
        and sums result of instance_volumes
        """
        print("Running in parallel")
        # create a list to keep all processes
        processes = []
        # create a list to keep connections
        parent_connections = []
        number_of_cpus = cpu_count()
        # split the work into even chunks (integer division, rounded up)
        number_per_chunk = (len(all_instances) + (number_of_cpus - 1)) // number_of_cpus
        # create a process per cpu
        for instances in chunks(all_instances, number_per_chunk):
            # create a pipe for communication
            parent_conn, child_conn = Pipe()
            parent_connections.append(parent_conn)
            # create the process, pass instances and connection
            process = Process(target=self.parallel_worker, args=(instances, child_conn,))
            processes.append(process)
        # start all processes
        for process in processes:
            process.start()
        instances_total = 0
        for parent_connection in parent_connections:
            instances_total += parent_connection.recv()[0]
        # make sure that all processes have finished
        for process in processes:
            process.join()
        return instances_total


def chunks(l, n):  # pylint: disable=invalid-name
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]


def lambda_handler(event, context):  # pylint: disable=unused-argument
    '''
    Test to figure out what the threshold should be
    '''
    threshold_to_compute_parallel = 10
    volumes = VolumesParallel()
    _start = time.time()
    all_instances = volumes.get_all_ec2_instances()
    if len(all_instances) > threshold_to_compute_parallel:
        total = volumes.parallel_total_size(all_instances)
    else:
        total = asyncio.run(volumes.total_size(all_instances))
    print("Total volume size: %s GB" % total)
    print("Execution time: %s seconds" % (time.time() - _start))
Next up, let's use this cool boto3 wrapper library, aioboto3, which will allow us to call the AWS services asynchronously.
https://pypi.org/project/aioboto3/
https://github.com/terrycain/aioboto3
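That matters because instance_volumes above never actually yields: boto3's calls block, so the co-routines run one after another. Here is a sketch of what an aioboto3 version might look like, assuming aioboto3's async resource context manager and awaitable resource attributes; it needs real AWS credentials to actually run, so aioboto3 is imported lazily inside the co-routine:

```python
import asyncio

async def total_volume_gb():
    """Sum the sizes of all EBS volumes attached to all EC2 instances.

    Sketch only: requires AWS credentials and `pip install aioboto3`
    to actually run, hence the lazy import below.
    """
    import aioboto3

    total = 0
    session = aioboto3.Session()
    # In aioboto3 the resource is an async context manager.
    async with session.resource("ec2") as ec2:
        async for instance in ec2.instances.all():
            async for volume in instance.volumes.all():
                # Resource attributes are awaitable in aioboto3,
                # so each size lookup yields control while it waits.
                total += await volume.size
    return total

# asyncio.run(total_volume_gb())  # needs AWS credentials
```

With this, the event loop can overlap the AWS round-trips instead of blocking on each one, which is the whole point of the async/await plumbing above.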