Improving Parallel Processing in Python for AWS Lambda

This post was started after I read https://aws.amazon.com/blogs/compute/parallel-processing-in-python-with-aws-lambda/ by Oz Akan

It turns out that article had a mistake in it, which our team found through testing: as it appeared in May 2019, it called "join" before "recv", which is backwards — if the pipe's buffer fills, the child blocks on send and the join deadlocks (see https://docs.python.org/3.7/library/multiprocessing.html). I'm noting that up front because I know most of you won't read this whole post and will instead skim it for the bits you actually think you need.

Python is one of the many languages AWS Lambda supports, but there are a few caveats to getting high performance from your scripts. One is that multiprocessing is not fully supported the way it is on a regular Linux host.

If you want to use multiprocessing.Queue or multiprocessing.Pool on AWS Lambda, you will get an exception like this:

    sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
OSError: [Errno 38] Function not implemented
The reason is that the Lambda execution environment does not support shared memory between processes, so you can't use multiprocessing.Queue or multiprocessing.Pool. However, Lambda does support multiprocessing.Pipe, so we'll use the tool we have.
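A minimal sketch of the Pipe pattern that does work on Lambda (the worker function and its payload here are hypothetical, just to show the mechanics):

```python
from multiprocessing import Process, Pipe

def square_worker(number, conn):
    """Compute the square and send it back through the pipe."""
    conn.send(number * number)
    conn.close()

if __name__ == "__main__":
    parent_conn, child_conn = Pipe()
    process = Process(target=square_worker, args=(4, child_conn))
    process.start()
    # recv() before join(): if the child were still blocked sending a
    # large payload, joining first would deadlock.
    result = parent_conn.recv()
    process.join()
    print(result)  # 16
```

Note the ordering: drain the pipe with recv() first, then join() the process. That's the bug fix this whole post is about.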

Second, at the time of writing a Lambda function gets at most 2 CPUs, so it's often cheaper time-wise not to bother spinning up more than a couple of worker processes, and spinning up even more is pretty useless since there are no additional CPUs to actually do the work. But wait, you say, my program is I/O bound. Don't worry, we'll use coroutines to help with that.
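One consequence of that cap: the worker count should be bounded by both the CPU count and the amount of work. A small helper along these lines (choose_worker_count is a hypothetical name, not part of the original script) captures the idea:

```python
from multiprocessing import cpu_count

def choose_worker_count(task_count):
    """Never spawn more workers than CPUs, or more than tasks;
    always keep at least one."""
    return max(1, min(cpu_count(), task_count))
```

On a 2-CPU Lambda this tops out at 2 no matter how many tasks you have, which is exactly the scaling ceiling described above.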

I'm going to try and walk you through building a robust multiprocessing script that scales up and down depending on the demands put on it.

So here's a copy of the original code with our fix for the bug, where we also split the work evenly between the CPUs.


# pylint: disable=missing-docstring
import time
from multiprocessing import Process, Pipe, cpu_count
import boto3

class VolumesParallel:
    """Finds total volume size for all EC2 instances"""
    def __init__(self):
        self.ec2 = boto3.resource('ec2')

    @staticmethod
    def instance_volumes(instance):
        """
        Finds total size of the EBS volumes attached
        to an EC2 instance
        """
        instance_total = 0
        for volume in instance.volumes.all():
            instance_total += volume.size
        return instance_total

    def total_size(self, instances):
        """Sum the volume sizes of all given instances"""
        instance_total = 0

        for instance in instances:
            instance_total += self.instance_volumes(instance)
        return instance_total

    def parallel_worker(self, instances, conn):
        """
        Finds the total size of this list of instances
        """
        instance_total = self.total_size(instances)
        conn.send([instance_total])
        conn.close()

    def get_all_ec2_instances(self):
        """
        Get all EC2 instances as a list, so len() and slicing work
        """
        instances = list(self.ec2.instances.all())
        return instances

    def parallel_total_size(self, all_instances):
        """
        Lists all EC2 instances in the default region
        and sums result of instance_volumes
        """
        print("Running in parallel")

        # create a list to keep all processes
        processes = []
        # create a list to keep connections
        parent_connections = []

        number_of_cpus = cpu_count()
        # split the work into even chunks
        number_per_chunk = (len(all_instances) + (number_of_cpus - 1)) // number_of_cpus

        # create a process per cpu
        for instances in chunks(all_instances, number_per_chunk):
            # create a pipe for communication
            parent_conn, child_conn = Pipe()
            parent_connections.append(parent_conn)

            # create the process, pass instance and connection
            process = Process(target=self.parallel_worker, args=(instances, child_conn,))
            processes.append(process)

        # start all processes
        for process in processes:
            process.start()

        instances_total = 0
        for parent_connection in parent_connections:
            instances_total += parent_connection.recv()[0]

        # make sure that all processes have finished
        for process in processes:
            process.join()

        return instances_total

def chunks(l, n):   # pylint: disable=invalid-name
    """Yield successive n-sized chunks from l."""
    for i in range(0, len(l), n):
        yield l[i:i + n]

def lambda_handler(event, context):  # pylint: disable=unused-argument
    # test to figure out what the threshold should be

    threshold_to_compute_parallel = 10

    volumes = VolumesParallel()
    _start = time.time()

    all_instances = volumes.get_all_ec2_instances()

    if len(all_instances) > threshold_to_compute_parallel:
        total = volumes.parallel_total_size(all_instances)
    else:
        total = volumes.total_size(all_instances)
    print("Total volume size: %s GB" % total)
    print("Execution time: %s seconds" % (time.time() - _start))
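The chunk sizing in parallel_total_size is just ceiling division: round up so no instances are left over, then let chunks() hand the last worker the remainder. A quick standalone check with made-up numbers (7 items, 2 workers):

```python
def chunk_size(item_count, worker_count):
    """Ceiling division: smallest chunk size that covers all items."""
    return (item_count + worker_count - 1) // worker_count

def chunks(items, n):
    """Yield successive n-sized chunks from items."""
    for i in range(0, len(items), n):
        yield items[i:i + n]

items = list(range(7))
size = chunk_size(len(items), 2)       # (7 + 1) // 2 == 4
print(list(chunks(items, size)))       # [[0, 1, 2, 3], [4, 5, 6]]
```

Note the use of integer floor division (//); in Python 3 a plain / would produce a float, and range() in chunks() would then raise a TypeError.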


The first thing you notice is that this code is getting more complex, so we only want to bother with parallel processing if it's going to be worth the $$ — i.e., if we'll be running this script often.

But wait! You said you would use co-routines... don't worry, that's for the next post when we take this script to the next level.



