Ok, let's see if we can make this script a little better. To do that we are going to exploit co-routines using Python 3's asyncio library with async and await. This allows the code to start all the work it can, and then, as each "awaited" task pauses for I/O, to let another task take control of the process. We've already split the work across as many processes as our host computer has CPUs; now we will keep all of them as busy as possible.
I'm using the asyncio.wait(...) function because we will probably want to run this as a Lambda, and we'll want to stop the work before our allotted time runs out, report the results we do have, and also report that we ran out of time — so that our future selves can either increase the maximum allotted time or treat it as an error.

'''
Sample code to show how to use Multiprocessing and Co-routines.
'''
import time
from multiprocessing import Process, Pipe, cpu_count
import asyncio
import boto3

class VolumesParallel:
    """Finds the total EBS volume size across all EC2 instances."""

    def __init__(self):
        # EC2 resource for the default region; credentials come from the
        # environment (standard boto3 resolution chain).
        self.ec2 = boto3.resource('ec2')

    @staticmethod
    async def instance_volumes(instance):
        """
        Return the total size (in GB) of the EBS volumes attached to one
        EC2 instance.

        NOTE(review): instance.volumes.all() is a blocking boto3 call, so
        this coroutine never actually yields during the listing; genuine
        overlap requires an async client (e.g. aioboto3).
        """
        return sum(volume.size for volume in instance.volumes.all())

    def total_size(self, instances):
        """
        Sum the volume sizes of *instances*, one co-routine task per
        instance, and return the grand total.

        BUG FIX: the original used `await` inside a plain `def`, which is
        a SyntaxError. Wrapping the async work in asyncio.run() keeps this
        method's synchronous signature, so existing callers are unchanged.
        """
        async def _gather_totals():
            tasks = [asyncio.create_task(self.instance_volumes(instance))
                     for instance in instances]
            if not tasks:
                # asyncio.wait() raises ValueError on an empty task set.
                return 0
            done, pending = await asyncio.wait(tasks)
            # Cancel anything still outstanding before totalling.
            for task in pending:
                task.cancel()
            return sum(task.result() for task in done)

        return asyncio.run(_gather_totals())

    def parallel_worker(self, instances, conn):
        """
        Worker body run in a child process: total the given chunk of
        instances and send the result back over the pipe connection.
        """
        conn.send([self.total_size(instances)])
        conn.close()

    def get_all_ec2_instances(self):
        """
        Return all EC2 instances in the default region as a list.

        BUG FIX: the lazy boto3 collection has no len() and cannot be
        sliced, which both lambda_handler and parallel_total_size rely on;
        materialize it once here.
        """
        return list(self.ec2.instances.all())

    def parallel_total_size(self, all_instances):
        """
        Split *all_instances* across one worker process per CPU and sum
        the per-chunk totals received over the pipes.

        BUG FIX: the original chunk size used true division, producing a
        float that range() inside chunks() rejects with a TypeError;
        ceiling integer division keeps every instance covered and yields
        an int.
        """
        print("Running in parallel")

        # Ensure len() and slicing work even if a lazy iterable sneaks in.
        instances = list(all_instances)
        if not instances:
            return 0

        # create a list to keep all processes
        processes = []
        # create a list to keep connections
        parent_connections = []

        number_of_cpus = cpu_count()
        # Ceiling division so the final partial chunk is not dropped.
        number_per_chunk = -(-len(instances) // number_of_cpus)

        # create a process per cpu
        for chunk in chunks(instances, number_per_chunk):
            # create a pipe for communication
            parent_conn, child_conn = Pipe()
            parent_connections.append(parent_conn)

            # create the process, pass instance chunk and connection
            process = Process(target=self.parallel_worker,
                              args=(chunk, child_conn))
            processes.append(process)

        # start all processes
        for process in processes:
            process.start()

        # Drain every result before join() so a full pipe cannot deadlock
        # a child that is blocked in send().
        instances_total = 0
        for parent_connection in parent_connections:
            instances_total += parent_connection.recv()[0]

        # make sure that all processes have finished
        for process in processes:
            process.join()

        return instances_total

def chunks(l, n):   # pylint: disable=invalid-name
    """Yield successive n-sized chunks from l."""
    start = 0
    length = len(l)
    while start < length:
        yield l[start:start + n]
        start += n

def lambda_handler(event, context):  # pylint: disable=unused-argument
    """
    Lambda entry point: total all EBS volume sizes, choosing parallel
    processing when the instance count exceeds a small threshold.

    Returns the total size in GB so the Lambda invocation has a result.
    """
    # Below this many instances the per-process overhead outweighs the
    # parallel win. NOTE(review): 10 is a guess — tune from real timings.
    threshold_to_compute_parallel = 10

    volumes = VolumesParallel()
    _start = time.time()

    # BUG FIX: materialize so len() works (a lazy boto3 collection has
    # no len()); list() is a no-op if it is already a list.
    all_instances = list(volumes.get_all_ec2_instances())

    if len(all_instances) > threshold_to_compute_parallel:
        total = volumes.parallel_total_size(all_instances)
    else:
        total = volumes.total_size(all_instances)
    print("Total volume size: %s GB" % total)
    # BUG FIX: the label claimed "Sequential" even on the parallel path.
    print("Execution time: %s seconds" % (time.time() - _start))
    return total

Next up, let's use this cool boto3 wrapper library, aioboto3, which will allow us to call AWS services asynchronously.
https://pypi.org/project/aioboto3/
https://github.com/terrycain/aioboto3

Comments

Popular posts from this blog

Improvement on AWS Python Parallel processing for Lambdas