C# 多线程编程
多线程编程是现代软件开发的核心技能之一。当我们需要同时执行多个任务、提高程序响应性或充分利用多核处理器时,就需要用到多线程技术。本章将从基础概念讲起,逐步深入到高级同步机制和最佳实践。
理解多线程的基本概念
为什么需要多线程?
在单线程程序中,所有操作按顺序执行。如果某个操作耗时较长(比如读取文件、网络请求或复杂计算),整个程序就会"卡住",无法响应用户操作。多线程解决了这个问题:让耗时操作在后台执行,主线程可以继续处理其他任务。
举个例子,一个下载管理器需要同时下载多个文件。如果用单线程,文件必须一个接一个下载;使用多线程,可以同时开启多个下载任务,充分利用网络带宽。
线程与进程的区别
进程是操作系统分配资源的基本单位,每个进程都有独立的内存空间。线程是进程内的执行单元,同一进程内的多个线程共享进程的内存空间。这意味着:
- 线程间通信比进程间通信更简单(直接读写共享变量)
- 多线程需要处理同步问题,防止数据竞争
- 线程创建和切换的开销比进程小得多
什么时候使用多线程?
多线程适合以下场景:
- I/O 密集型任务:文件读写、网络请求、数据库操作
- 后台处理:定时任务、日志记录、数据同步
- 并行计算:图像处理、科学计算、数据分析
- 响应式 UI:耗时操作放在后台线程,保持界面流畅
但多线程不是万能药。如果任务非常简单,创建线程的开销可能比任务本身还大。线程过多还会增加上下文切换的开销,反而降低性能。
Thread 类:线程的基础操作
创建和启动线程
最直接的创建线程方式是使用 Thread 类:
using System;
using System.Threading;
// 创建线程
Thread thread = new Thread(() =>
{
Console.WriteLine($"工作线程 ID: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // 模拟工作
Console.WriteLine("工作线程完成");
});
// 启动线程
thread.Start();
Console.WriteLine($"主线程 ID: {Thread.CurrentThread.ManagedThreadId}");
// 等待线程结束
thread.Join();
Console.WriteLine("主线程继续执行");
Thread 构造函数接受一个 ThreadStart 委托(无参数)或 ParameterizedThreadStart 委托(带一个 object 参数)。上面的例子使用了 Lambda 表达式简化代码。
向线程传递数据
有两种方式向线程传递数据:
方式一:使用 ParameterizedThreadStart
Thread thread = new Thread(obj =>
{
int count = (int)obj;
for (int i = 0; i < count; i++)
{
Console.WriteLine($"计数: {i}");
Thread.Sleep(100);
}
});
thread.Start(5); // 传递参数
thread.Join();
方式二:使用 Lambda 表达式捕获变量
string message = "Hello from thread";
int repeat = 3;
Thread thread = new Thread(() =>
{
for (int i = 0; i < repeat; i++)
{
Console.WriteLine(message);
Thread.Sleep(100);
}
});
thread.Start();
thread.Join();
Lambda 捕获变量的方式更加灵活,但要注意闭包问题:如果在启动线程前修改了变量,线程看到的是修改后的值。
前台线程与后台线程
.NET 中的线程分为前台线程和后台线程:
- 前台线程:程序会等待所有前台线程完成后才退出
- 后台线程:程序退出时,后台线程会被立即终止
// 后台线程示例
Thread backgroundThread = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"后台线程: {i}");
Thread.Sleep(500);
}
Console.WriteLine("后台线程完成"); // 这行可能不会执行
});
backgroundThread.IsBackground = true; // 设置为后台线程
backgroundThread.Start();
// 主线程不等待,直接结束
Console.WriteLine("主线程结束");
// 程序退出时,后台线程可能还在运行,但会被强制终止
默认情况下,通过 Thread 类创建的线程是前台线程。将 IsBackground 设为 true 可以改为后台线程。后台线程适合用于可以随时中断的辅助任务,比如定期清理临时文件。
线程的基本控制
Thread thread = new Thread(() =>
{
Console.WriteLine("线程开始工作...");
Thread.Sleep(5000);
Console.WriteLine("线程工作完成");
});
thread.Start();
// 检测线程状态
Console.WriteLine($"线程状态: {thread.ThreadState}");
// 等待线程结束,最多等待 2 秒
if (thread.Join(2000))
{
Console.WriteLine("线程已结束");
}
else
{
Console.WriteLine("线程仍在运行");
}
// 注意:Thread.Abort() 已被废弃,不应使用
// 正确的做法是使用 CancellationToken(后续章节讲解)
ThreadState 是一个标志枚举,包含线程的各种状态。实际开发中,我们很少直接检查线程状态,更多使用同步机制来协调线程。
线程池(ThreadPool)
为什么需要线程池?
每次创建线程都有开销:分配内存、初始化线程栈、向操作系统注册线程等。如果频繁创建和销毁线程,这些开销会累积。线程池解决了这个问题:预先创建一组线程,重复利用它们执行任务。
线程池的特点:
- 线程池中的线程都是后台线程
- 线程池会根据负载自动调整线程数量
- 适合执行大量短小的任务
使用线程池
using System;
using System.Threading;
// 方式一:QueueUserWorkItem
ThreadPool.QueueUserWorkItem(state =>
{
Console.WriteLine($"线程池线程 ID: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
Console.WriteLine("任务完成");
});
// 方式二:Task.Run(推荐)
Task.Run(() =>
{
Console.WriteLine("使用 Task 执行任务");
});
Console.WriteLine("主线程继续执行");
Thread.Sleep(2000); // 等待线程池任务完成
Task.Run 是更现代的方式,它内部使用线程池,但提供了更丰富的功能(等待、取消、延续任务等)。
线程池的配置
// 获取线程池信息
ThreadPool.GetMinThreads(out int minWorker, out int minIO);
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);
Console.WriteLine($"最小工作线程: {minWorker}, 最小IO线程: {minIO}");
Console.WriteLine($"最大工作线程: {maxWorker}, 最大IO线程: {maxIO}");
// 设置线程池大小(谨慎使用)
// 大多数情况下,让线程池自行管理即可
ThreadPool.SetMinThreads(8, 8);
ThreadPool.SetMaxThreads(1024, 1024);
线程池有两种线程:工作线程用于执行普通任务,IO 线程用于处理异步 IO 完成。调整线程池大小时要谨慎:太小会导致任务排队等待,太大会增加上下文切换开销。
何时使用 Thread vs ThreadPool?
| 场景 | 推荐 |
|---|---|
| 需要前台线程 | Thread |
| 需要设置线程优先级 | Thread |
| 长时间运行的任务 | Thread(或 TaskCreationOptions.LongRunning) |
| 短小的后台任务 | 线程池(Task.Run) |
| 大量并发任务 | 线程池 |
| 需要精细控制线程生命周期 | Thread |
同步机制:保护共享资源
当多个线程同时访问同一个变量时,可能产生竞态条件(Race Condition),导致不可预测的结果。同步机制用于协调线程对共享资源的访问。
竞态条件示例
先看一个没有同步的问题代码:
class Counter
{
public int Count = 0;
public void Increment()
{
for (int i = 0; i < 100000; i++)
{
Count++; // 不是原子操作!
}
}
}
var counter = new Counter();
// 两个线程同时增加计数器
Thread t1 = new Thread(counter.Increment);
Thread t2 = new Thread(counter.Increment);
t1.Start();
t2.Start();
t1.Join();
t2.Join();
Console.WriteLine($"最终计数: {counter.Count}");
// 期望: 200000,实际: 可能小于 200000
Count++ 看起来是一条语句,实际上包含三个操作:读取当前值、加一、写回。两个线程可能同时读取到相同的值,然后都加一并写回,导致少计了一次。
lock 关键字:最常用的同步方式
lock 是 C# 中最简单也是最常用的同步机制:
class SafeCounter
{
private int _count = 0;
private readonly object _lock = new object(); // 专用锁对象
public void Increment()
{
for (int i = 0; i < 100000; i++)
{
lock (_lock)
{
_count++;
}
}
}
public int GetCount()
{
lock (_lock)
{
return _count;
}
}
}
lock 的工作原理:当线程进入 lock 块时,会尝试获取锁对象。如果锁已被其他线程持有,当前线程会阻塞等待。同一时刻只有一个线程能持有锁,从而保证临界区代码的原子执行。
使用 lock 的最佳实践:
- 使用专用的锁对象,而不是 lock
this或 lock 类型本身 - 锁对象应该是
readonly的,防止被重新赋值 - 尽量缩小 lock 的范围,减少持锁时间
- 避免在 lock 内调用外部方法,可能导致死锁
Monitor 类:lock 的底层实现
lock 语句实际上是 Monitor 类的语法糖:
// lock 语句
lock (_lock)
{
// 临界区
}
// 编译器展开后的等效代码
bool lockTaken = false;
try
{
Monitor.Enter(_lock, ref lockTaken);
// 临界区
}
finally
{
if (lockTaken)
Monitor.Exit(_lock);
}
直接使用 Monitor 可以实现更复杂的操作:
private readonly object _lock = new object();
private Queue<string> _queue = new Queue<string>();
// 尝试获取锁,超时返回 false
if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(5)))
{
try
{
// 获得锁,执行操作
_queue.Enqueue("item");
}
finally
{
Monitor.Exit(_lock);
}
}
else
{
Console.WriteLine("获取锁超时");
}
Monitor.Wait/Pulse:线程间通信
Monitor 还提供了 Wait、Pulse 和 PulseAll 方法,用于线程间的协调:
class ProducerConsumer
{
private readonly object _lock = new object();
private Queue<int> _queue = new Queue<int>();
private const int MaxSize = 5;
public void Produce(int item)
{
lock (_lock)
{
// 队列满时等待
while (_queue.Count >= MaxSize)
{
Monitor.Wait(_lock); // 释放锁并等待
}
_queue.Enqueue(item);
Console.WriteLine($"生产: {item},队列大小: {_queue.Count}");
// 通知消费者
Monitor.Pulse(_lock);
}
}
public int Consume()
{
lock (_lock)
{
// 队列空时等待
while (_queue.Count == 0)
{
Monitor.Wait(_lock); // 释放锁并等待
}
int item = _queue.Dequeue();
Console.WriteLine($"消费: {item},队列大小: {_queue.Count}");
// 通知生产者
Monitor.Pulse(_lock);
return item;
}
}
}
// 使用示例
var pc = new ProducerConsumer();
var producer = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
pc.Produce(i);
Thread.Sleep(100);
}
});
var consumer = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
pc.Consume();
Thread.Sleep(200);
}
});
producer.Start();
consumer.Start();
producer.Join();
consumer.Join();
Wait 会释放锁并让线程等待,直到其他线程调用 Pulse 或 PulseAll。这是实现生产者-消费者模式的基础。
Mutex:跨进程同步
Mutex(互斥体)类似于 lock,但它可以跨进程工作:
// 创建命名 Mutex(在整个操作系统中可见)
using var mutex = new Mutex(false, "MyUniqueAppName_SingleInstance");
// 尝试获取 Mutex
if (mutex.WaitOne(TimeSpan.FromSeconds(3), false))
{
try
{
Console.WriteLine("程序运行中...");
Console.ReadLine();
}
finally
{
mutex.ReleaseMutex();
}
}
else
{
Console.WriteLine("另一个实例已在运行");
}
命名 Mutex 常用于确保程序只运行一个实例。由于 Mutex 涉及操作系统内核对象,开销比 lock 大得多(约 50 倍),在进程内同步时应优先使用 lock。
Semaphore 和 SemaphoreSlim:限制并发数
信号量控制同时访问某个资源的线程数量:
using System;
using System.Threading;
// SemaphoreSlim 是轻量级版本,用于进程内同步
var semaphore = new SemaphoreSlim(3, 3); // 初始计数3,最大计数3
for (int i = 1; i <= 10; i++)
{
int id = i;
Task.Run(async () =>
{
Console.WriteLine($"任务 {id} 等待进入");
await semaphore.WaitAsync(); // 异步等待
try
{
Console.WriteLine($"任务 {id} 进入执行");
await Task.Delay(1000);
Console.WriteLine($"任务 {id} 完成");
}
finally
{
semaphore.Release(); // 释放信号量
}
});
}
Thread.Sleep(5000);
信号量就像一个夜店:最多容纳一定数量的人,满了之后其他人必须在外面等。每当有人离开,才能让新的人进入。
Semaphore 是跨进程的版本,可以用于系统级的并发控制:
// 命名 Semaphore 可以跨进程使用
var semaphore = new Semaphore(3, 3, "MyApp_Semaphore");
ReaderWriterLockSlim:读写分离锁
当读操作远多于写操作时,使用读写锁可以提高并发性能。读写锁允许多个线程同时读取,但写入时需要独占访问:
using System;
using System.Threading;
class ThreadSafeCache
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private readonly Dictionary<string, string> _cache = new Dictionary<string, string>();
public string Get(string key)
{
_lock.EnterReadLock();
try
{
_cache.TryGetValue(key, out string value);
return value;
}
finally
{
_lock.ExitReadLock();
}
}
public void Set(string key, string value)
{
_lock.EnterWriteLock();
try
{
_cache[key] = value;
}
finally
{
_lock.ExitWriteLock();
}
}
// 升级锁:读锁升级为写锁
public string GetOrAdd(string key, Func<string> factory)
{
_lock.EnterUpgradeableReadLock();
try
{
if (_cache.TryGetValue(key, out string value))
return value;
_lock.EnterWriteLock();
try
{
value = factory();
_cache[key] = value;
return value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}
读写锁的三种模式:
- 读锁:多个线程可以同时持有读锁
- 写锁:独占锁,同一时刻只有一个线程可以持有
- 可升级读锁:允许在读的过程中升级为写锁
SpinLock:高并发场景的自旋锁
当锁的持有时间非常短时,线程等待时"自旋"(循环检查)比阻塞更高效:
SpinLock spinLock = new SpinLock();
int counter = 0;
void Increment()
{
bool lockTaken = false;
try
{
spinLock.Enter(ref lockTaken);
counter++;
}
finally
{
if (lockTaken)
spinLock.Exit();
}
}
// 多线程测试
Parallel.For(0, 100000, i => Increment());
Console.WriteLine($"计数: {counter}");
SpinLock 适合锁持有时间极短(微秒级)的高并发场景。如果锁持有时间较长,自旋会浪费 CPU 资源,此时应该使用阻塞式锁。
事件同步原语
事件同步原语用于线程间的信号通知。
AutoResetEvent:自动重置事件
AutoResetEvent 就像一个旋转门:每次信号只允许一个等待的线程通过,然后自动关闭:
using System;
using System.Threading;
var autoEvent = new AutoResetEvent(false); // 初始状态为无信号
var waiter = new Thread(() =>
{
Console.WriteLine("线程等待信号...");
autoEvent.WaitOne(); // 阻塞等待信号
Console.WriteLine("线程收到信号,继续执行");
});
waiter.Start();
Thread.Sleep(1000);
Console.WriteLine("发送信号");
autoEvent.Set(); // 发送信号,唤醒一个等待线程
waiter.Join();
ManualResetEvent:手动重置事件
ManualResetEvent 就像一扇门:打开后所有等待的线程都能通过,直到手动关闭:
using System;
using System.Threading;
var manualEvent = new ManualResetEvent(false); // 初始状态为关闭
// 启动多个等待线程
for (int i = 1; i <= 3; i++)
{
int id = i;
new Thread(() =>
{
Console.WriteLine($"线程 {id} 等待");
manualEvent.WaitOne();
Console.WriteLine($"线程 {id} 通过");
}).Start();
}
Thread.Sleep(500);
Console.WriteLine("打开门(发送信号)");
manualEvent.Set(); // 打开门,所有等待线程都能通过
Thread.Sleep(500);
Console.WriteLine("关闭门");
manualEvent.Reset(); // 关闭门,后续线程会被阻塞
CountdownEvent:倒计时事件
CountdownEvent 在计数归零时发出信号,适合等待多个任务完成:
using System;
using System.Threading;
int taskCount = 5;
using var countdown = new CountdownEvent(taskCount);
for (int i = 1; i <= taskCount; i++)
{
int id = i;
Task.Run(() =>
{
Console.WriteLine($"任务 {id} 开始");
Thread.Sleep(1000 * id);
Console.WriteLine($"任务 {id} 完成");
countdown.Signal(); // 计数减一
});
}
Console.WriteLine("等待所有任务完成...");
countdown.Wait(); // 阻塞直到计数归零
Console.WriteLine("所有任务已完成");
Barrier:阶段同步屏障
Barrier 用于多线程的分阶段同步,所有线程到达屏障后才能继续下一阶段:
using System;
using System.Threading;
int participantCount = 3;
var barrier = new Barrier(participantCount, b =>
{
// 每个阶段完成后执行
Console.WriteLine($"=== 阶段 {b.CurrentPhaseNumber} 完成 ===\n");
});
for (int i = 0; i < participantCount; i++)
{
int id = i;
new Thread(() =>
{
for (int phase = 0; phase < 3; phase++)
{
Console.WriteLine($"线程 {id} 执行阶段 {phase}");
Thread.Sleep(500 * (id + 1));
barrier.SignalAndWait(); // 到达屏障,等待其他线程
}
Console.WriteLine($"线程 {id} 完成");
}).Start();
}
Barrier 特别适合迭代算法,比如并行排序或矩阵运算,每轮迭代后需要同步所有线程。
原子操作:Interlocked 类
对于简单的数值操作,Interlocked 类提供了无锁的原子操作:
using System;
using System.Threading;
int counter = 0;
// 原子递增
Interlocked.Increment(ref counter); // counter++
// 原子递减
Interlocked.Decrement(ref counter); // counter--
// 原子加法
Interlocked.Add(ref counter, 10); // counter += 10
// 原子交换
int original = Interlocked.Exchange(ref counter, 0); // counter = 0, 返回原值
// 条件交换:只有当前值等于比较值时才交换
int comparand = 5;
int newValue = 10;
Interlocked.CompareExchange(ref counter, newValue, comparand);
// 如果 counter == 5,则 counter = 10
// 原子读取 64 位值(32位系统上需要)
long value = Interlocked.Read(ref someLongValue);
Interlocked 比锁更高效,但只能用于简单操作。复杂的逻辑仍需要锁来保护。
线程安全集合
.NET 提供了一组线程安全的集合类,位于 System.Collections.Concurrent 命名空间:
ConcurrentQueue:线程安全队列
using System.Collections.Concurrent;
var queue = new ConcurrentQueue<int>();
// 多线程入队
Parallel.For(0, 100, i => queue.Enqueue(i));
// 尝试出队
while (queue.TryDequeue(out int item))
{
Console.WriteLine(item);
}
ConcurrentStack:线程安全栈
var stack = new ConcurrentStack<int>();
// 入栈
stack.Push(1);
stack.PushRange(new[] { 2, 3, 4 });
// 尝试出栈
if (stack.TryPop(out int item))
{
Console.WriteLine($"弹出: {item}");
}
ConcurrentDictionary:线程安全字典
var dict = new ConcurrentDictionary<string, int>();
// 添加或更新
dict.AddOrUpdate("key", 1, (key, oldValue) => oldValue + 1);
// 获取或添加
int value = dict.GetOrAdd("key", 0);
// 尝试获取
if (dict.TryGetValue("key", out int val))
{
Console.WriteLine(val);
}
// 尝试移除
if (dict.TryRemove("key", out int removed))
{
Console.WriteLine($"移除: {removed}");
}
ConcurrentBag:无序集合
ConcurrentBag 适用于顺序无关的场景,性能更好:
var bag = new ConcurrentBag<int>();
Parallel.For(0, 100, i => bag.Add(i));
int count = bag.Count; // 获取元素数量
BlockingCollection:生产者-消费者集合
BlockingCollection 是最强大的线程安全集合,支持阻塞和容量限制:
using System.Collections.Concurrent;
// 创建有界集合(最大容量100)
var collection = new BlockingCollection<int>(100);
// 生产者任务
var producer = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
collection.Add(i); // 满时阻塞
Console.WriteLine($"生产: {i}");
}
collection.CompleteAdding(); // 标记添加完成
});
// 消费者任务
var consumer = Task.Run(() =>
{
// 方式一:使用 GetConsumingEnumerable
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"消费: {item}");
Thread.Sleep(10);
}
// 方式二:手动循环
// while (!collection.IsCompleted)
// {
// if (collection.TryTake(out int item, 100))
// Console.WriteLine($"消费: {item}");
// }
});
await Task.WhenAll(producer, consumer);
Console.WriteLine("所有任务完成");
BlockingCollection 可以包装其他集合类型:
// 使用栈作为底层存储(后进先出)
var stackCollection = new BlockingCollection<int>(new ConcurrentStack<int>());
// 使用队列作为底层存储(先进先出,默认)
var queueCollection = new BlockingCollection<int>(new ConcurrentQueue<int>());
取消操作:CancellationToken
长时间运行的任务应该支持取消。CancellationToken 是 .NET 中标准的取消机制:
using System;
using System.Threading;
// 创建取消源
using var cts = new CancellationTokenSource();
// 启动可取消的任务
var task = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
// 检查是否请求取消
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("任务被取消,正在清理...");
cts.Token.ThrowIfCancellationRequested(); // 抛出 OperationCanceledException
}
Console.WriteLine($"处理中: {i}");
Thread.Sleep(100);
}
Console.WriteLine("任务完成");
}, cts.Token);
// 3秒后取消
Thread.Sleep(3000);
cts.Cancel();
try
{
task.Wait();
}
catch (AggregateException ex)
{
Console.WriteLine($"任务异常: {ex.InnerException?.Message}");
}
取消超时
// 设置超时自动取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
// 或者稍后设置
cts.CancelAfter(TimeSpan.FromSeconds(10));
组合多个取消令牌
var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();
// 任一令牌取消都会触发
var combined = CancellationTokenSource.CreateLinkedTokenSource(
cts1.Token, cts2.Token);
// 使用 combined.Token
取消异步操作
async Task DownloadWithCancelAsync(string url, CancellationToken ct)
{
using var client = new HttpClient();
// 传递取消令牌给异步方法
var response = await client.GetAsync(url, ct);
var content = await response.Content.ReadAsStringAsync(ct);
Console.WriteLine($"下载完成: {content.Length} 字节");
}
// 使用
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));
await DownloadWithCancelAsync("https://example.com", cts.Token);
线程本地存储
ThreadLocal
ThreadLocal<T> 为每个线程提供独立的变量副本:
using System;
using System.Threading;
// 每个线程有独立的计数器
var threadLocal = new ThreadLocal<int>(() => Thread.CurrentThread.ManagedThreadId);
Parallel.For(0, 5, i =>
{
Console.WriteLine($"线程 {Thread.CurrentThread.ManagedThreadId}, 值: {threadLocal.Value}");
threadLocal.Value++; // 只影响当前线程的副本
});
// 获取所有线程的值
Console.WriteLine($"所有值: {string.Join(", ", threadLocal.Values)}");
threadLocal.Dispose();
AsyncLocal
AsyncLocal<T> 用于异步上下文中传递数据:
using System;
using System.Threading;
var asyncLocal = new AsyncLocal<string>();
asyncLocal.Value = "主线程数据";
await Task.Run(() =>
{
Console.WriteLine($"任务中: {asyncLocal.Value}"); // 可以访问
asyncLocal.Value = "任务中修改";
});
Console.WriteLine($"主线程: {asyncLocal.Value}"); // 仍然是 "主线程数据"
AsyncLocal 的值会沿着异步调用链传递,但不会反向传播(除非使用 AsyncLocalValueChangedContext)。
死锁:识别与预防
什么是死锁?
死锁是指两个或多个线程相互等待对方释放资源,导致所有线程都无法继续执行:
object lockA = new object();
object lockB = new object();
// 线程1:先锁A,再锁B
new Thread(() =>
{
lock (lockA)
{
Thread.Sleep(100);
lock (lockB) // 等待锁B
{
Console.WriteLine("线程1完成");
}
}
}).Start();
// 线程2:先锁B,再锁A
new Thread(() =>
{
lock (lockB)
{
Thread.Sleep(100);
lock (lockA) // 等待锁A - 死锁!
{
Console.WriteLine("线程2完成");
}
}
}).Start();
预防死锁的策略
1. 锁顺序
始终以相同的顺序获取多个锁:
// 正确做法:按特定顺序获取锁
void Transfer(Account from, Account to, decimal amount)
{
// 按 ID 排序确保锁顺序一致
var first = from.Id < to.Id ? from : to;
var second = from.Id < to.Id ? to : from;
lock (first)
{
lock (second)
{
from.Balance -= amount;
to.Balance += amount;
}
}
}
2. 使用 TryEnter 设置超时
if (Monitor.TryEnter(lockA, TimeSpan.FromSeconds(5)))
{
try
{
if (Monitor.TryEnter(lockB, TimeSpan.FromSeconds(5)))
{
try
{
// 执行操作
}
finally
{
Monitor.Exit(lockB);
}
}
}
finally
{
Monitor.Exit(lockA);
}
}
3. 减少锁的持有时间
// 错误:持锁时间过长
lock (_lock)
{
var data = GetData();
ProcessData(data); // 耗时操作
SaveData(data);
}
// 正确:只锁必要的部分
List<Data> data;
lock (_lock)
{
data = GetData(); // 快速复制
}
ProcessData(data); // 不持锁处理
lock (_lock)
{
SaveData(data);
}
4. 使用更高级的抽象
优先使用 BlockingCollection、Channel 等高级抽象,而不是手动加锁:
// 现代 C# 使用 Channel 实现生产者-消费者
var channel = System.Threading.Channels.Channel.CreateBounded<int>(100);
// 生产者
async Task ProduceAsync()
{
for (int i = 0; i < 100; i++)
{
await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();
}
// 消费者
async Task ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
}
并行编程
Parallel.For 和 Parallel.ForEach
using System;
using System.Threading.Tasks;
// 并行 for 循环
Parallel.For(0, 100, i =>
{
Console.WriteLine($"处理 {i}");
});
// 并行 foreach
var items = Enumerable.Range(0, 100).ToList();
Parallel.ForEach(items, item =>
{
Process(item);
});
// 带选项的并行循环
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4, // 最大并行度
CancellationToken = cts.Token
};
Parallel.For(0, 100, options, i =>
{
// 并行处理
});
PLINQ:并行 LINQ
var numbers = Enumerable.Range(1, 1000000);
// 并行处理
var result = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.OrderBy(n => n)
.ToList();
// 指定并行度
var result2 = numbers
.AsParallel()
.AsOrdered() // 保持原始顺序
.WithDegreeOfParallelism(4)
.Where(n => n % 2 == 0)
.ToList();
注意:并非所有操作都适合并行化。对于小数据集或简单操作,并行化的开销可能超过收益。
多线程最佳实践
1. 优先使用高级抽象
优先级从高到低:
async/await和TaskParallel.For/ForEach和 PLINQBlockingCollection等线程安全集合ThreadPool.QueueUserWorkItemThread类(尽量避免直接使用)
2. 避免过度同步
不必要的同步会降低性能和可扩展性:
- 使用不可变对象,避免共享状态
- 使用线程安全集合代替手动加锁
- 考虑使用
ThreadLocal或AsyncLocal避免共享
3. 正确处理异常
线程中未处理的异常会导致程序崩溃:
var task = Task.Run(() =>
{
throw new InvalidOperationException("任务中的异常");
});
try
{
task.Wait();
}
catch (AggregateException ex)
{
// 处理所有内部异常
foreach (var inner in ex.InnerExceptions)
{
Console.WriteLine($"异常: {inner.Message}");
}
}
// 或使用 await
try
{
await task;
}
catch (InvalidOperationException ex)
{
Console.WriteLine($"异常: {ex.Message}");
}
4. 避免线程亲和性问题
某些对象有线程亲和性(如 UI 控件),只能在创建它们的线程上访问:
// WPF 示例
Dispatcher.Invoke(() =>
{
// 在 UI 线程执行
textBox.Text = "更新文本";
});
// WinForms 示例
if (textBox.InvokeRequired)
{
textBox.Invoke(() => textBox.Text = "更新文本");
}
5. 使用诊断工具
lock竞争多时考虑使用ReaderWriterLockSlim或SpinLock- 使用 Visual Studio 并行调试窗口分析线程状态
- 使用性能分析器查找锁竞争热点
小结
多线程编程的核心要点:
| 概念 | 用途 | 选择建议 |
|---|---|---|
Thread | 精细控制线程 | 需要前台线程或特定优先级时 |
ThreadPool | 短任务执行 | 通过 Task.Run 使用 |
lock | 保护共享资源 | 最常用的同步方式 |
Monitor | 复杂同步逻辑 | 需要 TryEnter 或 Wait/Pulse |
Mutex | 跨进程同步 | 确保单实例运行 |
Semaphore | 限制并发数 | 连接池、资源池 |
ReaderWriterLockSlim | 读写分离 | 读多写少场景 |
Event | 线程信号 | AutoResetEvent、ManualResetEvent |
Barrier | 阶段同步 | 并行迭代算法 |
Interlocked | 原子操作 | 简单数值操作 |
ConcurrentDictionary | 线程安全字典 | 并发读写场景 |
BlockingCollection | 生产者-消费者 | 数据管道 |
CancellationToken | 取消操作 | 长时间运行的任务 |
练习
- 实现一个线程安全的计数器类,支持递增、递减和获取当前值
- 使用
BlockingCollection实现一个简单的任务队列 - 使用
Barrier实现一个多线程并行排序算法 - 实现一个支持取消的异步下载器,同时下载多个文件
- 使用
ReaderWriterLockSlim实现一个线程安全的缓存类