.NET Concurrent Collections
Introduction
When working with asynchronous or multithreaded code in .NET, managing shared data structures becomes a critical challenge. Regular collections like List<T>, Dictionary<TKey, TValue>, and Queue<T> are not thread-safe by default, meaning that accessing them concurrently from multiple threads can lead to data corruption, unexpected behavior, or exceptions.
This is where .NET's concurrent collections come to the rescue. The System.Collections.Concurrent namespace provides several thread-safe collection classes that are designed specifically for scenarios where multiple threads need to access a collection simultaneously.
In this tutorial, we'll explore:
- Why regular collections aren't thread-safe
- The different concurrent collections available in .NET
- When and how to use each concurrent collection
- Practical examples of concurrent collections in real-world scenarios
Why We Need Concurrent Collections
Before diving into concurrent collections, let's understand why regular collections aren't suitable for multithreaded code with a simple example:
// This code is NOT thread-safe!
Dictionary<string, int> sharedCounter = new Dictionary<string, int>();
// Running this from multiple threads can cause problems
void IncrementCounter(string key)
{
    if (sharedCounter.ContainsKey(key))
    {
        sharedCounter[key]++; // Read and write operation - not atomic!
    }
    else
    {
        sharedCounter[key] = 1; // Could happen at the same time in another thread
    }
}
The above code can lead to several issues when executed concurrently:
- Race conditions: Two threads might both see that a key doesn't exist and both try to add it
- Lost updates: One thread's increment might be overwritten by another thread
- Exceptions: Concurrent writes can corrupt the dictionary's internal state, and modifying it while another thread is enumerating it throws an InvalidOperationException
To avoid these issues, .NET provides concurrent collections that are specifically designed to handle multiple threads accessing them simultaneously.
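For contrast, the traditional fix for the IncrementCounter example is to guard the dictionary with a lock, so that the check and the write happen as one step. The sketch below is illustrative only: the class name LockedCounter and its members are hypothetical and not part of the original example.

```csharp
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical helper: a regular Dictionary guarded by a single lock object.
public class LockedCounter
{
    private readonly object _gate = new object();
    private readonly Dictionary<string, int> _counts = new Dictionary<string, int>();

    public void Increment(string key)
    {
        // The lock turns "read, add one, write back" into one indivisible step.
        lock (_gate)
        {
            _counts.TryGetValue(key, out int current); // current is 0 when the key is missing
            _counts[key] = current + 1;
        }
    }

    public int Get(string key)
    {
        lock (_gate)
        {
            return _counts.TryGetValue(key, out int value) ? value : 0;
        }
    }

    // Runs `workers` tasks that each increment the same key `perWorker` times.
    public static int RunDemo(int workers, int perWorker)
    {
        var counter = new LockedCounter();
        Task[] tasks = Enumerable.Range(0, workers)
            .Select(i => Task.Run(() =>
            {
                for (int n = 0; n < perWorker; n++)
                {
                    counter.Increment("hits");
                }
            }))
            .ToArray();
        Task.WaitAll(tasks);
        return counter.Get("hits");
    }
}
```

Calling LockedCounter.RunDemo(8, 1000) should return 8000 because no increment is lost. The cost is that every caller waits on the same lock, which is the bottleneck the concurrent collections are designed to reduce.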
Concurrent Collections in .NET
ConcurrentDictionary<TKey, TValue>
This is a thread-safe version of the traditional Dictionary<TKey, TValue>. It allows multiple threads to safely add, remove, and modify elements.
Let's explore how to use ConcurrentDictionary:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
public class ConcurrentDictionaryExample
{
    public static async Task RunExample()
    {
        // Create a new concurrent dictionary
        ConcurrentDictionary<string, int> wordCount = new ConcurrentDictionary<string, int>();
        // Words to process
        string[] words = { "apple", "banana", "apple", "cherry", "banana", "apple" };
        // Process words in parallel - this would cause issues with a regular dictionary
        await Task.WhenAll(words.Select(word => Task.Run(() =>
        {
            // AddOrUpdate is thread-safe: the entry is added or updated as a single step.
            // The update delegate may run more than once under contention, so keep it free of side effects.
            wordCount.AddOrUpdate(
                key: word,
                addValue: 1, // Value to add if key doesn't exist
                updateValueFactory: (key, count) => count + 1 // Function to update existing value
            );
        })));
        // Output results
        foreach (var pair in wordCount)
        {
            Console.WriteLine($"Word: {pair.Key}, Count: {pair.Value}");
        }
    }
}
// Output (the order of the lines may vary):
// Word: apple, Count: 3
// Word: banana, Count: 2
// Word: cherry, Count: 1
Key methods of ConcurrentDictionary:
- TryAdd(TKey, TValue): Attempts to add a key-value pair
- TryUpdate(TKey, TValue, TValue): Updates a value if it equals the expected value
- TryRemove(TKey, out TValue): Tries to remove and return a value
- AddOrUpdate(TKey, TValue, Func<TKey, TValue, TValue>): Adds or updates a key-value pair atomically
- GetOrAdd(TKey, TValue): Gets an existing value or adds a new one if the key doesn't exist
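To make the return values of these methods concrete, here is a small single-threaded sketch. The class name DictionaryMethodsSketch and the sample keys are made up for illustration; only the ConcurrentDictionary calls themselves come from the framework.

```csharp
using System.Collections.Concurrent;

public static class DictionaryMethodsSketch
{
    public static ConcurrentDictionary<string, int> Run()
    {
        var stock = new ConcurrentDictionary<string, int>();

        bool added = stock.TryAdd("apple", 5);       // true: the key was absent
        bool addedAgain = stock.TryAdd("apple", 9);  // false: the key exists, value stays 5

        // TryUpdate(key, newValue, comparisonValue) is a compare-and-swap:
        // it writes newValue only if the current value equals comparisonValue.
        bool swapped = stock.TryUpdate("apple", 6, 5); // true: 5 -> 6
        bool stale = stock.TryUpdate("apple", 7, 5);   // false: current value is 6, not 5

        int existing = stock.GetOrAdd("apple", 100); // returns 6, nothing is added
        int created = stock.GetOrAdd("pear", 2);     // adds "pear" -> 2 and returns 2

        bool removed = stock.TryRemove("pear", out int removedValue); // true, removedValue is 2

        return stock; // contains only "apple" -> 6
    }
}
```

The Try-prefixed methods report failure through their return value instead of throwing, which is what lets callers react to a lost race without a separate existence check.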
ConcurrentQueue<T>
This is a thread-safe implementation of a FIFO (First-In-First-Out) collection:
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;
public class ConcurrentQueueExample
{
    public static async Task RunExample()
    {
        // Create a concurrent queue
        ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();
        // Producer: enqueues ten tasks
        Task producer = Task.Run(async () =>
        {
            for (int i = 1; i <= 10; i++)
            {
                taskQueue.Enqueue($"Task {i}");
                Console.WriteLine($"Produced: Task {i}");
                await Task.Delay(100); // Simulate some work
            }
        });
        // Consumer: dequeues until the producer has finished and the queue is drained
        Task consumer = Task.Run(async () =>
        {
            while (true)
            {
                // Read the producer's state before touching the queue so that
                // an item enqueued just before completion is never missed
                bool producerDone = producer.IsCompleted;
                if (taskQueue.TryDequeue(out string task))
                {
                    Console.WriteLine($"Processing: {task}");
                }
                else if (producerDone)
                {
                    break;
                }
                await Task.Delay(150); // Processing is slightly slower than production
            }
        });
        await Task.WhenAll(producer, consumer);
    }
}
// Output might be:
// Produced: Task 1
// Processing: Task 1
// Produced: Task 2
// Produced: Task 3
// Processing: Task 2
// Produced: Task 4
// ...and so on
Key members of ConcurrentQueue:
- Enqueue(T): Adds an item to the end of the queue
- TryDequeue(out T): Tries to remove and return the item at the beginning of the queue
- TryPeek(out T): Tries to return the item at the beginning of the queue without removing it
- IsEmpty: Property that indicates whether the queue is empty
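A short sketch of how these members behave on a queue with two items. QueueMethodsSketch is a hypothetical name used only for this illustration. Note that the loop relies on the return value of TryDequeue rather than on a separate IsEmpty check, because another thread could empty the queue between the check and the dequeue.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class QueueMethodsSketch
{
    // Drains a small queue and returns what was observed, in order.
    public static List<string> Run()
    {
        var queue = new ConcurrentQueue<string>();
        queue.Enqueue("first");
        queue.Enqueue("second");

        var seen = new List<string>();

        // TryPeek reads the head of the queue without removing it.
        if (queue.TryPeek(out string head))
        {
            seen.Add($"peek:{head}");
        }

        // TryDequeue returns false instead of throwing when the queue is empty,
        // so the loop ends cleanly once everything has been removed.
        while (queue.TryDequeue(out string item))
        {
            seen.Add(item);
        }

        return seen; // "peek:first", "first", "second"
    }
}
```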
ConcurrentStack<T>
This is a thread-safe implementation of a LIFO (Last-In-First-Out) collection:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
public class ConcurrentStackExample
{
    public static async Task RunExample()
    {
        // Create a concurrent stack
        ConcurrentStack<string> historyStack = new ConcurrentStack<string>();
        // Multiple threads adding to the stack
        var addTasks = Enumerable.Range(1, 5).Select(i => Task.Run(() =>
        {
            string action = $"Action {i}";
            historyStack.Push(action);
            Console.WriteLine($"Added to history: {action}");
        }));
        await Task.WhenAll(addTasks);
        // Check the most recent action without removing it
        if (historyStack.TryPeek(out string mostRecent))
        {
            Console.WriteLine($"Most recent action: {mostRecent}");
        }
        // Get up to 3 of the most recent actions.
        // TryPopRange fills a caller-supplied array and returns how many items it popped.
        string[] recent3 = new string[3];
        int itemsPopped = historyStack.TryPopRange(recent3);
        Console.WriteLine($"Retrieved {itemsPopped} recent actions:");
        foreach (var item in recent3.Take(itemsPopped))
        {
            Console.WriteLine($" - {item}");
        }
    }
}
// Output might be (order can vary):
// Added to history: Action 1
// Added to history: Action 2
// Added to history: Action 3
// Added to history: Action 4
// Added to history: Action 5
// Most recent action: Action 5
// Retrieved 3 recent actions:
// - Action 5
// - Action 4
// - Action 3
Key methods of ConcurrentStack:
- Push(T): Adds an item to the top of the stack
- PushRange(T[]): Adds multiple items to the stack
- TryPop(out T): Tries to remove and return the item at the top of the stack
- TryPopRange(T[]): Tries to remove multiple items from the top of the stack into the supplied array and returns the number of items popped
- TryPeek(out T): Tries to return the item at the top of the stack without removing it
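The range methods are the least obvious of these, so here is a minimal sketch of PushRange and TryPopRange working together. StackMethodsSketch is a hypothetical name; the buffer size of 2 is arbitrary.

```csharp
using System;
using System.Collections.Concurrent;

public static class StackMethodsSketch
{
    public static string[] Run()
    {
        var stack = new ConcurrentStack<string>();

        // PushRange pushes the array elements in order, so "c" ends up on top.
        stack.PushRange(new[] { "a", "b", "c" });

        // TryPopRange fills a caller-supplied buffer, starting with the top item,
        // and returns how many items were actually popped.
        var buffer = new string[2];
        int popped = stack.TryPopRange(buffer);

        // Copy only the filled part of the buffer (here: "c", then "b").
        var result = new string[popped];
        Array.Copy(buffer, result, popped);
        return result;
    }
}
```

Because the return value can be smaller than the buffer length when the stack holds fewer items, callers should always iterate over the returned count rather than the whole array.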
ConcurrentBag<T>
This is an unordered collection that allows duplicate items:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading.Tasks;
public class ConcurrentBagExample
{
    public static async Task RunExample()
    {
        // Create a concurrent bag
        ConcurrentBag<int> resultBag = new ConcurrentBag<int>();
        // Multiple threads calculating and adding results
        await Task.WhenAll(Enumerable.Range(1, 10).Select(i => Task.Run(() =>
        {
            // Simulate some calculation
            int result = i * i;
            resultBag.Add(result);
            Console.WriteLine($"Thread {Task.CurrentId} added: {result}");
        })));
        // Process all results
        int sum = 0;
        while (resultBag.TryTake(out int value))
        {
            sum += value;
        }
        Console.WriteLine($"Sum of all results: {sum}");
    }
}
// Output might include:
// Thread 1 added: 1
// Thread 3 added: 9
// Thread 4 added: 16
// Thread 2 added: 4
// Thread 5 added: 25
// ...
// Sum of all results: 385
Key methods of ConcurrentBag:
- Add(T): Adds an item to the bag
- TryTake(out T): Tries to remove and return an item from the bag
- TryPeek(out T): Tries to return an item from the bag without removing it
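Since a bag makes no promise about which item TryTake returns, code that uses it should only depend on order-independent results. A minimal sketch (BagMethodsSketch is a hypothetical name):

```csharp
using System.Collections.Concurrent;

public static class BagMethodsSketch
{
    public static (int countBefore, int sum, bool emptyAfter) Run()
    {
        var bag = new ConcurrentBag<int>();
        bag.Add(1);
        bag.Add(1); // duplicates are kept
        bag.Add(2);

        int countBefore = bag.Count; // 3

        // Items come back in no guaranteed order, so only an
        // order-independent result such as a sum is computed here.
        int sum = 0;
        while (bag.TryTake(out int value))
        {
            sum += value;
        }

        return (countBefore, sum, bag.IsEmpty);
    }
}
```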
BlockingCollection<T>
BlockingCollection<T> is a wrapper rather than a data structure of its own: it adds blocking and bounding capabilities to any collection that implements IProducerConsumerCollection<T> (ConcurrentQueue<T>, ConcurrentStack<T>, and ConcurrentBag<T> do; ConcurrentDictionary<TKey, TValue> does not). By default it wraps a ConcurrentQueue<T>:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class BlockingCollectionExample
{
    public static async Task RunExample()
    {
        // Create a bounded blocking collection with a maximum capacity of 5
        using (BlockingCollection<int> boundedCollection = new BlockingCollection<int>(boundedCapacity: 5))
        {
            // Producer task
            Task producer = Task.Run(() =>
            {
                for (int i = 0; i < 20; i++)
                {
                    // This will block if the collection reaches capacity
                    boundedCollection.Add(i);
                    Console.WriteLine($"Produced: {i}");
                    Thread.Sleep(100); // Simulate work
                }
                // Signal that we're done adding items
                boundedCollection.CompleteAdding();
            });
            // Consumer tasks
            Task[] consumers = Enumerable.Range(0, 2).Select(consumerIndex =>
                Task.Run(() =>
                {
                    // GetConsumingEnumerable() will block when empty and complete when CompleteAdding is called
                    foreach (int item in boundedCollection.GetConsumingEnumerable())
                    {
                        Console.WriteLine($"Consumer {consumerIndex} consumed: {item}");
                        Thread.Sleep(250); // Simulate slower consumption
                    }
                })).ToArray();
            // Wait for the producer and every consumer before the collection is disposed
            await producer;
            await Task.WhenAll(consumers);
        }
    }
}
BlockingCollection is particularly useful for producer-consumer scenarios:
- Add() will block if the collection is bounded and reaches capacity
- Take() will block if the collection is empty
- CompleteAdding() signals that no more items will be added
- GetConsumingEnumerable() provides an enumerable that takes items from the collection
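The bounding and completion rules can be observed without any threads by using the non-blocking TryAdd and checking what happens once the collection is complete. The following is a minimal sketch; BlockingCollectionSketch and the capacity of 2 are assumptions made for illustration.

```csharp
using System;
using System.Collections.Concurrent;

public static class BlockingCollectionSketch
{
    public static (bool thirdAdded, int firstTaken, bool completed, bool takeThrew) Run()
    {
        using (var buffer = new BlockingCollection<int>(boundedCapacity: 2))
        {
            buffer.Add(10);
            buffer.Add(20);

            // The collection is full. Add would block here; TryAdd returns false instead.
            bool thirdAdded = buffer.TryAdd(30);

            // The default backing store is a ConcurrentQueue<T>, so items come out FIFO.
            int firstTaken = buffer.Take(); // 10

            buffer.CompleteAdding();
            buffer.Take(); // 20, the last remaining item

            // IsCompleted is true once adding is complete and the collection is empty.
            bool completed = buffer.IsCompleted;

            bool takeThrew = false;
            try
            {
                buffer.Take(); // nothing left, and nothing will ever arrive
            }
            catch (InvalidOperationException)
            {
                takeThrew = true;
            }

            return (thirdAdded, firstTaken, completed, takeThrew);
        }
    }
}
```

This is why consumers usually loop over GetConsumingEnumerable() instead of calling Take() directly: the enumerable simply ends when the collection is completed, whereas Take() throws.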
Real-World Example: Building a Thread-Safe Cache
Let's build a simple but practical thread-safe caching system using ConcurrentDictionary:
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
public class ThreadSafeCache<TKey, TValue>
{
    private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _cache =
        new ConcurrentDictionary<TKey, Lazy<TValue>>();
    // Get or create a value
    public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
    {
        // Storing Lazy<TValue> means valueFactory runs at most once per cached entry:
        // GetOrAdd may construct several Lazy wrappers under contention, but only the
        // one that ends up in the dictionary is ever evaluated
        return _cache.GetOrAdd(key, k => new Lazy<TValue>(() => valueFactory(k), LazyThreadSafetyMode.ExecutionAndPublication)).Value;
    }
    // Try to remove an item
    public bool TryRemove(TKey key, out TValue value)
    {
        bool result = _cache.TryRemove(key, out Lazy<TValue> lazyValue);
        value = result ? lazyValue.Value : default;
        return result;
    }
    // Clear the cache
    public void Clear() => _cache.Clear();
    // Get the current count
    public int Count => _cache.Count;
}
// Example usage
public class CacheExample
{
    public static async Task RunExample()
    {
        ThreadSafeCache<string, string> dataCache = new ThreadSafeCache<string, string>();
        // Simulate multiple threads accessing the cache
        await Task.WhenAll(
            Enumerable.Range(0, 5).Select(_ => Task.Run(() =>
            {
                // This expensive computation will only happen once per key
                string result = dataCache.GetOrAdd("key1", key =>
                {
                    Console.WriteLine($"Computing value for {key}... (expensive operation)");
                    Task.Delay(500).Wait(); // Simulate expensive computation
                    return $"Computed value for {key}";
                });
                Console.WriteLine($"Thread {Task.CurrentId} got: {result}");
            }))
        );
        // Output will show that expensive operation happens only once
    }
}
// Output (the order of the "got" lines and the IDs shown may vary):
// Computing value for key1... (expensive operation)
// Thread 1 got: Computed value for key1
// Thread 3 got: Computed value for key1
// Thread 2 got: Computed value for key1
// Thread 4 got: Computed value for key1
// Thread 5 got: Computed value for key1
This cache implementation demonstrates several important features:
- It's thread-safe without any explicit locking in the calling code
- It computes expensive values only once, even if multiple threads request the same key simultaneously
- It allows concurrent reads without blocking
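The "computed only once" claim can be checked by counting factory invocations. The sketch below uses the same ConcurrentDictionary plus Lazy pattern directly; LazyCacheSketch, CountFactoryRuns, and the 50 ms sleep are illustrative assumptions, not part of the cache class above.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class LazyCacheSketch
{
    // Returns how many times the expensive factory actually ran.
    public static int CountFactoryRuns(int callers)
    {
        var cache = new ConcurrentDictionary<string, Lazy<string>>();
        int factoryRuns = 0;

        Task<string>[] tasks = Enumerable.Range(0, callers)
            .Select(i => Task.Run(() =>
            {
                // Several Lazy<string> wrappers may be created under contention,
                // but only the one stored in the dictionary is ever asked for its Value.
                Lazy<string> lazy = cache.GetOrAdd("key1", key => new Lazy<string>(() =>
                {
                    Interlocked.Increment(ref factoryRuns);
                    Thread.Sleep(50); // stand-in for an expensive computation
                    return $"value for {key}";
                }, LazyThreadSafetyMode.ExecutionAndPublication));

                return lazy.Value;
            }))
            .ToArray();

        Task.WaitAll(tasks);
        return factoryRuns;
    }
}
```

With a plain ConcurrentDictionary<string, string> and GetOrAdd(key, factory), the same test could report more than one run, because the value factory is invoked outside the dictionary's internal lock.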
Performance Considerations
While concurrent collections offer thread safety, they do come with some performance overhead compared to their non-concurrent counterparts:
- Higher memory usage: Concurrent collections need to maintain additional state for synchronization
- Slightly slower operations: Thread-safety mechanisms add overhead to each operation
- Less predictable performance: Under high contention, performance can degrade
Use concurrent collections when thread safety is required, but consider these alternatives when appropriate:
- For read-heavy scenarios with few writes: Consider using immutable collections (in the System.Collections.Immutable namespace)
- For single-threaded scenarios: Use regular collections for better performance
- For complex operations that need to be atomic: Consider using explicit locks around regular collections
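As a rough sketch of the immutable-collection alternative: readers take a snapshot that can never change underneath them, while writers build a new list and swap it in. The SnapshotList class is hypothetical; ImmutableList<T> and ImmutableInterlocked.Update are from System.Collections.Immutable.

```csharp
using System.Collections.Immutable;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example: a list that is read often and modified rarely.
public class SnapshotList
{
    private ImmutableList<int> _items = ImmutableList<int>.Empty;

    // Readers get an immutable snapshot, so they can enumerate it without locks.
    public ImmutableList<int> Snapshot => Volatile.Read(ref _items);

    // Writers create a new list and swap it in; the update is retried
    // if another writer replaced the list in the meantime.
    public void Add(int item) =>
        ImmutableInterlocked.Update(ref _items, list => list.Add(item));

    // Adds the numbers 0..writers-1 from parallel tasks and returns the final count.
    public static int RunDemo(int writers)
    {
        var shared = new SnapshotList();
        Task[] tasks = Enumerable.Range(0, writers)
            .Select(i => Task.Run(() => shared.Add(i)))
            .ToArray();
        Task.WaitAll(tasks);
        return shared.Snapshot.Count;
    }
}
```

Each write allocates a new list version, so this trade-off tends to pay off only when reads heavily outnumber writes.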
Summary
.NET concurrent collections provide thread-safe alternatives to standard collections for multithreaded programming:
- ConcurrentDictionary<TKey, TValue>: Thread-safe key-value store with atomic operations
- ConcurrentQueue<T>: Thread-safe FIFO collection
- ConcurrentStack<T>: Thread-safe LIFO collection
- ConcurrentBag<T>: Thread-safe unordered collection
- BlockingCollection<T>: Provides blocking and bounding capabilities for producer-consumer scenarios
These collections eliminate the need for explicit locking in many common scenarios, making concurrent code safer and often more performant than manual synchronization.
When working with asynchronous code in .NET, understanding and effectively using concurrent collections is essential for building robust applications that can safely handle concurrent operations.
Additional Resources
- Microsoft Docs: Thread-Safe Collections
- Microsoft Docs: ConcurrentDictionary Class
- Microsoft Docs: BlockingCollection Overview
Exercises
- Create a thread-safe counter that can be incremented from multiple threads simultaneously using ConcurrentDictionary.
- Implement a producer-consumer pattern where multiple producers add items to a ConcurrentQueue and multiple consumers process these items.
- Build a thread-safe logging system using ConcurrentQueue that can accept log messages from multiple threads and write them to a file in order.
- Create a thread-safe "last N items" cache using ConcurrentStack that keeps track of the most recent N items added.
- Implement a parallel web crawler that uses BlockingCollection to manage the URLs to be crawled, ensuring a maximum number of pending URLs.