Parallel processing#

To speed up processing of large datasets, it can be helpful to distribute processing tasks among multiple CPU cores or computers. Parallel processing in Python is a huge topic. The goal of this chapter is to introduce you to the topic and give you some basic tools to do parallel processing on a single machine. First, we need to introduce some terminology:

Threading: Threads are components of a process that can run in parallel while sharing the same memory space, i.e., the code to be executed as well as all the variables declared in the program. Threads are more lightweight and have lower overhead compared to processes. In python, the threading module is used to run parallel threads.

Multiprocessing: A process is an instance of a computer program being executed. Each process has its own memory space. Spawning processes is a bit slower than spawning threads. In python, the multiprocessing module is used to run independent parallel processes by using subprocesses (instead of threads).

In synchronous parallel execution, the processes are completed in the same order in which they were started. This is achieved by locking the main program until the respective processes are finished.

In asynchronous parallel execution, the processes are not locked. Since the next process does not wait for the previous process to finish, the order of the results do not correspond with the order of the processes. This is usually faster than synchronous execution.

Although you can more easily share objects between threads, you have to take extra care to make sure that two threads don’t write to the same object. You also do not know the order with which treads write to memory, so this can cause troubles if you have dependencies between threads. Because of the added programming overhead of object synchronization, multi-threaded programming is more bug-prone. Hence, we focus here on multi-processes programming, which is easier to implement. In either case, parallel processing comes with an overhead, which can even increase processing time of small tasks.

Example#

We will use a simple function to demonstrate parallel processing in Python. This is by no means meant to be a meaningful example. It is only illustrative. The function count_letters below counts the number of letters in a word. It takes a string argument as input and returns a number. The function also takes a keyword argument that specifies which letters to count. By default, the keyword argument is set such that all letters in a word are counted.

def count_letters(word, letter=""):

    """Returns the numbers of letters specified by letter."""

    if letter == "":
        letter = word

    selection = [x for x in word if x in letter]
    
    return len(selection)

To illustrate the example without parallelization, we loop over a list of words. The results are collected in a list named letter_count.

words = ["map", "entry", "buyer", "hall", "clothes", "patience", 
         "negotiation", "courage", "grandmother", "writer"]

letter_count = []
for word in words:
    letter_count.append(count_letters(word, "e"))
    
letter_count
[0, 1, 1, 0, 1, 2, 1, 1, 1, 1]

Multiprocessing#

The multiprocessing module is a standard python library that allows you to run multiple processes simultaneously. Many geoprocessing tasks can be easily split into parallel processes without much effort. The multiprocessing module provides here some easy to use functionality. Note, the package requires that the function you want to parallize is importable by the subprocesses. This is automatically the case when you execute your code via a Python file, but not in interactive mode. This means that the examples using multiprocessing.Pool do not work out of the box in the interactive interpreter including jupyter notebooks. As a workaround, you can put your function in a separate module (python file) and import it as we do below:

from multiprocessing import Pool, cpu_count
from geopy_python_fun import count_letters

The maximum number of processes you can run at a time is limited by the number of processors in your computer. You can query the number of processors on your machine with the cpu_count() function in multiprocessing.

cpu_count()
8

Pool class#

The Pool class of the multiprocessing module provides synchronous and asynchronous versions of the methods: apply(), map() and, starmap(). These methods can be used to make any function run in parallel.

  1. Synchronous execution

  • Pool.map() and Pool.starmap()

  • Pool.apply()

  1. Asynchronous execution

  • Pool.map_async() and Pool.starmap_async()

  • Pool.apply_async()

Pool.map#

The Pool class provides a parallel version of Python’s map function. Pool.map() accepts only a single iterable object as argument, in our example the list words. Before using Pool.map(), you need to instantiate the processing pool with the desired number of parallel processes. Finally, you need to shutdown the processing pool after you are done with close().

# Init pool for 3 parallel processes
pool = Pool(3)

letter_count = pool.map(count_letters, words)

# Close pool
pool.close()

letter_count
[3, 5, 5, 4, 7, 8, 11, 7, 11, 6]

Pool.apply#

In our example, we may also want to supply the function with the keyword argument letter. We cannot do this with map. We may change the default values of the count_letters() function to work around this limitation. However, if you want to pass multiple arguments and keyword arguments to a function, you are better off using apply(). The function Pool.apply() allows you to pass multiple arguments to your function. You can specify both positional arguments (args) and keyword arguments (kwds). Positional arguments are passed as a tuple. Since our example only takes a single positional argument, we need to pack it into a tuple as follows: (word, ). Keyword arguments are passed as a dictionary. Unlike map, apply requires us to loop over our iterable words. You could use a list comprehension to achieve the same thing, but it might be more difficult to read. Note, we do not cover the function starmap() here. Pool.starmap() works similar to map but it allows you to pass multiple arguments by packing them into a tuple. This also requires you to write a loop, but it is less convenient and transparent than apply.

# Init pool for 3 parallel processes
pool = Pool(3)

# pool.apply the count_letters function
letter_count = []
for word in words:
    this_count = pool.apply(count_letters, args=(word, ),
                            kwds={"letter": "e"})
    letter_count.append(this_count)

# Close pool
pool.close()

letter_count
[0, 1, 1, 0, 1, 2, 1, 1, 1, 1]

Pool.apply_async#

Sometimes not all subprocesses take the same amount of time, e.g., when the underlying data or model complexity varies. In this case, it is faster to run all subprocesses asynchronously. A major consequence of using Pool.apply_async() is that the subprocesses do not finish in the same order and that collecting the data takes some extra steps. Perhaps, you do not need the subprocesses to return anything to the main program, e.g., by making them write their results to disk. In this case, the only difference to using Pool.apply() is that you need to tell your program where to wait for the subprocesses to finish using the join() function. In other words, the execution of the code following join() is halted until all subprocesses are finished.

When you want to collect the results returned by the subprocesses, you have two choices: 1) using a callback function, or 2) by extracting the results from the returned list of pool.ApplyResult objects. Both examples are shown below.

# Init pool for 3 parallel processes
pool = Pool(3)

# define callback function to collect the output
def collect_result(result):
    global letter_count
    letter_count.append(result)

# pool.apply the count_letters function
letter_count = []
for word in words:
    pool.apply_async(count_letters, args=(word, ), 
                     kwds={"letter": "e"},
                     callback=collect_result)

# Close pool
pool.close()

# Waits with the execution of the subsequent code until all processes
# in the queue are done.
pool.join()

letter_count
[0, 1, 1, 0, 1, 2, 1, 1, 1, 1]
# Init pool for 3 parallel processes
pool = Pool(3)

# pool.apply the count_letters function
result = []
for word in words:
    result.append(pool.apply_async(count_letters, args=(word, ), 
                  kwds={"letter": "e"}))
    
# result is a list of pool.ApplyResult objects
# retrieve letter results from that list of objects
letter_count = [x.get() for x in result]

# Close pool
pool.close()

# Wait with the execution of the subsequent code until all processes 
# in the queue are done.
pool.join()

letter_count
[0, 1, 1, 0, 1, 2, 1, 1, 1, 1]