Futures and easy parallelisation

Here is one small snippet that it’s very useful when dealing with quick scripts that perform slow tasks and can benefit from running in parallel.

Enter the Futures

Futures are a convenient abstraction in Python for running tasks in the background.

There are also asyncio Futures that can be used with asyncio loops, which work in a similar way, but require an asyncio loop. The Futures we are talking in this post work over regular multitask: threads or processes.

The operation is simple, you create an Executor object and submit a task, defined as a callable (a function) and its arguments. This immediately returns you a Future object, which can be used to check whether the task is in progress or finished, and if it’s finished, the result (the returned value of the function) can be retrieved.

Executors could be based in Threads or Processes.

The snippet

Ok, that’s all great, but let’s go to see the code, which is quite easy. Typically, in your code you will have a list of tasks that you want to execute in parallel. Probably you tried sequentially first, but it takes a while. Running them in parallel is easy following this pattern.

from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(NUM_WORKERS)

def function(x):
    return x ** 2

arguments = [[2], [3], [4], [5], [6]]

futures_array = [executor.submit(function, *arg) for arg in arguments]
result = [future.result() for future in futures_array]

The initial block creates a ThreadPoolExecutor with 5 workers. The number can be tweaked depending on the system. For slow I/O operations like calling external URLs, this number could easily be 10 or 20.

Then, we wrap our task into a function that returns the desired result. In this case, it receives a number and returns it to the power of two.

We prepare the arguments, like in this case the numbers to calculate. Note that each element needs to be a tuple or list, as it will be passed to the submit method of the executor with *arg.

Finally, the juicy bit. We create a futures_array submitting all the information to the executor. This returns immediately with all the future objects.

Next, we call the .result() method on each future, retrieving the result. Note that the result method is blocking, so it won’t continue until all tasks are done.

Et voilà! The results are run in parallel in 5 workers!

A more realistic scenario

Sure, it will be strange to calculate squared numbers in Python in that way. But here is a sightly more common scenario for this kind of parallel operation: Retrieve web addresses.

from concurrent.futures import ThreadPoolExecutor

import requests
from urllib.parse import urljoin

executor = ThreadPoolExecutor(NUM_WORKERS)

def retrieve(root_url, path):
    url = urljoin(root_url, path)
    print(f'{time.time()} Retrieving {url}')
    result = requests.get(url)
    return result

arguments = [('https://google.com/', 'search'),
             ('https://www.facebook.com/', 'login'),
             ('https://nyt.com/', 'international')]
futures_array = [executor.submit(retrieve, *arg) for arg in arguments]
result = [future.result() for future in futures_array]

In this case, our function retrieves a URL based on a root and a path. As you can see, the arguments are, in each case, a tuple with the two values.

The retrieve function joins the root and the path and gets the URL (we use the requests module). The print statement work as logs to see the progress of the tasks.

If you execute the code, you’ll notice that the first two tasks start almost at the same time, while the third takes a little more of time and start around 275ms later. That’s because we created two workers, and while they are busy, the third task needs to wait. All this is handled automatically.

1645112889.005687 Retrieving https://google.com/search
1645112889.006123 Retrieving https://www.facebook.com/login
1645112889.2814898 Retrieving https://nyt.com/international
[<Response [200]>, <Response [200]>, <Response [200]>]

So that’s it! This is a pattern that I use from time to time in small scripts that very handy and gives great results, without having to complicate the code to deal with cumbersome thread classes or similar.

Photo by KEHN HERMANO on Pexels.com

One more thing

You may structure the submit calls with the specific number of parameters, if you prefer

arguments = [('https://google.com/', 'search'),
             ('https://www.facebook.com/', 'login'),
             ('https://nyt.com/', 'international')]
futures_array = [executor.submit(retrieve, root, path) for root, path in arguments]
result = [future.result() for future in futures_array]

The *arg part allows to use a variable number of arguments using defaults depending on the specific call, and allow for fast copy/paste.

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s

%d bloggers like this: