# Python Concurrent Futures
In modern programming, writing code that can perform multiple tasks simultaneously (concurrency) is essential for building efficient applications. Python's `concurrent.futures` module, introduced in Python 3.2, provides a high-level interface for asynchronously executing callables.
## Introduction to Concurrent Futures
The `concurrent.futures` module simplifies writing concurrent code by abstracting away many of the complexities associated with threading and multiprocessing. It provides two main executor classes:

- `ThreadPoolExecutor`: Uses threads for concurrent execution
- `ProcessPoolExecutor`: Uses processes for parallel execution

These executors manage pools of workers, handle task distribution, and provide convenient ways to check on the status of tasks and collect their results.
## Why Use Concurrent Futures?
Before diving into the details, let's understand why you might want to use this module:
- Simplicity: It provides a clean, straightforward API for concurrent programming
- Flexibility: Switch between threads and processes with minimal code changes, as the sketch after this list shows
- Control: Features like timeouts, cancellations, and callbacks for better control
- Resource Management: Automatically manages worker pools
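To make the flexibility point concrete, here is a minimal sketch (the `double` function is a made-up stand-in for real work) showing that moving from threads to processes is a one-line change:

```python
import concurrent.futures

def double(n):
    return n * 2  # stand-in workload; swap in your own function

if __name__ == "__main__":
    # Thread-based pool:
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(double, range(4))))  # [0, 2, 4, 6]

    # Process-based pool -- only the class name changes:
    with concurrent.futures.ProcessPoolExecutor(max_workers=4) as executor:
        print(list(executor.map(double, range(4))))  # [0, 2, 4, 6]
```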
## Basic Usage: ThreadPoolExecutor
Let's start with a simple example using `ThreadPoolExecutor`:
```python
import concurrent.futures
import time

def task(name):
    print(f"Task {name} starting...")
    time.sleep(1)  # Simulate work
    return f"Task {name} completed"

# Using ThreadPoolExecutor
with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Submit tasks to the executor
    future1 = executor.submit(task, "A")
    future2 = executor.submit(task, "B")
    future3 = executor.submit(task, "C")

    # Get the results (each call blocks until that task finishes)
    print(future1.result())
    print(future2.result())
    print(future3.result())
```
Output:

```
Task A starting...
Task B starting...
Task C starting...
Task A completed
Task B completed
Task C completed
```
In this example, we:

- Created a `ThreadPoolExecutor` with 3 worker threads
- Submitted three tasks to the executor using the `submit()` method
- Retrieved the results using the `result()` method on the returned `Future` objects
## Understanding Futures
A `Future` represents a computation that may or may not have completed yet. It's a proxy for a result that will be available later. Key methods of `Future` objects include:

- `result(timeout=None)`: Returns the result when available (blocks until then)
- `done()`: Checks if the task has completed
- `cancel()`: Attempts to cancel the task (only possible while it is still waiting to start)
- `add_done_callback(fn)`: Adds a callback to be executed when the task completes
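As a quick sketch of the status-checking methods (the `slow_square` function and its one-second sleep are arbitrary stand-ins):

```python
import concurrent.futures
import time

def slow_square(n):
    time.sleep(1)
    return n * n

with concurrent.futures.ThreadPoolExecutor(max_workers=1) as executor:
    running = executor.submit(slow_square, 3)
    queued = executor.submit(slow_square, 4)  # waits behind the first task

    print(running.done())    # False -- the task is still sleeping
    print(queued.cancel())   # True -- it hasn't started, so it can be cancelled
    print(running.cancel())  # usually False -- a running task can't be cancelled
    print(running.result())  # 9 (blocks until the task finishes)
```

The next example combines `submit()` with the module's `as_completed()` helper to process results in the order they finish: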
```python
import concurrent.futures
import time

def task(name, seconds):
    print(f"Task {name} starting, will take {seconds} seconds")
    time.sleep(seconds)
    return f"Task {name} completed after {seconds} seconds"

with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
    # Create a dictionary mapping futures to task names
    future_to_task = {
        executor.submit(task, f"Task-{i}", i + 1): f"Task-{i}"
        for i in range(3)
    }

    # Check tasks as they complete
    for future in concurrent.futures.as_completed(future_to_task):
        task_name = future_to_task[future]
        try:
            data = future.result()
            print(f"{task_name} returned: {data}")
        except Exception as exc:
            print(f"{task_name} generated an exception: {exc}")
```
Output:

```
Task Task-0 starting, will take 1 seconds
Task Task-1 starting, will take 2 seconds
Task Task-2 starting, will take 3 seconds
Task-0 returned: Task Task-0 completed after 1 seconds
Task-1 returned: Task Task-1 completed after 2 seconds
Task-2 returned: Task Task-2 completed after 3 seconds
```
The `as_completed()` function yields futures as they complete, so you don't have to wait for all tasks to finish before processing the first results.
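A close relative of `as_completed()` is the module's `wait()` function, which blocks until a condition is met and returns the done and not-done futures as two sets. A minimal sketch (the `sleepy` task is a placeholder):

```python
import concurrent.futures
import time

def sleepy(seconds):
    time.sleep(seconds)
    return seconds

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(sleepy, s) for s in (3, 1, 2)]

    # Block until at least one future finishes
    done, not_done = concurrent.futures.wait(
        futures, return_when=concurrent.futures.FIRST_COMPLETED
    )
    print(f"{len(done)} done, {len(not_done)} still pending")
```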
## Using ProcessPoolExecutor
`ProcessPoolExecutor` works similarly to `ThreadPoolExecutor` but uses separate processes instead of threads:
```python
import concurrent.futures
import time

# CPU-bound task that benefits from multiprocessing
def calculate_prime_factors(number):
    factors = []
    divisor = 2
    # Only trial-divide up to sqrt(number); whatever remains is prime
    while divisor * divisor <= number:
        if number % divisor == 0:
            factors.append(divisor)
            number //= divisor  # integer division keeps large values exact
        else:
            divisor += 1
    if number > 1:
        factors.append(number)
    return factors

numbers = [1112272535649, 90191012901919, 112272535549, 12901919]

if __name__ == "__main__":
    # Using ProcessPoolExecutor for a CPU-bound task; the __main__ guard
    # is required on platforms that spawn worker processes
    start_time = time.time()
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = executor.map(calculate_prime_factors, numbers)
        for number, factors in zip(numbers, results):
            print(f"Prime factors of {number}: {factors}")
    print(f"Time taken: {time.time() - start_time:.2f} seconds")
```
Output (the factor lists are elided here; the exact timing depends on your machine):

```
Prime factors of 1112272535649: [...]
Prime factors of 90191012901919: [...]
Prime factors of 112272535549: [...]
Prime factors of 12901919: [...]
Time taken: 0.87 seconds
```
This example demonstrates the convenient `map()` method, which:

- Takes a function and an iterable
- Applies the function to each item in the iterable in parallel
- Returns results in the same order as the input iterable
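Two optional `map()` parameters are worth knowing: `timeout` and `chunksize`. With `ProcessPoolExecutor`, a `chunksize` greater than 1 ships batches of items to each worker, which can noticeably reduce inter-process overhead for long iterables. A brief sketch with a toy function:

```python
import concurrent.futures

def square(n):
    return n * n

if __name__ == "__main__":
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # chunksize=100 batches work items to cut per-item IPC cost;
        # timeout=10 raises TimeoutError if results aren't ready in time
        results = executor.map(square, range(10_000), timeout=10, chunksize=100)
        print(sum(results))  # 333283335000
```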
## When to Use ThreadPoolExecutor vs ProcessPoolExecutor
Choosing the right executor depends on your workload:
| ThreadPoolExecutor | ProcessPoolExecutor |
| --- | --- |
| I/O-bound tasks (network requests, file operations) | CPU-bound tasks (calculations, data processing) |
| Lower overhead | Higher overhead due to process creation |
| Limited by the GIL (Global Interpreter Lock) | Not limited by the GIL |
| Shared memory between threads | Isolated memory between processes |
### I/O-Bound Example: Downloading Web Pages
```python
import concurrent.futures
import requests
import time

def download_page(url):
    response = requests.get(url)
    return f"Downloaded {url}, status: {response.status_code}, length: {len(response.text)} chars"

urls = [
    'https://www.python.org',
    'https://www.google.com',
    'https://www.github.com',
    'https://www.stackoverflow.com',
    'https://www.wikipedia.org'
]

# Sequential execution
start_time = time.time()
for url in urls:
    result = download_page(url)
    print(result)
sequential_time = time.time() - start_time
print(f"Sequential time: {sequential_time:.2f} seconds\n")

# Parallel execution with ThreadPoolExecutor
start_time = time.time()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    results = executor.map(download_page, urls)
    for result in results:
        print(result)
parallel_time = time.time() - start_time
print(f"Parallel time: {parallel_time:.2f} seconds")
print(f"Speedup: {sequential_time/parallel_time:.2f}x")
```
For I/O-bound tasks like web requests, `ThreadPoolExecutor` is ideal and can provide significant speedups.
### CPU-Bound Example: Image Processing
```python
import concurrent.futures
from PIL import Image, ImageFilter
import os

def process_image(image_path):
    img = Image.open(image_path)
    img = img.filter(ImageFilter.GaussianBlur(15))
    filename, ext = os.path.splitext(image_path)
    new_path = filename + "_blurred" + ext
    img.save(new_path)
    return f"Processed {image_path} to {new_path}"

# Assuming you have images in a directory
image_paths = ['image1.jpg', 'image2.jpg', 'image3.jpg']  # Replace with actual paths

if __name__ == "__main__":
    # Using ProcessPoolExecutor for CPU-bound image processing
    with concurrent.futures.ProcessPoolExecutor() as executor:
        results = list(executor.map(process_image, image_paths))
        for result in results:
            print(result)
```
For CPU-intensive tasks like image processing, `ProcessPoolExecutor` bypasses the GIL and makes better use of multiple CPU cores. Note the `if __name__ == "__main__":` guard: on platforms that spawn worker processes (Windows, and macOS by default), each worker re-imports the main module, so pool creation must be protected from running again in the children.
## Advanced Features

### Timeouts
You can set timeouts to prevent waiting indefinitely for tasks:
```python
import concurrent.futures
import time

def slow_task():
    time.sleep(3)
    return "Task completed"

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(slow_task)
    try:
        # Wait for at most 1 second
        result = future.result(timeout=1)
        print(result)
    except concurrent.futures.TimeoutError:
        print("Task took too long!")
        # Attempt to cancel; this returns False here because
        # the task is already running
        future.cancel()
```
Output:

```
Task took too long!
```
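A future that is already running can't be cancelled, but since Python 3.9 `Executor.shutdown()` accepts `cancel_futures=True` to cancel everything still waiting in the queue. A small sketch with an arbitrary one-second task:

```python
import concurrent.futures
import time

def napper(n):
    time.sleep(1)
    return n

executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
futures = [executor.submit(napper, i) for i in range(5)]

# Stop accepting new work and cancel futures that haven't started
# (cancel_futures requires Python 3.9+)
executor.shutdown(wait=True, cancel_futures=True)

print(sum(f.cancelled() for f in futures))  # typically 4: only the running task finishes
```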
### Callbacks
You can register callbacks to be executed when a task completes:
```python
import concurrent.futures

def task(n):
    return n * 2

def on_completed(future):
    print(f"Task completed with result: {future.result()}")

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(task, 10)
    future.add_done_callback(on_completed)

    # Do other work while the task is running
    print("Main thread continues working...")
```
Output (the ordering can vary, since the callback fires as soon as the task finishes):

```
Main thread continues working...
Task completed with result: 20
```
### Exception Handling

Exceptions in worker threads or processes are preserved and re-raised when you call `result()`:
```python
import concurrent.futures

def might_fail(n):
    if n == 0:
        raise ValueError("Cannot divide by zero")
    return 100 / n

with concurrent.futures.ThreadPoolExecutor() as executor:
    futures = [executor.submit(might_fail, n) for n in [5, 0, 10]]

    for future in concurrent.futures.as_completed(futures):
        try:
            result = future.result()
            print(f"Task succeeded with result: {result}")
        except Exception as e:
            print(f"Task failed with exception: {e}")
```
Output (the order may vary, since `as_completed()` yields futures in completion order):

```
Task succeeded with result: 20.0
Task failed with exception: Cannot divide by zero
Task succeeded with result: 10.0
```
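If you would rather inspect a failure than catch it, `Future.exception()` blocks like `result()` but returns the raised exception object (or `None` on success) instead of re-raising it:

```python
import concurrent.futures

def might_fail(n):
    if n == 0:
        raise ValueError("Cannot divide by zero")
    return 100 / n

with concurrent.futures.ThreadPoolExecutor() as executor:
    future = executor.submit(might_fail, 0)

    # Returns the exception object rather than raising it
    exc = future.exception()
    if exc is not None:
        print(f"Task failed: {exc!r}")
```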
## Real-World Use Case: Web Scraper
Let's build a simple web scraper that downloads and parses multiple web pages concurrently:
```python
import concurrent.futures
import requests
from bs4 import BeautifulSoup
import time

def fetch_and_parse(url):
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            title = soup.title.string if soup.title else "No title"
            links = len(soup.find_all('a'))
            return {
                'url': url,
                'title': title,
                'status': response.status_code,
                'links': links
            }
        else:
            return {
                'url': url,
                'title': None,
                'status': response.status_code,
                'links': 0
            }
    except Exception as e:
        return {
            'url': url,
            'title': None,
            'status': str(e),
            'links': 0
        }

# List of websites to scrape
websites = [
    'https://www.python.org',
    'https://www.github.com',
    'https://www.stackoverflow.com',
    'https://www.wikipedia.org',
    'https://www.reddit.com',
    'https://www.nytimes.com',
    'https://www.bbc.com',
    'https://www.cnn.com'
]

start_time = time.time()

# Using ThreadPoolExecutor for I/O-bound web scraping
with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
    # Submit all scraping tasks
    future_to_url = {executor.submit(fetch_and_parse, url): url for url in websites}

    # Process results as they complete
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
            print(f"URL: {data['url']}")
            print(f"Title: {data['title']}")
            print(f"Status: {data['status']}")
            print(f"Links found: {data['links']}")
            print("-" * 40)
        except Exception as e:
            print(f"{url} generated an exception: {e}")

print(f"Total time: {time.time() - start_time:.2f} seconds")
```
This scraper demonstrates a practical application of `concurrent.futures` for I/O-bound tasks, efficiently collecting data from multiple websites simultaneously.
## Best Practices

- Use context managers (`with` statements) when creating executors to ensure proper cleanup of resources.
- Choose the right executor for your workload: `ThreadPoolExecutor` for I/O-bound tasks, `ProcessPoolExecutor` for CPU-bound tasks.
- Be careful with shared resources when using multiple threads or processes.
- Handle exceptions properly to prevent silent failures.
- Set appropriate timeouts to avoid waiting indefinitely.
- Consider the overhead of creating processes vs. threads when determining pool size.
- Don't use too many workers; more isn't always better. As rules of thumb (see the sketch after this list):
  - For I/O-bound tasks: workers = CPUs × 5 (or more, depending on the task)
  - For CPU-bound tasks: workers = CPUs (or CPUs - 1)
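As a rough sketch of those sizing heuristics (the ×5 multiplier was `ThreadPoolExecutor`'s default pool size before Python 3.8; newer versions default to `min(32, os.cpu_count() + 4)`):

```python
import os

cpus = os.cpu_count() or 1  # cpu_count() can return None

# Heuristic pool sizes -- tune these for your workload
io_workers = cpus * 5           # I/O-bound: threads mostly wait, so oversubscribe
cpu_workers = max(1, cpus - 1)  # CPU-bound: roughly one process per core

print(f"{cpus} CPUs -> {io_workers} I/O workers, {cpu_workers} CPU workers")
```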
## Summary

The `concurrent.futures` module provides a clean and efficient way to implement concurrency in Python. It offers:
- A high-level interface for both thread and process-based concurrency
- Control over parallel execution with futures, callbacks, and timeouts
- Simple methods to submit tasks and collect results
- Easy switching between threading and multiprocessing models
By understanding when and how to use `ThreadPoolExecutor` and `ProcessPoolExecutor`, you can write more efficient Python code that takes advantage of both I/O concurrency and multiple CPU cores.
## Additional Resources

- [Python's official documentation on `concurrent.futures`](https://docs.python.org/3/library/concurrent.futures.html)
- [PEP 3148 – futures - execute computations asynchronously](https://peps.python.org/pep-3148/)
- Python Parallel Programming Cookbook
## Exercises

1. I/O Exercise: Write a program that downloads 10 large files concurrently using `ThreadPoolExecutor` and compare the performance to sequential downloads.
2. CPU Exercise: Create a program that generates prime numbers up to a certain limit using `ProcessPoolExecutor`, dividing the work among multiple processes.
3. Mixed Workload: Build an application that processes images from a directory (CPU-bound) and uploads them to a server (I/O-bound), using the appropriate executor for each task.
4. Error Handling: Modify the web scraper example to retry failed requests a certain number of times before giving up.
5. Advanced: Create a task scheduler that manages both I/O and CPU tasks, dynamically choosing the appropriate executor based on the task type.
If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)