Skip to main content

.NET Concurrent Collections

Introduction

When working with asynchronous or multithreaded code in .NET, managing shared data structures becomes a critical challenge. Regular collections like List<T>, Dictionary<TKey, TValue>, and Queue<T> are not thread-safe by default, meaning that accessing them concurrently from multiple threads can lead to data corruption, unexpected behavior, or exceptions.

This is where .NET's concurrent collections come to the rescue. The System.Collections.Concurrent namespace provides several thread-safe collection classes that are designed specifically for scenarios where multiple threads need to access a collection simultaneously.

In this tutorial, we'll explore:

  • Why regular collections aren't thread-safe
  • The different concurrent collections available in .NET
  • When and how to use each concurrent collection
  • Practical examples of concurrent collections in real-world scenarios

Why We Need Concurrent Collections

Before diving into concurrent collections, let's understand why regular collections aren't suitable for multithreaded code with a simple example:

csharp
// This code is NOT thread-safe!
Dictionary<string, int> sharedCounter = new Dictionary<string, int>();

// Running this from multiple threads can cause problems
void IncrementCounter(string key)
{
if (sharedCounter.ContainsKey(key))
{
sharedCounter[key]++; // Read and write operation - not atomic!
}
else
{
sharedCounter[key] = 1; // Could happen at the same time in another thread
}
}

The above code can lead to several issues when executed concurrently:

  1. Race conditions: Two threads might both see that a key doesn't exist and both try to add it
  2. Lost updates: One thread's increment might be overwritten by another thread
  3. Exceptions: The dictionary might be modified while another thread is enumerating it

To avoid these issues, .NET provides concurrent collections that are specifically designed to handle multiple threads accessing them simultaneously.

Concurrent Collections in .NET

ConcurrentDictionary<TKey, TValue>

This is a thread-safe version of the traditional Dictionary<TKey, TValue>. It allows multiple threads to safely add, remove, and modify elements.

Let's explore how to use ConcurrentDictionary:

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ConcurrentDictionaryExample
{
public static async Task RunExample()
{
// Create a new concurrent dictionary
ConcurrentDictionary<string, int> wordCount = new ConcurrentDictionary<string, int>();

// Words to process
string[] words = { "apple", "banana", "apple", "cherry", "banana", "apple" };

// Process words in parallel - this would cause issues with a regular dictionary
await Task.WhenAll(words.Select(word => Task.Run(() =>
{
// AddOrUpdate is an atomic operation that's thread-safe
wordCount.AddOrUpdate(
key: word,
addValue: 1, // Value to add if key doesn't exist
updateValueFactory: (key, count) => count + 1 // Function to update existing value
);
})));

// Output results
foreach (var pair in wordCount)
{
Console.WriteLine($"Word: {pair.Key}, Count: {pair.Value}");
}
}
}

// Output:
// Word: apple, Count: 3
// Word: banana, Count: 2
// Word: cherry, Count: 1

Key methods of ConcurrentDictionary:

  • TryAdd(TKey, TValue): Attempts to add a key-value pair
  • TryUpdate(TKey, TValue, TValue): Updates a value if it equals the expected value
  • TryRemove(TKey, out TValue): Tries to remove and return a value
  • AddOrUpdate(TKey, TValue, Func<TKey, TValue, TValue>): Adds or updates a key-value pair atomically
  • GetOrAdd(TKey, TValue): Gets an existing value or adds a new one if the key doesn't exist

ConcurrentQueue<T>

This is a thread-safe implementation of a FIFO (First-In-First-Out) collection:

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ConcurrentQueueExample
{
public static async Task RunExample()
{
// Create a concurrent queue
ConcurrentQueue<string> taskQueue = new ConcurrentQueue<string>();

// Enqueue tasks from multiple threads
Task producer = Task.Run(() =>
{
for (int i = 1; i <= 10; i++)
{
taskQueue.Enqueue($"Task {i}");
Console.WriteLine($"Produced: Task {i}");
Task.Delay(100).Wait(); // Simulate some work
}
});

// Process tasks from multiple threads
Task consumer = Task.Run(() =>
{
while (!taskQueue.IsEmpty || !producer.IsCompleted)
{
if (taskQueue.TryDequeue(out string task))
{
Console.WriteLine($"Processing: {task}");
}
Task.Delay(150).Wait(); // Process is slightly slower than production
}
});

await Task.WhenAll(producer, consumer);
}
}

// Output might be:
// Produced: Task 1
// Processing: Task 1
// Produced: Task 2
// Produced: Task 3
// Processing: Task 2
// Produced: Task 4
// ...and so on

