Python Thread Synchronization
In concurrent programming, when multiple threads operate on shared resources, we need mechanisms to coordinate their actions. This coordination process is called thread synchronization. Without proper synchronization, programs may encounter race conditions, deadlocks, and unpredictable behavior.
Why Thread Synchronization Matters
Imagine two threads trying to update the same variable simultaneously:
counter = 0
def increment():
    global counter
    for _ in range(100000):
        counter += 1  # This is not atomic!
When multiple threads run this code, the final counter value might be less than expected because the increment operation (counter += 1) isn't atomic: it reads the value, adds one, and stores the result back. If two threads read the same value before either has written its update, both store the same result and one of the increments is lost.
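You can check that the increment is really several steps by disassembling it with the standard dis module. This is a small sketch: increment_once is a hypothetical helper. The exact opcode names differ between CPython versions (newer versions emit BINARY_OP where older ones emit INPLACE_ADD), but every version has a separate load of the global followed later by a separate store.

```python
import dis

counter = 0

def increment_once():
    global counter
    counter += 1  # One line of source, several bytecode instructions

# Collect the names of the instructions CPython compiled for the function
ops = [ins.opname for ins in dis.get_instructions(increment_once)]
print(ops)
```

If another thread runs between the load and the store, its update can be overwritten when this thread stores its own result.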
Let's see this in action:
import threading
counter = 0
def increment():
    global counter
    for _ in range(100000):
        counter += 1
# Create and start two threads
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Final counter value: {counter}")
# Expected: 200000
# Actual: may be less than 200000 (varies between runs and Python versions)
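The output may be less than 200,000, which means some increments were lost to the race condition. How often this happens depends on the Python version and on when the interpreter switches threads. A run that happens to print 200,000 does not mean the code is correct.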
Synchronization Mechanisms in Python
Python offers several ways to synchronize threads:
1. Locks (Mutexes)
A lock (or mutex) ensures that only one thread can enter a particular section of code at a time.
import threading
counter = 0
lock = threading.Lock()
def increment():
    global counter
    for _ in range(100000):
        lock.acquire()
        try:
            counter += 1
        finally:
            lock.release()
# Create and start two threads
t1 = threading.Thread(target=increment)
t2 = threading.Thread(target=increment)
t1.start()
t2.start()
t1.join()
t2.join()
print(f"Final counter value: {counter}")  # Output: 200000
A cleaner way to use a lock is the with statement, which releases the lock even if the block raises an exception:
def increment():
    global counter
    for _ in range(100000):
        with lock:  # Automatically acquires and releases the lock
            counter += 1
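To confirm that the with statement releases the lock when the protected block fails, the sketch below raises an exception inside the critical section and then checks the lock with Lock.locked(). The names demo_lock and risky_update are only for illustration.

```python
import threading

demo_lock = threading.Lock()

def risky_update():
    with demo_lock:
        raise ValueError("something went wrong inside the critical section")

try:
    risky_update()
except ValueError:
    pass

# The with block released the lock on the way out, despite the exception
print(demo_lock.locked())  # False
```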
2. RLock (Reentrant Lock)
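An RLock lets the thread that already holds it acquire it again without blocking. It keeps a count, and other threads can only acquire it after the owning thread has called release() as many times as acquire(). This is useful for recursive functions, or when a function that holds the lock calls another function that also requires it.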
import threading
lock = threading.RLock()
def outer_function():
    with lock:
        print("Outer function acquired the lock")
        inner_function()
def inner_function():
    with lock:  # With a regular Lock, this would deadlock
        print("Inner function also acquired the lock")
outer_function()
# Both messages are printed; with threading.Lock() the call would hang forever
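The difference between the two lock types can be seen without risking a hang by using non-blocking acquire() calls. This minimal sketch also shows the recursion count: an RLock must be released once per acquire, and one release too many raises RuntimeError.

```python
import threading

plain = threading.Lock()
reentrant = threading.RLock()

plain.acquire()
# A second blocking acquire() here would hang forever, so try non-blocking
plain_again = plain.acquire(blocking=False)      # False: the lock is already held
plain.release()

reentrant.acquire()
rlock_again = reentrant.acquire(blocking=False)  # True: same owner, count is now 2
reentrant.release()  # Count back to 1; still held
reentrant.release()  # Count 0; other threads can now acquire it

try:
    reentrant.release()  # One release too many
    extra_release_error = False
except RuntimeError:
    extra_release_error = True

print(plain_again, rlock_again, extra_release_error)  # False True True
```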
3. Semaphores
A semaphore limits the number of threads that can access a resource simultaneously.
import threading
import time
# Semaphore allowing 2 threads at a time
semaphore = threading.Semaphore(2)
def access_resource(thread_num):
    print(f"Thread {thread_num} is trying to access the resource")
    with semaphore:
        print(f"Thread {thread_num} has access to the resource")
        time.sleep(1)  # Simulate resource usage
        print(f"Thread {thread_num} is releasing the resource")
# Create and start 5 threads
threads = []
for i in range(5):
    t = threading.Thread(target=access_resource, args=(i,))
    threads.append(t)
    t.start()
for t in threads:
    t.join()
Possible output (the exact interleaving varies between runs):
Thread 0 is trying to access the resource
Thread 0 has access to the resource
Thread 1 is trying to access the resource
Thread 1 has access to the resource
Thread 2 is trying to access the resource
Thread 3 is trying to access the resource
Thread 4 is trying to access the resource
Thread 0 is releasing the resource
Thread 2 has access to the resource
Thread 1 is releasing the resource
Thread 3 has access to the resource
Thread 2 is releasing the resource
Thread 4 has access to the resource
Thread 3 is releasing the resource
Thread 4 is releasing the resource
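Printed output cannot prove the limit is respected, so the sketch below measures it. Each thread records how many threads are inside the semaphore at once (active and peak are counters invented for this demo, protected by their own lock). The sketch also shows threading.BoundedSemaphore. A plain Semaphore silently accepts extra release() calls and raises its count. A BoundedSemaphore rejects them with ValueError, which catches that kind of bug.

```python
import threading
import time

semaphore = threading.Semaphore(2)
stats_lock = threading.Lock()  # Protects active and peak
active = 0  # Threads currently inside the semaphore
peak = 0    # Highest value active ever reached

def use_resource():
    global active, peak
    with semaphore:
        with stats_lock:
            active += 1
            peak = max(peak, active)
        time.sleep(0.05)  # Simulated work while holding one of the two slots
        with stats_lock:
            active -= 1

threads = [threading.Thread(target=use_resource) for _ in range(6)]
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Peak concurrent holders: {peak}")  # Never more than 2

# BoundedSemaphore rejects releasing more times than it was acquired
bounded = threading.BoundedSemaphore(2)
try:
    bounded.release()
    over_release_rejected = False
except ValueError:
    over_release_rejected = True
print(over_release_rejected)  # True
```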
4. Events
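An Event allows one thread to signal an event while other threads wait for it. It wraps an internal flag: set() turns the flag on and wakes every thread blocked in wait(), and clear() turns it off again.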
import threading
import time
event = threading.Event()
def waiter():
    print("Waiter: Waiting for the event")
    event.wait()  # Block until the event is set
    print("Waiter: The event was set, continuing execution")
def setter():
    print("Setter: Sleeping for 3 seconds before setting the event")
    time.sleep(3)
    print("Setter: Setting the event")
    event.set()  # Set the event, unblocking all waiters
# Create and start threads
t1 = threading.Thread(target=waiter)
t2 = threading.Thread(target=setter)
t1.start()
t2.start()
t1.join()
t2.join()
Typical output (the first two lines may appear in either order):
Waiter: Waiting for the event
Setter: Sleeping for 3 seconds before setting the event
Setter: Setting the event
Waiter: The event was set, continuing execution
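Event.wait() also accepts a timeout, and its return value tells you whether the flag was set or time ran out. This short sketch uses threading.Timer to call set() from another thread. It then uses clear() to reset the event for reuse.

```python
import threading

event = threading.Event()

# wait() with a timeout returns False if the flag is still unset when time runs out
timed_out = not event.wait(timeout=0.1)

# threading.Timer calls event.set() from a separate thread after 0.1 seconds
timer = threading.Timer(0.1, event.set)
timer.start()
was_set = event.wait(timeout=5)  # Returns True as soon as the flag is set
timer.join()

event.clear()  # Reset the flag so later wait() calls block again
print(timed_out, was_set, event.is_set())  # True True False
```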
5. Conditions
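A Condition allows threads to wait until some shared state becomes true. It is always used together with a lock. wait() releases the lock while blocking and reacquires it before returning. That is why the check is written as a while loop that re-tests the state after every wakeup.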
import threading
import time
condition = threading.Condition()
item = None
def consumer():
    with condition:
        print("Consumer: Waiting for item")
        while item is None:
            condition.wait()  # Release the lock and block until notified
        print(f"Consumer: Got item {item}")
def producer():
    global item
    time.sleep(2)  # Simulate time to produce item
    with condition:
        print("Producer: Creating item")
        item = "New Item"
        print("Producer: Notifying consumer")
        condition.notify()  # Notify one waiting thread
# Create and start threads
t1 = threading.Thread(target=consumer)
t2 = threading.Thread(target=producer)
t1.start()
t2.start()
t1.join()
t2.join()
Output:
Consumer: Waiting for item
Producer: Creating item
Producer: Notifying consumer
Consumer: Got item New Item
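Condition.wait_for() wraps the "while not ready: wait()" loop in a single call, and notify_all() wakes every waiting thread instead of just one. This is a minimal sketch with three consumers and three items. The names items and results are made up for the example, and the producer simply runs in the main thread.

```python
import threading

condition = threading.Condition()
items = []    # Shared state guarded by condition
results = []

def consumer(name):
    with condition:
        # wait_for() re-checks the predicate after every wakeup,
        # doing the same job as the manual while loop
        condition.wait_for(lambda: len(items) > 0)
        results.append((name, items.pop(0)))

def producer(count):
    with condition:
        for i in range(count):
            items.append(f"item-{i}")
        condition.notify_all()  # Wake every waiting consumer

consumers = [threading.Thread(target=consumer, args=(f"C{n}",)) for n in range(3)]
for t in consumers:
    t.start()
producer(3)
for t in consumers:
    t.join()
print(sorted(item for _, item in results))  # ['item-0', 'item-1', 'item-2']
```

The counts match on purpose: each consumer takes exactly one item, so no consumer is left waiting after the producer finishes.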
6. Queue
The queue module's Queue class provides a thread-safe way to exchange data between threads, which makes it a natural fit for the producer-consumer pattern.
import threading
import queue
import time
import random
# Create a queue with a capacity of 5 items
q = queue.Queue(maxsize=5)
def producer():
    for i in range(10):
        item = f"Item {i}"
        q.put(item)  # Will block if queue is full
        print(f"Produced {item}")
        time.sleep(random.random())
def consumer():
    while True:
        item = q.get()  # Will block if queue is empty
        if item is None:
            break
        print(f"Consumed {item}")
        time.sleep(random.random() * 2)
        q.task_done()  # Signal that a task is complete
# Create and start threads
producer_thread = threading.Thread(target=producer)
consumer_thread = threading.Thread(target=consumer)
producer_thread.start()
consumer_thread.start()
# Wait for all items to be produced
producer_thread.join()
# Signal consumer to exit
q.put(None)
# Wait for consumer to process all items
consumer_thread.join()
print("All work completed")
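The example above calls task_done() but never uses its counterpart, Queue.join(), which blocks until every item put on the queue has been marked done. This sketch shows the two together with several consumers, plus one None sentinel per worker for shutdown. Squaring numbers stands in for real work.

```python
import queue
import threading

q = queue.Queue()
processed = []
processed_lock = threading.Lock()

def worker():
    while True:
        item = q.get()
        try:
            if item is None:  # Sentinel: stop this worker
                return
            with processed_lock:
                processed.append(item * item)
        finally:
            q.task_done()  # Every get() is matched by exactly one task_done()

workers = [threading.Thread(target=worker) for _ in range(3)]
for w in workers:
    w.start()
for n in range(10):
    q.put(n)
q.join()  # Blocks until all 10 items have been marked done
for _ in workers:
    q.put(None)  # One sentinel per worker
for w in workers:
    w.join()
print(sorted(processed))
```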
Real-World Example: Web Crawler
Let's implement a simple multi-threaded web crawler that uses synchronization:
import threading
import queue
from urllib.parse import urljoin, urlparse
import requests  # Third-party: pip install requests beautifulsoup4
from bs4 import BeautifulSoup
class WebCrawler:
    def __init__(self, start_url, max_urls=10, num_threads=3):
        self.start_url = start_url
        self.max_urls = max_urls
        self.num_threads = num_threads
        self.queue = queue.Queue()
        self.visited_urls = set()
        self.lock = threading.Lock()  # Lock for thread-safe access to visited_urls
        parsed_url = urlparse(start_url)
        self.domain = f"{parsed_url.scheme}://{parsed_url.netloc}"
    def crawl(self):
        self.queue.put(self.start_url)
        threads = []
        for _ in range(self.num_threads):
            thread = threading.Thread(target=self._worker, daemon=True)
            threads.append(thread)
            thread.start()
        # Queue.join() returns once every queued URL has been marked task_done().
        # Workers enqueue new links before marking the current URL done, so the
        # count cannot reach zero while a page is still being processed.
        self.queue.join()
        # One None sentinel per worker tells each thread to exit
        for _ in threads:
            self.queue.put(None)
        for thread in threads:
            thread.join()
        return self.visited_urls
    def _worker(self):
        while True:
            # Block until work arrives; a non-blocking get() would let idle
            # workers quit while another worker is still discovering links
            url = self.queue.get()
            if url is None:
                self.queue.task_done()
                break
            try:
                with self.lock:
                    # Skip already-visited URLs; once the limit is reached,
                    # keep draining the queue without crawling
                    if url in self.visited_urls or len(self.visited_urls) >= self.max_urls:
                        continue
                    self.visited_urls.add(url)
                print(f"Crawling: {url}")
                response = requests.get(url, timeout=5)
                # Extract links from the page (parsing needs no lock)
                soup = BeautifulSoup(response.text, 'html.parser')
                links = [urljoin(url, link['href']) for link in soup.find_all('a', href=True)]
                with self.lock:
                    for full_url in links:
                        # Only follow links within the same domain
                        if full_url.startswith(self.domain) and full_url not in self.visited_urls:
                            self.queue.put(full_url)
            except Exception as e:
                print(f"Error crawling {url}: {e}")
            finally:
                self.queue.task_done()  # Also runs when "continue" skips a URL
# Example usage
if __name__ == "__main__":
    crawler = WebCrawler("https://example.com", max_urls=15, num_threads=3)
    visited = crawler.crawl()
    print("\nVisited URLs:")
    for url in visited:
        print(url)
    print(f"\nTotal URLs visited: {len(visited)}")
This crawler uses:
- A queue.Queue to manage the URLs to be processed
- A threading.Lock to protect shared data structures
- Multiple threads to process URLs concurrently
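To experiment with the synchronization logic without network access or third-party packages, here is a stripped-down sketch that crawls a hypothetical in-memory link graph (LINKS) instead of fetching pages. It decides when the crawl is finished by combining Queue.join() with one None sentinel per worker. That shutdown scheme is this sketch's own choice, and the dictionary lookup stands in for downloading and parsing a page.

```python
import queue
import threading

# Hypothetical in-memory "site": page -> links found on that page
LINKS = {
    "/": ["/a", "/b"],
    "/a": ["/", "/c"],
    "/b": ["/c", "/d"],
    "/c": [],
    "/d": ["/a"],
}

def crawl(start, num_threads=3):
    todo = queue.Queue()
    visited = set()
    lock = threading.Lock()  # Protects visited
    todo.put(start)

    def worker():
        while True:
            page = todo.get()
            if page is None:  # Sentinel: stop this worker
                todo.task_done()
                return
            try:
                with lock:
                    if page in visited:
                        continue  # The finally block still calls task_done()
                    visited.add(page)
                for link in LINKS.get(page, []):  # Stand-in for fetching and parsing
                    todo.put(link)
            finally:
                todo.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_threads)]
    for t in threads:
        t.start()
    todo.join()  # Every page queued so far has been handled
    for _ in threads:
        todo.put(None)
    for t in threads:
        t.join()
    return visited

print(sorted(crawl("/")))  # ['/', '/a', '/b', '/c', '/d']
```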
Common Synchronization Problems
1. Race Conditions
Race conditions occur when multiple threads access shared data simultaneously, and the outcome depends on their relative timing.
2. Deadlocks
A deadlock happens when two or more threads are blocked forever, waiting for each other to release resources.
# Example of a deadlock
import threading
import time
lock1 = threading.Lock()
lock2 = threading.Lock()
def thread1_function():
    lock1.acquire()
    print("Thread 1: Acquired lock1, waiting for lock2")
    time.sleep(0.5)  # Makes deadlock more likely
    lock2.acquire()
    print("Thread 1: Acquired lock2")
    # Critical section
    lock2.release()
    lock1.release()
def thread2_function():
    lock2.acquire()
    print("Thread 2: Acquired lock2, waiting for lock1")
    time.sleep(0.5)  # Makes deadlock more likely
    lock1.acquire()
    print("Thread 2: Acquired lock1")
    # Critical section
    lock1.release()
    lock2.release()
# Running both functions in separate threads will almost certainly hang:
# each thread holds one lock and waits forever for the other.
# threading.Thread(target=thread1_function).start()
# threading.Thread(target=thread2_function).start()
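To prevent deadlocks:
- Always acquire locks in a consistent order
- Use timeouts when acquiring locks (lock.acquire(timeout=...) returns False instead of blocking forever)
- Use threading.RLock when the same thread may need to re-acquire a lock it already holds
- Consider higher-level synchronization primitives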
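The first two points can be sketched as follows. acquire_in_order and release_all are hypothetical helpers, not part of the threading module. They sort the locks by id() so every thread takes them in the same order, even when callers pass them in opposite orders as in the deadlock example. The last lines show a timed acquire() giving up instead of waiting forever.

```python
import threading

lock1 = threading.Lock()
lock2 = threading.Lock()

def acquire_in_order(*locks):
    # Hypothetical helper: sort by id() so every thread uses the same global order
    ordered = sorted(locks, key=id)
    for lk in ordered:
        lk.acquire()
    return ordered

def release_all(held):
    for lk in reversed(held):
        lk.release()

log = []

def worker(first, second):
    held = acquire_in_order(first, second)
    try:
        log.append("critical section")
    finally:
        release_all(held)

# The threads request the locks in opposite orders, but acquire_in_order
# normalizes the order, so they cannot deadlock
t1 = threading.Thread(target=worker, args=(lock1, lock2))
t2 = threading.Thread(target=worker, args=(lock2, lock1))
t1.start()
t2.start()
t1.join()
t2.join()

# Timeout-based acquisition: give up instead of waiting forever
lock1.acquire()
got_it = lock1.acquire(timeout=0.1)  # Already held, so this returns False after ~0.1 s
lock1.release()
print(len(log), got_it)  # 2 False
```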
3. Starvation
Starvation happens when threads can't access resources for extended periods because other threads are continuously acquiring them.
4. Priority Inversion
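Priority inversion occurs when a high-priority thread is waiting for a resource held by a low-priority thread, and that low-priority thread is preempted by a medium-priority thread. The high-priority thread then effectively waits on the medium-priority one. Python's threading module does not expose thread priorities, so this is mostly a concern at the operating-system level.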
Best Practices for Thread Synchronization
- Minimize shared data: Limit the amount of data shared between threads.
- Use immutable data structures: They can be safely shared between threads.
- Keep critical sections small: Minimize the time spent holding locks.
- Use appropriate synchronization primitives: Choose the right tool for the job.
- Consider using higher-level concurrency features: The concurrent.futures module or asyncio might be better options.
- Avoid over-synchronization: Too much synchronization can reduce concurrency.
- Understand the Global Interpreter Lock (GIL): In CPython's standard build, only one thread can execute Python bytecode at a time due to the GIL. The GIL does not make compound operations such as counter += 1 atomic, so you still need locks for shared state.
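The first and fifth points often go together: with concurrent.futures.ThreadPoolExecutor, each task can compute a private result and return it, so no shared counter or lock is needed. This is a minimal sketch. count_chunk is an illustrative function that redoes the earlier counting example without shared state.

```python
from concurrent.futures import ThreadPoolExecutor

def count_chunk(n):
    # Each task only touches its own local variable: no shared state, no lock
    total = 0
    for _ in range(n):
        total += 1
    return total

with ThreadPoolExecutor(max_workers=4) as pool:
    partials = list(pool.map(count_chunk, [50_000] * 4))

print(sum(partials))  # 200000
```

The results are combined once, in the main thread, after all tasks have finished.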
Summary
Thread synchronization is essential for writing reliable concurrent programs in Python. We've explored various synchronization mechanisms such as locks, semaphores, events, conditions, and queues. Each has its own use case and trade-offs.
Remember that while threads can help improve performance for I/O-bound tasks, Python's Global Interpreter Lock (GIL) limits their effectiveness for CPU-bound tasks. For CPU-intensive operations, consider using the multiprocessing module instead.
Exercises
- Implement a thread-safe counter class using different synchronization primitives.
- Create a producer-consumer scenario with multiple producers and consumers using a Queue.
- Modify the web crawler example to add a maximum depth limit.
- Implement a thread pool that can execute tasks concurrently with a fixed number of worker threads.
- Solve a classic synchronization problem like the dining philosophers problem or the readers-writers problem.
By mastering thread synchronization in Python, you'll be able to write more efficient and reliable concurrent programs while avoiding common pitfalls like race conditions and deadlocks.