Python Concurrent Futures

In modern programming, writing code that can perform multiple tasks simultaneously (concurrency) is essential for building efficient applications. Python's concurrent.futures module, introduced in Python 3.2, provides a high-level interface for asynchronously executing callables.

Introduction to Concurrent Futures

The concurrent.futures module simplifies writing concurrent code by abstracting away many of the complexities associated with threading and multiprocessing. It provides two main executor classes:

  • ThreadPoolExecutor: Uses threads for concurrent execution
  • ProcessPoolExecutor: Uses processes for parallel execution

These executors manage pools of workers, handle task distribution, and provide convenient ways to check on the status of tasks and collect their results.

Why Use Concurrent Futures?

Before diving into the details, let's understand why you might want to use this module:

  • Simplicity: It provides a clean, straightforward API for concurrent programming
  • Flexibility: Switch between threads and processes with minimal code changes
  • Control: Features like timeouts, cancellations, and callbacks for better control
  • Resource Management: Automatically manages worker pools
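
As a minimal sketch of the flexibility point above, the helper below takes the executor class as a parameter, so the same code can run on threads or on processes. The names square and run_all are illustrative assumptions, not part of the module.

```python
from concurrent.futures import Executor, ProcessPoolExecutor, ThreadPoolExecutor


def square(n: int) -> int:
    # Defined at module level so it can be pickled for worker processes
    return n * n


def run_all(executor_cls: type[Executor], values: list[int]) -> list[int]:
    # Only the executor class differs between the two concurrency models
    with executor_cls(max_workers=2) as executor:
        return list(executor.map(square, values))


if __name__ == "__main__":
    print(run_all(ThreadPoolExecutor, [1, 2, 3]))
    print(run_all(ProcessPoolExecutor, [1, 2, 3]))
```

Both calls should produce the same list; only the kind of worker that does the computation changes.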

Basic Usage: ThreadPoolExecutor

Let's start with a simple example using ThreadPoolExecutor:

python
import concurrent.futures
import time

def task(name):
    print(f"Task {name} starting...")
    time.sleep(1)  # Simulate work
    return f"Task {name} completed"

# Using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks to the executor
    future1 = executor.submit(task, "A")
    future2 = executor.submit(task, "B")
    future3 = executor.submit(task, "C")

    # Collect results in submission order; each result() call blocks until that task finishes
    print(future1.result())
    print(future2.result())
    print(future3.result())

Output:

Task A starting...
Task B starting...
Task C starting...
Task A completed
Task B completed
Task C completed

In this example, we:

  1. Created a ThreadPoolExecutor with 3 worker threads
  2. Submitted three tasks to the executor using the submit() method
  3. Retrieved the results using the result() method on the returned Future objects

Understanding Futures

A Future represents a computation that may or may not have completed yet. It's a proxy for a result that will be available later. Key methods of Future objects include:

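  • result(timeout=None): Returns the result, blocking until it is available or the timeout expires
  • done(): Returns True if the task has finished or was cancelled
  • cancel(): Attempts to cancel the task; this succeeds only if the task has not started running
  • add_done_callback(fn): Registers a callback that is called with the future once the task completes or is cancelled

The next example submits three tasks of different lengths and handles each result as soon as it is ready:

python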
import concurrent.futures
import time

def task(name, seconds):
    print(f"Task {name} starting, will take {seconds} seconds")
    time.sleep(seconds)
    return f"Task {name} completed after {seconds} seconds"

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Create a dictionary of futures
    future_to_task = {
        executor.submit(task, f"Task-{i}", i+1): f"Task-{i}"
        for i in range(3)
    }

    # Check tasks as they complete
    for future in concurrent.futures.as_completed(future_to_task):
        task_name = future_to_task[future]
        try:
            data = future.result()
            print(f"{task_name} returned: {data}")
        except Exception as exc:
            print(f"{task_name} generated an exception: {exc}")

Output:

Task Task-0 starting, will take 1 seconds
Task Task-1 starting, will take 2 seconds
Task Task-2 starting, will take 3 seconds
Task-0 returned: Task Task-0 completed after 1 seconds
Task-1 returned: Task Task-1 completed after 2 seconds
Task-2 returned: Task Task-2 completed after 3 seconds

The as_completed() function returns futures as they complete, so you don't have to wait for all tasks to finish before processing some results.
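
The Future methods listed earlier can also be observed directly. The sketch below is an illustration under stated assumptions: it uses a single worker and two threading.Event objects (started and release, names chosen here) to hold the first task in the running state, so the second task is guaranteed to still be queued when cancel() is called.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

started = threading.Event()   # set once the first task is running
release = threading.Event()   # set to let the first task finish


def blocking_task():
    started.set()
    release.wait(timeout=5)  # safety timeout so the sketch cannot hang
    return "first finished"


def quick_task():
    return "second finished"


# A single worker keeps the second task queued behind the first
with ThreadPoolExecutor(max_workers=1) as executor:
    first = executor.submit(blocking_task)
    second = executor.submit(quick_task)
    started.wait(timeout=5)

    running_cancelled = first.cancel()   # a running task cannot be cancelled
    queued_cancelled = second.cancel()   # a queued task can
    done_while_running = first.done()

    release.set()
    first_result = first.result()

print(running_cancelled, queued_cancelled, done_while_running)
print(first_result, second.cancelled(), second.done())
```

Note that done() reports True for the cancelled future as well, since a cancelled task will never produce a result.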

Using ProcessPoolExecutor

ProcessPoolExecutor works similarly to ThreadPoolExecutor but uses separate processes instead of threads:

python
import concurrent.futures
import time

# CPU-bound task that benefits from multiprocessing
def calculate_prime_factors(number):
    factors = []
    divisor = 2

    # Trial division; stopping at sqrt(number) keeps large inputs tractable
    while divisor * divisor <= number:
        if number % divisor == 0:
            factors.append(divisor)
            number //= divisor  # Integer division keeps the value an exact int
        else:
            divisor += 1

    if number > 1:
        factors.append(number)  # Whatever remains is itself prime

    return factors

numbers = [1112272535649, 90191012901919, 112272535549, 12901919]

# The __main__ guard is needed on platforms that spawn worker processes (Windows, macOS)
if __name__ == "__main__":
    # Using ProcessPoolExecutor for CPU-bound task
    start_time = time.time()

    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(calculate_prime_factors, numbers)

        for number, factors in zip(numbers, results):
            print(f"Prime factors of {number}: {factors}")

    print(f"Time taken: {time.time() - start_time:.2f} seconds")

The program prints one line per number, listing that number's prime factors in ascending order, followed by the total elapsed time. The timing depends on your machine and the number of available cores.

This example demonstrates using the convenient map() method, which:

  1. Takes a function and an iterable
  2. Applies the function to each item in the iterable in parallel
  3. Returns results in the same order as the input iterable
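
The ordering guarantee in the last point can be checked with a small sketch. The function slow_double is a hypothetical helper chosen for illustration; it makes smaller inputs take longer, so the tasks finish in reverse order while map() still returns results in input order.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_double(n: int) -> int:
    # Smaller inputs sleep longer, so later items finish first
    time.sleep(0.05 * (4 - n))
    return n * 2


with ThreadPoolExecutor(max_workers=3) as executor:
    doubled = list(executor.map(slow_double, [1, 2, 3]))

print(doubled)  # [2, 4, 6]
```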

When to Use ThreadPoolExecutor vs ProcessPoolExecutor

Choosing the right executor depends on your workload:

| ThreadPoolExecutor | ProcessPoolExecutor |
|---|---|
| I/O-bound tasks (network requests, file operations) | CPU-bound tasks (calculations, data processing) |
| Lower overhead | Higher overhead due to process creation |
| Limited by GIL (Global Interpreter Lock) | Not limited by GIL |
| Shared memory between threads | Isolated memory between processes |

I/O-Bound Example: Downloading Web Pages

python
import concurrent.futures
import requests
import time

def download_page(url):
    # A timeout keeps one unresponsive server from stalling a worker indefinitely
    response = requests.get(url, timeout=10)
    return f"Downloaded {url}, status: {response.status_code}, length: {len(response.text)} chars"

urls = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.stackoverflow.com',
    'https://www.wikipedia.org'
]

# Sequential execution
start_time = time.time()
for url in urls:
    result = download_page(url)
    print(result)
sequential_time = time.time() - start_time
print(f"Sequential time: {sequential_time:.2f} seconds\n")

# Parallel execution with ThreadPoolExecutor
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(download_page, urls)

    for result in results:
        print(result)
parallel_time = time.time() - start_time
print(f"Parallel time: {parallel_time:.2f} seconds")
print(f"Speedup: {sequential_time/parallel_time:.2f}x")

For I/O-bound tasks like web requests, ThreadPoolExecutor is ideal and can provide significant speedups.
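
If you want to see the effect without network access, the following sketch replaces the HTTP request with time.sleep(). The function fake_download and its 0.2 second delay are assumptions made for illustration; real speedups depend on the servers and the network.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def fake_download(item_id: int) -> str:
    time.sleep(0.2)  # Stands in for network latency
    return f"item {item_id} done"


item_ids = list(range(5))

# Sequential baseline: the waits add up
start = time.perf_counter()
sequential_results = [fake_download(i) for i in item_ids]
sequential_time = time.perf_counter() - start

# Threaded version: the waits overlap
start = time.perf_counter()
with ThreadPoolExecutor(max_workers=5) as executor:
    threaded_results = list(executor.map(fake_download, item_ids))
threaded_time = time.perf_counter() - start

print(f"Sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

Because a sleeping thread releases the GIL, the five waits overlap and the threaded run should take roughly as long as a single call.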

CPU-Bound Example: Image Processing

python
import concurrent.futures
import time
from PIL import Image, ImageFilter
import os

def process_image(image_path):
    img = Image.open(image_path)
    img = img.filter(ImageFilter.GaussianBlur(15))
    filename, ext = os.path.splitext(image_path)
    new_path = filename + "_blurred" + ext
    img.save(new_path)
    return f"Processed {image_path} to {new_path}"

# Assuming you have images in a directory
image_paths = ['image1.jpg', 'image2.jpg', 'image3.jpg']  # Replace with actual paths

# The __main__ guard is needed on platforms that spawn worker processes (Windows, macOS)
if __name__ == "__main__":
    # Using ProcessPoolExecutor for CPU-bound image processing
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(process_image, image_paths))

    for result in results:
        print(result)

For CPU-intensive tasks like image processing, ProcessPoolExecutor bypasses the GIL and makes better use of multiple CPU cores.

Advanced Features

Timeouts

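You can pass a timeout to result() so the caller does not wait indefinitely. The timeout limits only how long the caller waits; the task itself keeps running in its worker: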

python
import concurrent.futures
import time

def slow_task():
    time.sleep(3)
    return "Task completed"

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)

    try:
        # Wait for at most 1 second
        result = future.result(timeout=1)
        print(result)
    except concurrent.futures.TimeoutError:
        print("Task took too long!")
        # cancel() returns False here: a task that is already running cannot be
        # cancelled, so the with block still waits for it to finish on exit
        future.cancel()

Output:

Task took too long!
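
A shorter variant of the timeout example shows what happens after the timeout fires. This is a sketch with durations chosen for illustration: the wait gives up after 0.1 seconds, cancel() fails because the task is already running, and a second result() call without a timeout still returns the value.

```python
import concurrent.futures
import time


def slow_task():
    time.sleep(0.5)
    return "Task completed"


with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    future = executor.submit(slow_task)
    try:
        future.result(timeout=0.1)
        timed_out = False
    except concurrent.futures.TimeoutError:
        timed_out = True

    cancelled = future.cancel()    # False: the task is already running
    late_result = future.result()  # Blocks until the task finishes

print(timed_out, cancelled, late_result)
```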

Callbacks

You can register callbacks to be executed when a task completes:

python
import concurrent.futures
import time

def task(n):
    time.sleep(0.5)  # Simulate work so the task is still running when the main thread prints
    return n * 2

def on_completed(future):
    print(f"Task completed with result: {future.result()}")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 10)
    future.add_done_callback(on_completed)

    # Do other work while task is running
    print("Main thread continues working...")

Output:

Main thread continues working...
Task completed with result: 20
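
A callback also runs when the task fails, and an exception raised inside a callback is only logged, not propagated to your code. The sketch below checks future.exception() before reading the result; the names risky, report, and messages are hypothetical and chosen for illustration.

```python
from concurrent.futures import Future, ThreadPoolExecutor

messages: list[str] = []


def risky(n: int) -> float:
    return 10 / n  # Raises ZeroDivisionError when n == 0


def report(future: Future) -> None:
    # exception() returns None on success, so result() is safe to call afterwards
    error = future.exception()
    if error is not None:
        messages.append(f"failed: {type(error).__name__}")
    else:
        messages.append(f"ok: {future.result()}")


with ThreadPoolExecutor(max_workers=2) as executor:
    for n in (2, 0):
        executor.submit(risky, n).add_done_callback(report)

# Leaving the with block waits for the tasks, so both callbacks have run by now
print(sorted(messages))
```

The messages are sorted before printing because the order in which the two callbacks run is not guaranteed.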

Exception Handling

Exceptions in worker threads or processes are preserved and re-raised when you call result():

python
import concurrent.futures

def might_fail(n):
    if n == 0:
        raise ValueError("Cannot divide by zero")
    return 100 / n

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(might_fail, n) for n in [5, 0, 10]]

    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"Task succeeded with result: {result}")
        except Exception as e:
            print(f"Task failed with exception: {e}")

Output (the order of the lines may vary, because as_completed() yields futures in completion order):

Task succeeded with result: 20.0
Task failed with exception: Cannot divide by zero
Task succeeded with result: 10.0
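
The same rule applies to map(), with one difference worth knowing: the exception is raised while you iterate over the results, and iteration stops at that point. The sketch below reuses might_fail from the example above; collected and error_message are names chosen here for illustration.

```python
from concurrent.futures import ThreadPoolExecutor


def might_fail(n):
    if n == 0:
        raise ValueError("Cannot divide by zero")
    return 100 / n


collected = []
error_message = None

with ThreadPoolExecutor() as executor:
    results = executor.map(might_fail, [5, 0, 10])
    try:
        for value in results:
            collected.append(value)
    except ValueError as exc:
        # Iteration stops here; the result for 10 is never retrieved
        error_message = str(exc)

print(collected, error_message)
```

If you need every result regardless of individual failures, submit() with per-future error handling, as in the example above, is the safer choice.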

Real-World Use Case: Web Scraper

Let's build a simple web scraper that downloads and parses multiple web pages concurrently:

python
import concurrent.futures
import requests
from bs4 import BeautifulSoup
import time

def fetch_and_parse(url):
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title else "No title"
            links = len(soup.find_all('a'))
            return {
                'url': url,
                'title': title,
                'status': response.status_code,
                'links': links
            }
        else:
            return {
                'url': url,
                'title': None,
                'status': response.status_code,
                'links': 0
            }
    except Exception as e:
        return {
            'url': url,
            'title': None,
            'status': str(e),
            'links': 0
        }

# List of websites to scrape
websites = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
    'https://www.wikipedia.org',
    'https://www.reddit.com',
    'https://www.nytimes.com',
    'https://www.bbc.com',
    'https://www.cnn.com'
]

start_time = time.time()

# Using ThreadPoolExecutor for I/O-bound web scraping
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all scraping tasks
    future_to_url = {executor.submit(fetch_and_parse, url): url for url in websites}

    # Process results as they complete
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"URL: {data['url']}")
            print(f"Title: {data['title']}")
            print(f"Status: {data['status']}")
            print(f"Links found: {data['links']}")
            print("-" * 40)
        except Exception as e:
            print(f"{url} generated an exception: {e}")

print(f"Total time: {time.time() - start_time:.2f} seconds")

This scraper demonstrates a practical application of concurrent.futures for I/O-bound tasks, efficiently collecting data from multiple websites simultaneously.

Best Practices

  1. Use context managers (with statements) when creating executors to ensure proper cleanup of resources.
  2. Choose the right executor for your workload:
    • ThreadPoolExecutor for I/O-bound tasks
    • ProcessPoolExecutor for CPU-bound tasks
  3. Be careful with shared resources when using multiple threads or processes.
  4. Handle exceptions properly to prevent silent failures.
  5. Set appropriate timeouts to avoid waiting indefinitely.
  6. Consider the overhead of creating processes vs. threads when determining pool size.
  7. Don't use too many workers - more isn't always better:
    • For I/O-bound tasks: workers = CPUs × 5 (or more, depending on the task)
    • For CPU-bound tasks: workers = CPUs (or CPUs - 1)
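
The sizing rules of thumb in the last point can be written down as a short sketch. The multipliers come from the list above and are starting points to measure against, not fixed rules; the variable names are chosen here for illustration.

```python
import os
from concurrent.futures import ThreadPoolExecutor

cpus = os.cpu_count() or 1        # cpu_count() can return None
io_workers = cpus * 5             # Rule of thumb for I/O-bound work
cpu_workers = max(1, cpus - 1)    # Leave one core free, but never drop to zero

print(f"CPUs: {cpus}, I/O workers: {io_workers}, CPU workers: {cpu_workers}")

# Pass the computed size explicitly instead of relying on the default
with ThreadPoolExecutor(max_workers=io_workers) as executor:
    lengths = list(executor.map(len, ["a", "bb", "ccc"]))
```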

Summary

The concurrent.futures module provides a clean and efficient way to implement concurrency in Python. It offers:

  • A high-level interface for both thread and process-based concurrency
  • Control over parallel execution with futures, callbacks, and timeouts
  • Simple methods to submit tasks and collect results
  • Easy switching between threading and multiprocessing models

By understanding when and how to use ThreadPoolExecutor and ProcessPoolExecutor, you can write more efficient Python code that takes advantage of both I/O concurrency and multiple CPU cores.

Exercises

  1. I/O Exercise: Write a program that downloads 10 large files concurrently using ThreadPoolExecutor and compare the performance to sequential downloads.

  2. CPU Exercise: Create a program that generates prime numbers up to a certain limit using ProcessPoolExecutor, dividing the work among multiple processes.

  3. Mixed Workload: Build an application that processes images from a directory (CPU-bound) and uploads them to a server (I/O-bound), using the appropriate executor for each task.

  4. Error Handling: Modify the web scraper example to retry failed requests a certain number of times before giving up.

  5. Advanced: Create a task scheduler that manages both I/O and CPU tasks, dynamically choosing the appropriate executor based on the task type.


