ACCU Home page ACCU Conference Page ACCU 2017 Conference Registration Page
Search Contact us ACCU at Flickr ACCU at GitHib ACCU at Google+ ACCU at Facebook ACCU at Linked-in ACCU at Twitter Skip Navigation

pinMultiprocessing and Clusters in Python

Overload Journal #137 - February 2017 + Programming Topics   Author: Silas S. Brown
Multiprocessing is possible in Python. Silas S. Brown shows us various ways.

It’s surprisingly easy to use more than one CPU core in Python. You can’t do it with straightforward threads, since the C implementation of Python has a Global Interpreter Lock (GIL) which means there can only ever be one thread performing active calculations at any one time, so threads in Python are generally useful only for waiting on I/O, handling GUIs and servers and such, not actually processing in parallel when you have multiple CPU cores. (The Java implementation has no GIL and really can run on multiple cores in parallel, but I’m assuming you have an existing Python project and want to stick with the C implementation.) But there are now ways of multiprocessing in standard C Python, and they’re not too difficult, even to add in to legacy Python code.

Python 3.2 introduced the concurrent.futures module as standard [Python], and there’s a backport for Python 2.7 which can usually be installed on Unix or GNU/Linux via sudo pip install futures (in Debian or Ubuntu you might need sudo apt-get install python-pip first; on a Mac try sudo easy_install pip). One nice thing about this module is it’s quite straightforward to roll your own ‘dummy version’ for when parallelism is not available: see Listing 1.

try:
  import concurrent.futures
  executor = \
    concurrent.futures.ProcessPoolExecutor()
except:
  class DummyExecutor:
    def submit(self, fn, *args, **kwargs):
      class Future:
        def result(self,*_):
          return fn(*args,**kwargs)
      return Future()
    def map(self, func, *iterables, **kwargs):
      for n in map(func,*iterables): yield n
  executor = DummyExecutor()
			
Listing 1

This gives you an object called executor which supports submit(function, arguments) returning an object that will, when asked for its result() later, give you either the result of the calculation or the exception raised by it, as appropriate. (Java programmers should recognise these semantics.) The executor object also has a map(function, iterables) which works like the built-in map(). If you’re on a multi-core machine and the real concurrent.futures is available in its Python installation, then some of the work will be done asynchronously on other CPU cores in between the calls to submit() and result(), so you can parallelise programs simply by looking for situations where independent calculations can be started ahead of when their results are needed, or even just by parallelising a few calls to map() as long as your map functions are not so trivial that the overhead of parallelising them would outweigh any benefit. But if your script is run on an older machine with no concurrent.futures available, it will fall back to the ‘dummy’ code which simply runs the function sequentially when its result is called for. (And if that result turns out not to be required after all and is not asked for, then the function won’t run. So if parallelism is not available then at least you can benefit from lazy evaluation. But this applies only if your algorithm involves speculative computations i.e. ones you start before knowing if you’ll really need them.)

I like the idea of ‘if it’s there, use it; if not, do without’: it means users of my scripts don’t have to make sure concurrent.futures is available in their Python installation. If they don’t have whatever it takes to install it, they’ll simply get the sequential version of my script rather than an ImportError (ImportErrors in your scripts can be bad PR). Note I’m not specifically catching ImportError around the concurrent.futures import, because it’s also possible for this import to succeed but still fail to make ProcessPoolExecutor available. This can be seen by reading __init__.py in the source code of concurrent.futures: if ProcessPoolExecutor cannot be loaded, then the module will just give you ThreadPoolExecutor. But there’s no point using ThreadPoolExecutor for multiprocessing, because ThreadPoolExecutor is subject to the GIL, so we want to verify that ProcessPoolExecutor is available before going ahead.

The interface to the ‘dummy’ object is actually quite a far cry from that of the real thing. With the real concurrent.futures, you can’t pass lambda or locally-defined functions to submit() or map(), but the dummy object lets you get away with doing this. Also, the real concurrent.futures has extra functionality, such as add_done_callback and polling for completion status, and does not run a function twice if you call its result() twice. All of this can be worked around by writing a more complex dummy object, but if all you’re going to do anyway is call submit() and result() then there’s not a lot of point making the fallback that complicated: if a few lines of script are supposed to be a ‘poor man’s’ fallback for a large library, then we don’t want to make the substitute so big and complicated that we almost might as well bundle the library itself into our script. Just make sure to test your code at least once with the real concurrent.futures to make sure you haven’t accidentally tried to give it a lambda function or something (the dummy object won’t pick up on this error). You can of course insert print statements into the code to tell you which branch it’s using, to make sure you’re testing the right one; you may even want to leave something in there for the production version (i.e. ‘this script should run faster if you install futures’).

Oversized data

From this point on, I’ll assume the real concurrent.futures is present on the system and you are doing real multiprocessing.

You don’t have to worry about causing too many context switches if too many tasks are launched at once, since ProcessPoolExecutor defaults to queuing up tasks when all CPU cores are already occupied with them. But you might sometimes be worried about what kind of data you are passing in to each task, since serialisation overheads could be a serious slow-down if it has to be large.

If you’re on Unix, Python’s underlying ‘multiprocessing’ module will start the new processes with fork(), which means they each get a copy of the parent process’s memory (with copy-on-write semantics if supported by the kernel, so that no copying occurs until a memory-page is actually changed). That means your functions can read module-level global variables that have been set up at runtime before the parallel work started (just don’t try to change these during the parallel work, unless you want to cope with such changes affecting some future calculations but not others depending on which CPU or process ID happens to run what). fork() does, however, mean you’d better be careful if you’re also using threads in the same program, such as for a GUI; there are ways of working around this, but I’d suggest concentrating on making a command-line tool and let somebody else wrap it in a GUI in a different process if they must.

But you can’t rely on having fork() if your script might be run on Windows, nor if you might eventually use multiple machines in a cluster using mpi4py.futures (more on this below), SCOOP [SCOOP], or a similar tool that gives you the same API as concurrent.futures. In these cases, it’s likely that your script will be separately imported on each core, so it had better not run unless __name__ == "__main__". You can set up a few module-level variables when that happens; the subprocesses should still have the same sys.argv and os.environ if that’s any help. However, you probably won’t want to repeat a long precalculation when doing this.

Since most multiprocessing environments, even across multiple machines in a cluster, assume a shared filesystem, one fairly portable way of sharing such large precalculated data is to do it via the filesystem, as in Listing 2. To avoid the obvious race condition, this must be done before initialising the parallelism.

from cPickle import Pickler, Unpickler
if __name__ == "__main__":
  data = our_precalculation()
  Pickler(open('precalc','wb'),-1).dump(data)
else:
  try: data
  except NameError:
    data = Unpickler(open('precalc','rb')).load()
			
Listing 2

Listing 2 can detect the case where fork() has been used and the data does not need to be read back from the filesystem, although without further low-level inspection it won’t be able to detect when it can avoid writing it to the filesystem at all (but that might not be an issue if you want to write it anyway). There are other ways of passing data to non-fork()ed subprocesses without using the filesystem, but they involve going at a lower level than concurrent.futures (you can’t get away with simply passing the data into a special ‘initialiser’ function to be run on each core, since the concurrent.futures API by itself offers no guarantee that all cores in use will be reached with it).

MPI

Message Passing Interface (MPI) is a standard traditionally used on high-performance computing (HPC) clusters, and you can access it from Python using a number of libraries for interacting with one of the underlying C implementations of MPI (typically MPICH or OpenMPI). Now that we have concurrent.futures, it’s a good idea to look for libraries supporting that API so we won’t have to write anything MPI-specific (if it’s there, we can use it; if not, we can use something else). mpi4py [MPI] plans to add an mpi4py.futures module in its version 2.1, but, at the time this article was written, version 2.1 was not yet a stable release (and standard pip commands were fetching version 2.0), so if you want to experiment with mpi4py.futures, you’ll have to download the in-development version of mpi4py.

Addendum for OpenMPI

In OpenMPI, Listing 2 won’t work because

__name__ == "__main__"

in all processes. The OpenMPI equivalent is:

os.environ['OMPI_COMM_WORLD_RANK'] == '0'

