C# Concurrent Collections
Introduction
In modern programming, applications often need to handle multiple operations simultaneously through multithreading or parallel processing. Regular collections in C# (such as List<T> and Dictionary<TKey, TValue>) are not thread-safe by default, meaning they can produce unexpected results or errors when accessed by multiple threads concurrently.
C# addresses this challenge with concurrent collections: a set of thread-safe collection classes designed specifically for scenarios where multiple threads need to access a collection simultaneously without causing data corruption or inconsistency.
In this tutorial, we'll explore:
- Why you need concurrent collections
- The key concurrent collection classes in C#
- How to use each concurrent collection with examples
- Performance considerations and best practices
Why Regular Collections Aren't Thread-Safe
Before diving into concurrent collections, let's understand why regular collections aren't suitable for multi-threaded environments:
// This code is NOT thread-safe
Dictionary<string, int> inventory = new Dictionary<string, int>();
// If two threads try to add items simultaneously
// Thread 1
inventory.Add("apple", 10);
// Thread 2 (at nearly the same time)
inventory.Add("banana", 5);
// This can result in an InvalidOperationException or other issues
When multiple threads access and modify regular collections simultaneously, several issues can occur (the sketch after this list demonstrates the failure):
- Race conditions: When the final result depends on the timing of thread execution
- Data corruption: Collection's internal state becomes invalid
- Exceptions: Operations that aren't atomic can fail midway
- Inconsistent state: Some operations complete partially
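To see this in practice, the following sketch (assuming the usual System, System.Collections.Generic, and System.Threading.Tasks usings) hammers a plain Dictionary<int, int> from many threads at once. Depending on timing and runtime version it may throw, silently lose entries, or even hang; the exact failure is undefined, which is precisely the problem concurrent collections solve.
// Sketch: concurrent writes to a regular Dictionary are NOT safe.
// The exact failure mode varies: an exception, lost entries, or corrupted state.
var unsafeCounts = new Dictionary<int, int>();
try
{
    // Many threads add distinct keys simultaneously; Dictionary<TKey, TValue>
    // makes no guarantees under concurrent modification.
    Parallel.For(0, 10000, i => unsafeCounts.Add(i, i));
}
catch (AggregateException ex)
{
    // Parallel.For wraps worker exceptions in an AggregateException.
    Console.WriteLine($"Concurrent writes failed: {ex.InnerException?.GetType().Name}");
}
// Even when no exception is thrown, updates can be lost.
Console.WriteLine($"Expected 10000 entries, got {unsafeCounts.Count}");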
The System.Collections.Concurrent Namespace
.NET provides the System.Collections.Concurrent namespace, which contains thread-safe collection classes designed specifically for concurrent access scenarios.
Let's start by including the necessary namespaces (System.Linq and System.Threading are needed by some of the later examples):
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
ConcurrentDictionary<TKey, TValue>
ConcurrentDictionary<TKey, TValue> is a thread-safe version of Dictionary<TKey, TValue> that allows multiple threads to safely add, remove, and modify elements.
Basic Usage
// Create a new concurrent dictionary
ConcurrentDictionary<string, int> concurrentInventory = new ConcurrentDictionary<string, int>();
// Add items (thread-safe)
concurrentInventory.TryAdd("apple", 10);
concurrentInventory.TryAdd("banana", 5);
// Retrieve values
if (concurrentInventory.TryGetValue("apple", out int appleCount))
{
Console.WriteLine($"Apple count: {appleCount}"); // Output: Apple count: 10
}
// Update values atomically
concurrentInventory.AddOrUpdate("apple", 1, (key, oldValue) => oldValue + 1);
Console.WriteLine($"Updated apple count: {concurrentInventory["apple"]}"); // Output: Updated apple count: 11
// Remove items
if (concurrentInventory.TryRemove("banana", out int bananaCount))
{
Console.WriteLine($"Removed bananas: {bananaCount}"); // Output: Removed bananas: 5
}
Real-World Example: Product Inventory Counter
ConcurrentDictionary<string, int> productViews = new ConcurrentDictionary<string, int>();
// Simulate multiple users viewing products simultaneously
Parallel.For(0, 100, i =>
{
// Simulate different users viewing different products
string productId = $"product{i % 10}";
// Increment the view count for this product (thread-safe)
productViews.AddOrUpdate(productId, 1, (key, oldValue) => oldValue + 1);
});
// Display results
foreach (var product in productViews)
{
Console.WriteLine($"{product.Key}: {product.Value} views");
}
// Sample Output:
// product0: 10 views
// product1: 10 views
// product2: 10 views
// ...
ConcurrentQueue<T>
ConcurrentQueue<T> is a thread-safe version of Queue<T> that implements FIFO (first-in, first-out) behavior.
Basic Usage
// Create a concurrent queue
ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
// Enqueue items
taskQueue.Enqueue("Task 1");
taskQueue.Enqueue("Task 2");
taskQueue.Enqueue("Task 3");
// Try to dequeue an item
if (taskQueue.TryDequeue(out string nextTask))
{
Console.WriteLine($"Processing: {nextTask}"); // Output: Processing: Task 1
}
// Look at the next item without removing it
if (taskQueue.TryPeek(out string nextInQueue))
{
Console.WriteLine($"Next in queue: {nextInQueue}"); // Output: Next in queue: Task 2
}
Console.WriteLine($"Items remaining in queue: {taskQueue.Count}"); // Output: Items remaining in queue: 2
Real-World Example: Task Processing Queue
ConcurrentQueue<string> messageQueue = new ConcurrentQueue<string>();
// Producer task - adds messages to the queue
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
string message = $"Message {i}";
messageQueue.Enqueue(message);
Console.WriteLine($"Produced: {message}");
Thread.Sleep(100); // Simulate work
}
});
// Consumer task - processes messages from the queue
Task consumer = Task.Run(() =>
{
// Keep processing until the producer has finished and the queue has drained.
// Check producer.IsCompleted first so a message enqueued just before completion
// isn't missed. Note: this loop spins while the queue is empty; BlockingCollection<T>
// (covered later) blocks the consumer instead of spinning.
while (!producer.IsCompleted || !messageQueue.IsEmpty)
{
if (messageQueue.TryDequeue(out string message))
{
Console.WriteLine($"Consumed: {message}");
Thread.Sleep(150); // Simulate processing time
}
}
});
// Wait for both tasks to complete
Task.WhenAll(producer, consumer).Wait();
ConcurrentBag<T>
ConcurrentBag<T> is a thread-safe, unordered collection optimized for scenarios where the same thread frequently both adds and removes items.
Basic Usage
// Create a concurrent bag
ConcurrentBag<int> numbers = new ConcurrentBag<int>();
// Add items
numbers.Add(10);
numbers.Add(20);
numbers.Add(30);
// Try to take an item (the order is not guaranteed)
if (numbers.TryTake(out int result))
{
Console.WriteLine($"Took: {result}"); // Output might be any of the added numbers
}
// Check if the bag contains a value
Console.WriteLine($"Contains 20: {numbers.Contains(20)}"); // Output: Contains 20: True or False (if 20 was taken)
Console.WriteLine($"Count: {numbers.Count}"); // Output: Count: 2 (assuming one item was taken)
Real-World Example: Result Collection
ConcurrentBag<int> results = new ConcurrentBag<int>();
// Multiple threads perform calculations and store results
Parallel.For(0, 10, i =>
{
// Simulated work
int calculationResult = i * i;
// Store result
results.Add(calculationResult);
Console.WriteLine($"Thread {Task.CurrentId} added: {calculationResult}");
// This thread might also consume a result
if (results.TryTake(out int taken))
{
Console.WriteLine($"Thread {Task.CurrentId} took: {taken}");
}
});
// Display final results
Console.WriteLine($"Final result count: {results.Count}");
Console.WriteLine("Remaining results:");
foreach (var item in results)
{
Console.WriteLine(item);
}
ConcurrentStack<T>
ConcurrentStack<T> is a thread-safe version of Stack<T> that implements LIFO (last-in, first-out) behavior.
Basic Usage
// Create a concurrent stack
ConcurrentStack<string> pages = new ConcurrentStack<string>();
// Push items
pages.Push("Home Page");
pages.Push("Product Page");
pages.Push("Cart Page");
// Try to pop an item
if (pages.TryPop(out string currentPage))
{
Console.WriteLine($"Navigated from: {currentPage}"); // Output: Navigated from: Cart Page
}
// Look at the top item without removing it
if (pages.TryPeek(out string topPage))
{
Console.WriteLine($"Current page: {topPage}"); // Output: Current page: Product Page
}
// Pop multiple items at once
string[] history = new string[5]; // Prepare an array to receive items
int popped = pages.TryPopRange(history, 0, 2); // Try to pop up to 2 items
Console.WriteLine($"Retrieved {popped} pages from history"); // Output: Retrieved 2 pages from history
foreach (var page in history.Take(popped))
{
Console.WriteLine($"Page: {page}");
}
Real-World Example: Navigation History
ConcurrentStack<string> browserHistory = new ConcurrentStack<string>();
// Simulate multiple browser tabs navigating concurrently
Parallel.For(0, 5, tabId =>
{
// Each tab visits 3 pages
for (int i = 1; i <= 3; i++)
{
string page = $"Tab {tabId} - Page {i}";
browserHistory.Push(page);
Console.WriteLine($"Visited: {page}");
}
});
// Display the most recent 5 page visits across all tabs
Console.WriteLine("\nRecent navigation history (most recent first):");
string[] recentHistory = new string[5];
int count = browserHistory.TryPopRange(recentHistory, 0, 5);
for (int i = 0; i < count; i++)
{
Console.WriteLine(recentHistory[i]);
}
BlockingCollection<T>
BlockingCollection<T> isn't itself a concurrent collection; it's a wrapper that adds blocking and bounding capabilities to any collection implementing IProducerConsumerCollection<T> (such as ConcurrentQueue<T>, ConcurrentStack<T>, or ConcurrentBag<T>).
Basic Usage with Producer-Consumer Pattern
// Create a blocking collection with a ConcurrentQueue as the underlying collection
// Limit to 5 items (bounded capacity)
BlockingCollection<int> blockingCollection = new BlockingCollection<int>(
new ConcurrentQueue<int>(),
boundedCapacity: 5
);
// Producer task
Task producer = Task.Run(() =>
{
for (int i = 0; i < 10; i++)
{
blockingCollection.Add(i); // Will block if collection reaches capacity
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100);
}
// Signal that no more items will be added
blockingCollection.CompleteAdding();
});
// Consumer task
Task consumer = Task.Run(() =>
{
// GetConsumingEnumerable() will block when no items are available
// and will exit when CompleteAdding() is called and collection is empty
foreach (var item in blockingCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumed: {item}");
Thread.Sleep(300); // Processing takes longer than production
}
});
// Wait for both tasks to complete
Task.WhenAll(producer, consumer).Wait();
Console.WriteLine("Producer-consumer operation complete.");
Performance Considerations
Concurrent collections provide thread safety but come with some overhead:
- Concurrent collections are generally slower than their non-concurrent counterparts in single-threaded scenarios due to synchronization overhead.
- Choose the right collection:
  - ConcurrentDictionary: key-value lookups
  - ConcurrentQueue: FIFO processing
  - ConcurrentStack: LIFO processing
  - ConcurrentBag: scenarios where the same thread both adds and removes items frequently
- Avoid unnecessary synchronization:
  - Use methods like TryGetValue instead of checking existence and then retrieving
  - Use atomic operations like AddOrUpdate instead of separate read and write operations
- Method selection matters (see the sketch after this list):
  - GetOrAdd and AddOrUpdate are atomic with respect to the dictionary, but they may execute the value factory more than once
  - Thread-safe doesn't mean transaction-safe: multiple operations aren't atomic together
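To make the last two points concrete, here is a minimal sketch (the buildCount counters are illustrative, not part of any library API). It shows that the factory passed to GetOrAdd can run more than once when threads race on the same key, and the common workaround of caching Lazy<T> values so the expensive work itself runs at most once.
// Sketch: GetOrAdd is atomic for the dictionary, but not for the value factory.
var cache = new ConcurrentDictionary<string, int>();
int buildCount = 0; // illustrative counter incremented by the factory

Parallel.For(0, 20, _ =>
{
    // Several threads can enter the factory for the same key; only one result
    // is stored, but the expensive work may run multiple times.
    cache.GetOrAdd("report", key =>
    {
        Interlocked.Increment(ref buildCount);
        Thread.Sleep(10); // simulate expensive work
        return key.Length;
    });
});
Console.WriteLine($"Factory ran {buildCount} time(s)"); // often more than 1

// Workaround: store Lazy<T> so the expensive computation itself runs only once.
var lazyCache = new ConcurrentDictionary<string, Lazy<int>>();
int lazyBuildCount = 0;

Parallel.For(0, 20, _ =>
{
    // Only the Lazy<T> instance that wins the race is ever stored and read,
    // and Lazy<T> (with its default thread-safety mode) runs its delegate once.
    _ = lazyCache.GetOrAdd("report", key => new Lazy<int>(() =>
    {
        Interlocked.Increment(ref lazyBuildCount);
        Thread.Sleep(10);
        return key.Length;
    })).Value;
});
Console.WriteLine($"Lazy factory ran {lazyBuildCount} time(s)"); // 1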
Real-World Considerations
1. Thread-Safe Caching
public class ThreadSafeCache<TKey, TValue>
{
private ConcurrentDictionary<TKey, TValue> _cache = new ConcurrentDictionary<TKey, TValue>();
private Func<TKey, TValue> _valueFactory;
public ThreadSafeCache(Func<TKey, TValue> valueFactory)
{
_valueFactory = valueFactory;
}
public TValue GetOrAdd(TKey key)
{
return _cache.GetOrAdd(key, _valueFactory);
}
public void Clear()
{
_cache.Clear();
}
}
// Usage:
var expensiveOperationCache = new ThreadSafeCache<string, int>(key =>
{
Console.WriteLine($"Computing value for {key}");
// Simulate expensive operation
Thread.Sleep(1000);
return key.Length * 10;
});
Parallel.ForEach(new[] { "apple", "banana", "apple", "cherry", "banana" }, key =>
{
int result = expensiveOperationCache.GetOrAdd(key);
Console.WriteLine($"Result for {key}: {result}");
});
2. Worker Queue
public class WorkQueue
{
private ConcurrentQueue<Action> _workItems = new ConcurrentQueue<Action>();
private ManualResetEventSlim _workAvailable = new ManualResetEventSlim(false);
private Thread[] _workers;
private volatile bool _isProcessing = true; // volatile so workers see the shutdown flag promptly
public WorkQueue(int workerCount)
{
_workers = new Thread[workerCount];
for (int i = 0; i < workerCount; i++)
{
_workers[i] = new Thread(ProcessItems)
{
IsBackground = true,
Name = $"Worker {i}"
};
_workers[i].Start();
}
}
public void EnqueueWork(Action workItem)
{
_workItems.Enqueue(workItem);
_workAvailable.Set(); // Signal that work is available
}
private void ProcessItems()
{
while (_isProcessing)
{
if (_workItems.TryDequeue(out Action workItem))
{
try
{
workItem();
}
catch (Exception ex)
{
Console.WriteLine($"Error executing work item: {ex.Message}");
}
}
else
{
// No work available, wait for signal
_workAvailable.Reset();
_workAvailable.Wait(1000); // Wait with timeout to recheck condition periodically
}
}
}
public void Shutdown()
{
_isProcessing = false;
_workAvailable.Set(); // Wake up all threads
// Wait for workers to finish
foreach (var worker in _workers)
{
worker.Join();
}
}
}
// Usage example:
var workQueue = new WorkQueue(3); // Create queue with 3 workers
for (int i = 0; i < 10; i++)
{
int taskId = i;
workQueue.EnqueueWork(() =>
{
Console.WriteLine($"Processing task {taskId} on thread {Thread.CurrentThread.Name}");
Thread.Sleep(500); // Simulate work
});
}
Console.WriteLine("All tasks enqueued");
Thread.Sleep(6000); // Let workers process items
workQueue.Shutdown();
Console.WriteLine("Work queue shut down");
Summary
Concurrent collections in C# provide thread-safe alternatives to standard collections for multithreaded scenarios:
- ConcurrentDictionary<TKey, TValue>: thread-safe key-value store
- ConcurrentQueue<T>: thread-safe FIFO collection
- ConcurrentBag<T>: thread-safe unordered collection
- ConcurrentStack<T>: thread-safe LIFO collection
- BlockingCollection<T>: wrapper providing blocking and bounding capabilities
When working with concurrent code, these collections help avoid common multithreading issues such as race conditions and data corruption.
Remember that thread safety comes with some performance overhead, so use concurrent collections only when thread safety is required.
Exercises
- Create a simple web server request counter that tracks the number of requests per URL using ConcurrentDictionary<string, int>.
- Implement a thread-safe work queue using ConcurrentQueue<T> with multiple worker threads processing jobs.
- Build a thread-safe caching mechanism that expires items after a certain time using ConcurrentDictionary<TKey, CacheItem<TValue>>.
- Create a producer-consumer example using BlockingCollection<T> where multiple producers add items and multiple consumers process them.
- Implement a thread-safe "undo" feature using ConcurrentStack<T> that allows multiple users to record their actions and revert them.