Key methods of ConcurrentQueue:

  • Enqueue(T): Adds an item to the end of the queue
  • TryDequeue(out T): Tries to remove and return the item at the beginning of the queue
  • TryPeek(out T): Tries to return the item at the beginning of the queue without removing it
  • IsEmpty: Property that indicates whether the queue is empty

ConcurrentStack<T>

This is a thread-safe implementation of a LIFO (Last-In-First-Out) collection:

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ConcurrentStackExample
{
public static async Task RunExample()
{
// Create a concurrent stack
ConcurrentStack<string> historyStack = new ConcurrentStack<string>();

// Multiple threads adding to the stack
var addTasks = Enumerable.Range(1, 5).Select(i => Task.Run(() =>
{
string action = $"Action {i}";
historyStack.Push(action);
Console.WriteLine($"Added to history: {action}");
}));

await Task.WhenAll(addTasks);

// Check the most recent action without removing it
if (historyStack.TryPeek(out string mostRecent))
{
Console.WriteLine($"Most recent action: {mostRecent}");
}

// Get the 3 most recent actions
string[] recent3;
int itemsPopped = historyStack.TryPopRange(out recent3, 3);

Console.WriteLine($"Retrieved {itemsPopped} recent actions:");
foreach (var item in recent3)
{
Console.WriteLine($" - {item}");
}
}
}

// Output might be (order can vary):
// Added to history: Action 1
// Added to history: Action 2
// Added to history: Action 3
// Added to history: Action 4
// Added to history: Action 5
// Most recent action: Action 5
// Retrieved 3 recent actions:
// - Action 5
// - Action 2
// - Action 1

Key methods of ConcurrentStack:

  • Push(T): Adds an item to the top of the stack
  • PushRange(T[]): Adds multiple items to the stack
  • TryPop(out T): Tries to remove and return the item at the top of the stack
  • TryPopRange(T[], int): Tries to remove and return multiple items from the stack
  • TryPeek(out T): Tries to return the item at the top of the stack without removing it

ConcurrentBag<T>

This is an unordered collection that allows duplicate items:

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ConcurrentBagExample
{
public static async Task RunExample()
{
// Create a concurrent bag
ConcurrentBag<int> resultBag = new ConcurrentBag<int>();

// Multiple threads calculating and adding results
await Task.WhenAll(Enumerable.Range(1, 10).Select(i => Task.Run(() =>
{
// Simulate some calculation
int result = i * i;
resultBag.Add(result);
Console.WriteLine($"Thread {Task.CurrentId} added: {result}");
})));

// Process all results
int sum = 0;
while (resultBag.TryTake(out int value))
{
sum += value;
}

Console.WriteLine($"Sum of all results: {sum}");
}
}

// Output might include:
// Thread 1 added: 1
// Thread 3 added: 9
// Thread 4 added: 16
// Thread 2 added: 4
// Thread 5 added: 25
// ...
// Sum of all results: 385

Key methods of ConcurrentBag:

  • Add(T): Adds an item to the bag
  • TryTake(out T): Tries to remove and return an item from the bag
  • TryPeek(out T): Tries to return an item from the bag without removing it

BlockingCollection<T>

While not technically a collection itself, BlockingCollection<T> is an important class that provides blocking and bounding capabilities for any collection that implements IProducerConsumerCollection<T> (which all concurrent collections do):

csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

public class BlockingCollectionExample
{
public static async Task RunExample()
{
// Create a bounded blocking collection with a maximum capacity of 5
using (BlockingCollection<int> boundedCollection = new BlockingCollection<int>(capacity: 5))
{
// Producer task
Task producer = Task.Run(() =>
{
for (int i = 0; i < 20; i++)
{
// This will block if the collection reaches capacity
boundedCollection.Add(i);
Console.WriteLine($"Produced: {i}");
Thread.Sleep(100); // Simulate work
}

// Signal that we're done adding items
boundedCollection.CompleteAdding();
});

// Consumer tasks
Task[] consumers = Enumerable.Range(0, 2).Select(consumerIndex =>
Task.Run(() =>
{
// GetConsumingEnumerable() will block when empty and complete when CompleteAdding is called
foreach (int item in boundedCollection.GetConsumingEnumerable())
{
Console.WriteLine($"Consumer {consumerIndex} consumed: {item}");
Thread.Sleep(250); // Simulate slower consumption
}
})).ToArray();

// Wait for all tasks to complete
await Task.WhenAll(producer.ContinueWith(_ => Task.WhenAll(consumers)));
}
}
}