Additionally, in OpenMPI all processes will start running even before the MPIPoolExecutor is instantiated, so you can’t rely on delaying that until after the results of long initial calculations have been written to a file: the subprocesses will either have to poll the file for being ready, or else load it on-demand when they get the first task and cache it from there.

On a typical GNU/Linux box, you can do this as follows: become root (sudo su), make the mpicc command available (on RedHat-based systems that requires typing something like module add mpi/mpich-x86_64 after installing MPICH, or equivalent after installing OpenMPI; Debian/Ubuntu systems make it available by default when one of these packages is installed), make sure the python-dev or python-devel package is installed (apt-get install python-dev or yum install python-devel), and then try:

  pip install https://bitbucket.org/mpi4py/mpi4py/get/master.tar.gz

At this point Listing 1 can be changed (after adding extra indentation to each line) by putting Listing 3 before the beginning. Here, we check if we are being run under MPI, and, if so, we use it; otherwise we drop back to the previous Listing 1 behaviour (use concurrent.futures if available, otherwise our ‘dummy’ object). A subtlety is that mpi4py.futures will work only if it is run in a command like this:

  mpiexec -n 4 python -m mpi4py.futures script.py  args...

and that in an MPI environment too (i.e. the above module add command will need to have been run in the same shell, if appropriate). Some versions of mpiexec also have options for forwarding standard input and environment variables to processes, but not all do, so you’ll probably have to arrange for the script to run without these. Also, any script that uses sys.stdout.isatty() to determine whether or not output is being redirected will need to be updated for running under MPI, because MPI always redirects the output from the program’s point of view even when it’s still being sent to the terminal.

If you want MPI to use other machines in a cluster, then how to do this depends on your MPI version: it may involve extra setup steps before starting your program, as is the case with mpd in older versions of MPICH2 such as version 1.2. But in MPICH2 version 1.5 (the current mpich2 package in Debian Jessie), and in MPICH 3.1 (Jessie’s current mpich package), the default process manager is hydra and you simply create a text file listing the host names (or IP addresses) of the cluster machines, ensure they can all ssh into each other without password and share the filesystem, and pass this text file to mpiexec using the -f parameter or the HYDRA_HOST_FILE environment variable. (In OpenMPI you use the --hostfile parameter.) Modern MPI implementations are also able to checkpoint and restart processes in the event of failure of one or more machines in the cluster; refer to each implementation’s documentation for how to set this up.

If our script is run outside of MPI, then our detecting and handling of ‘no MPI’ is a little subtle because mpi4py.futures (if installed) will still successfully import, and it will even let you instantiate an MPIPoolExecutor(), but then will likely crash after you submit a job, and catching that crash from your Python script is very awkward (normal try/except won’t cut it). So we need to look at the command line to check we’re being run in the right way for MPI first. But we can’t just inspect sys.argv, because that will have been rewritten before control is passed to our script, so we have to get the original command line from the ps command. The ps parameters in Listing 3 were tested on both GNU/Linux and Mac OS X, and if any system does not support them then we should just fall back to the safety of not using MPI.

try:
  import os, commands
  commands.getoutput(
    "ps -p " + str(os.getpid()) + " -o args") \
    .index("-m mpi4py.futures") # ValueError
                                # if not found
  import mpi4py.futures
  executor = mpi4py.futures.MPIPoolExecutor()
except:
  # etc (as Listing 1, extra indent)
			
Listing 3

A pattern for moving long-running functions to other CPUs

If you have a function that normally runs quite quickly but can take a long time on certain inputs, it might not pay to have every call run on a different CPU, since in fast cases the overheads of doing so would outweigh the savings. But it might be useful if the program could determine for itself whether or not to run this particular call on a different CPU.

Since, in Python, any function can be turned into a generator by replacing return x with yield x ; return (giving a generator that yields a single item), the pattern shown in Listing 4 seems natural as a way to refactor existing sequential code into multiprocessing. The part marked ‘first part of function goes here’ will be repeated on the other CPU, which seems wasteful but could be faster than passing variables across if they are large; it is assumed that this part of the function does what is necessary for us to be able to figure out if the function is likely to take a long time, e.g. if the first part of the function shows that we are now generating a LOT of intermediate data (which is why we probably don’t want to pass it all across once we’ve decided we’re better off running in the background). The my_function_wrapped part is necessary because submit() takes only functions not generators.

def my_function(param, can_background = True):
  # first part of function goes here
  if (can_background and 
   likely_to_take_a_long_time()):
    job = executor.submit(my_function_wrapped,
      param)
    yield "backgrounded"
    yield job.result() ; return
  # rest of function goes here
  # change all 'return x' to 'yield x ; return'

def my_function_wrapped(param):
  return my_function(param, False).next()

def caller():
  gen = my_function(param)
  result = gen.next()
  if result == "backgrounded":
    # Do something else for a while...
    result = gen.next() # get actual result
			
Listing 4

I’m not suggesting writing new programs like Listing 4, but it might be a useful pattern for refactoring legacy sequential code.

Avoiding CPU overload

The above pattern for moving long-running functions to other CPUs should work as-is on MPI, but with concurrent.futures it will result in one too many processes, because ProcessPoolExecutor defaults to running as many parallel processes as there are CPU cores, on the assumption that the control program won’t need much CPU itself, an assumption that is likely to break down when using this pattern. The Linux and BSD kernels are of course perfectly capable of multiplexing a load that’s greater than the number of available CPU cores, but it might be more efficient to reduce the number of ‘slave’ processes by 1 to allow the master to have a CPU to itself. This can be accomplished using code like that in Listing 5.

import multiprocessing
num_cpus = multiprocessing.cpu_count()
if num_cpus < 2:
  raise Exception("Not enough CPUs")
from concurrent.futures import \
  ProcessPoolExecutor
executor = ProcessPoolExecutor(num_cpus - 1)
			
Listing 5

Evaluation

The above methods were used to partially parallelise Annotator Generator [Brown12] resulting in a 15% overall speed increase when using concurrent.futures as compared to the unmodified code. This could almost certainly be improved with more parallelisation (recall Amdahl’s Law: the speedup is limited by the fraction of the program that must be sequential). Only a fraction of a percent was saved by subtracting 1 from the number of CPUs to achieve a more even load.

Results using MPI were not so satisfactory. When running with 4 processes on a single quad-core machine using MPI, the program was actually slowed down by 8% compared with running single-core, which in turn was 6% slower than the unmodified code. I believe that 6% represents the overhead of converting functions into generators, and could be eliminated by duplicating and modifying the code for the single-core case, but that would introduce a maintenance issue unless it could somehow be automated. Given Annotator Generator’s desktop usage scenario, the prevalence of multi-core CPUs on desktops, and the speedup using concurrent.futures, it doesn’t seem very high-priority to invest code complexity in saving that 6% in the single-core case. MPI’s poor performance is more worrisome, but I later discovered it was due to the system running low on RAM (and therefore being slowed down by more page faults) while running four separate MPI processes: concurrent.futures was able to share the data structures, but MPI wasn’t (even though it could use shared memory for some message passing). Once I reduced the size of the input, MPI was 14% faster than the single-core case and concurrent.futures was 18% faster than the single-core case. Perhaps MPI would perform better on a real cluster, which I have not yet had an opportunity to test. A cluster of virtual machines with OpenMPI ran 5% faster than the single-core case, but because these machines were virtual and all running on the same actual machine, I do not believe that result to be meaningful other than as a demonstration that the underlying protocols were working. Still, I suspect a greater deal of parallelisation is required to outweigh the overheads of MPI beyond those of concurrent.futures. But as it can now use the same API as concurrent.futures, not to mention SCOOP, it is now possible to write for a single concurrency API and experiment to see which framework gives the best speed improvement to your particular application.

References

[Brown12] Silas S. Brown. Web Annotation with Modified-Yarowsky and Other Algorithms. Overload issue 112 (December 2012) page 4. The modified code is now at http://people.ds.cam.ac.uk/ssb22/adjuster/annogen.html

[MPI] MPI for Python http://mpi4py.scipy.org/

[Python] Python library documentation https://docs.python.org/3/library/concurrent.futures.html

[SCOOP] SCOOP (Scalable COncurrent Operations in Python) http://scoop.readthedocs.io/

Overload Journal #137 - February 2017 + Programming Topics