BlockingCollection is particularly useful for producer-consumer scenarios:

  • Add() will block if the collection is bounded and reaches capacity
  • Take() will block if the collection is empty
  • CompleteAdding() signals that no more items will be added
  • GetConsumingEnumerable() provides an enumerable that takes items from the collection

Real-World Example: Building a Thread-Safe Cache

Let's build a simple but practical thread-safe caching system using ConcurrentDictionary:

csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public class ThreadSafeCache<TKey, TValue>
{
private readonly ConcurrentDictionary<TKey, Lazy<TValue>> _cache =
new ConcurrentDictionary<TKey, Lazy<TValue>>();

// Get or create a value
public TValue GetOrAdd(TKey key, Func<TKey, TValue> valueFactory)
{
// GetOrAdd with a Lazy<T> factory ensures the valueFactory is only called once per key
// This prevents multiple threads from calculating the same value
return _cache.GetOrAdd(key, k => new Lazy<TValue>(() => valueFactory(k), LazyThreadSafetyMode.ExecutionAndPublication)).Value;
}

// Try to remove an item
public bool TryRemove(TKey key, out TValue value)
{
bool result = _cache.TryRemove(key, out Lazy<TValue> lazyValue);
value = result ? lazyValue.Value : default;
return result;
}

// Clear the cache
public void Clear() => _cache.Clear();

// Get the current count
public int Count => _cache.Count;
}

// Example usage
public class CacheExample
{
public static async Task RunExample()
{
ThreadSafeCache<string, string> dataCache = new ThreadSafeCache<string, string>();

// Simulate multiple threads accessing the cache
await Task.WhenAll(
Enumerable.Range(0, 5).Select(_ => Task.Run(() =>
{
// This expensive computation will only happen once per key
string result = dataCache.GetOrAdd("key1", key =>
{
Console.WriteLine($"Computing value for {key}... (expensive operation)");
Task.Delay(500).Wait(); // Simulate expensive computation
return $"Computed value for {key}";
});

Console.WriteLine($"Thread {Task.CurrentId} got: {result}");
}))
);

// Output will show that expensive operation happens only once
}
}

// Output:
// Computing value for key1... (expensive operation)
// Thread 1 got: Computed value for key1
// Thread 3 got: Computed value for key1
// Thread 2 got: Computed value for key1
// Thread 4 got: Computed value for key1
// Thread 5 got: Computed value for key1

This cache implementation demonstrates several important features:

  1. It's thread-safe without explicit locking
  2. It computes expensive values only once, even if multiple threads request the same key simultaneously
  3. It allows concurrent reads without blocking

Performance Considerations

While concurrent collections offer thread safety, they do come with some performance overhead compared to their non-concurrent counterparts:

  1. Higher memory usage: Concurrent collections need to maintain additional state for synchronization
  2. Slightly slower operations: Thread-safety mechanisms add overhead to each operation
  3. Less predictable performance: Under high contention, performance can degrade

Use concurrent collections when thread safety is required, but consider these alternatives when appropriate:

  • For read-heavy scenarios with few writes: Consider using ImmutableCollections (in the System.Collections.Immutable namespace)
  • For single-threaded scenarios: Use regular collections for better performance
  • For complex operations that need to be atomic: Consider using explicit locks around regular collections

Summary

.NET concurrent collections provide thread-safe alternatives to standard collections for multithreaded programming:

  • ConcurrentDictionary<TKey, TValue>: Thread-safe key-value store with atomic operations
  • ConcurrentQueue<T>: Thread-safe FIFO collection
  • ConcurrentStack<T>: Thread-safe LIFO collection
  • ConcurrentBag<T>: Thread-safe unordered collection
  • BlockingCollection<T>: Provides blocking and bounding capabilities for producer-consumer scenarios

These collections eliminate the need for explicit locking in many common scenarios, making concurrent code safer and often more performant than manual synchronization.

When working with asynchronous code in .NET, understanding and effectively using concurrent collections is essential for building robust applications that can safely handle concurrent operations.

Additional Resources

Exercises

  1. Create a thread-safe counter that can be incremented from multiple threads simultaneously using ConcurrentDictionary.
  2. Implement a producer-consumer pattern where multiple producers add items to a ConcurrentQueue and multiple consumers process these items.
  3. Build a thread-safe logging system using ConcurrentQueue that can accept log messages from multiple threads and write them to a file in order.
  4. Create a thread-safe "last N items" cache using ConcurrentStack that keeps track of the most recent N items added.
  5. Implement a parallel web crawler that uses BlockingCollection to manage the URLs to be crawled, ensuring a maximum number of pending URLs.


If you spot any mistakes on this website, please let me know at [email protected]. I’d greatly appreciate your feedback! :)