跳到主要内容

C# 多线程编程

多线程编程是现代软件开发的核心技能之一。当我们需要同时执行多个任务、提高程序响应性或充分利用多核处理器时,就需要用到多线程技术。本章将从基础概念讲起,逐步深入到高级同步机制和最佳实践。

理解多线程的基本概念

为什么需要多线程?

在单线程程序中,所有操作按顺序执行。如果某个操作耗时较长(比如读取文件、网络请求或复杂计算),整个程序就会"卡住",无法响应用户操作。多线程解决了这个问题:让耗时操作在后台执行,主线程可以继续处理其他任务。

举个例子,一个下载管理器需要同时下载多个文件。如果用单线程,文件必须一个接一个下载;使用多线程,可以同时开启多个下载任务,充分利用网络带宽。

线程与进程的区别

进程是操作系统分配资源的基本单位,每个进程都有独立的内存空间。线程是进程内的执行单元,同一进程内的多个线程共享进程的内存空间。这意味着:

  • 线程间通信比进程间通信更简单(直接读写共享变量)
  • 多线程需要处理同步问题,防止数据竞争
  • 线程创建和切换的开销比进程小得多

什么时候使用多线程?

多线程适合以下场景:

  • I/O 密集型任务:文件读写、网络请求、数据库操作
  • 后台处理:定时任务、日志记录、数据同步
  • 并行计算:图像处理、科学计算、数据分析
  • 响应式 UI:耗时操作放在后台线程,保持界面流畅

但多线程不是万能药。如果任务非常简单,创建线程的开销可能比任务本身还大。线程过多还会增加上下文切换的开销,反而降低性能。

Thread 类:线程的基础操作

创建和启动线程

最直接的创建线程方式是使用 Thread 类:

using System;
using System.Threading;

// 创建线程
Thread thread = new Thread(() =>
{
Console.WriteLine($"工作线程 ID: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000); // 模拟工作
Console.WriteLine("工作线程完成");
});

// 启动线程
thread.Start();
Console.WriteLine($"主线程 ID: {Thread.CurrentThread.ManagedThreadId}");

// 等待线程结束
thread.Join();
Console.WriteLine("主线程继续执行");

Thread 构造函数接受一个 ThreadStart 委托(无参数)或 ParameterizedThreadStart 委托(带一个 object 参数)。上面的例子使用了 Lambda 表达式简化代码。

向线程传递数据

有两种方式向线程传递数据:

方式一:使用 ParameterizedThreadStart

Thread thread = new Thread(obj =>
{
int count = (int)obj;
for (int i = 0; i < count; i++)
{
Console.WriteLine($"计数: {i}");
Thread.Sleep(100);
}
});

thread.Start(5); // 传递参数
thread.Join();

方式二:使用 Lambda 表达式捕获变量

string message = "Hello from thread";
int repeat = 3;

Thread thread = new Thread(() =>
{
for (int i = 0; i < repeat; i++)
{
Console.WriteLine(message);
Thread.Sleep(100);
}
});

thread.Start();
thread.Join();

Lambda 捕获变量的方式更加灵活,但要注意闭包问题:如果在启动线程前修改了变量,线程看到的是修改后的值。

前台线程与后台线程

.NET 中的线程分为前台线程和后台线程:

  • 前台线程:程序会等待所有前台线程完成后才退出
  • 后台线程:程序退出时,后台线程会被立即终止
// 后台线程示例
Thread backgroundThread = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
Console.WriteLine($"后台线程: {i}");
Thread.Sleep(500);
}
Console.WriteLine("后台线程完成"); // 这行可能不会执行
});

backgroundThread.IsBackground = true; // 设置为后台线程
backgroundThread.Start();

// 主线程不等待,直接结束
Console.WriteLine("主线程结束");
// 程序退出时,后台线程可能还在运行,但会被强制终止

默认情况下,通过 Thread 类创建的线程是前台线程。将 IsBackground 设为 true 可以改为后台线程。后台线程适合用于可以随时中断的辅助任务,比如定期清理临时文件。

线程的基本控制

Thread thread = new Thread(() =>
{
Console.WriteLine("线程开始工作...");
Thread.Sleep(5000);
Console.WriteLine("线程工作完成");
});

thread.Start();

// 检测线程状态
Console.WriteLine($"线程状态: {thread.ThreadState}");

// 等待线程结束,最多等待 2 秒
if (thread.Join(2000))
{
Console.WriteLine("线程已结束");
}
else
{
Console.WriteLine("线程仍在运行");
}

// 注意:Thread.Abort() 已被废弃,不应使用
// 正确的做法是使用 CancellationToken(后续章节讲解)

ThreadState 是一个标志枚举,包含线程的各种状态。实际开发中,我们很少直接检查线程状态,更多使用同步机制来协调线程。

线程池(ThreadPool)

为什么需要线程池?

每次创建线程都有开销:分配内存、初始化线程栈、向操作系统注册线程等。如果频繁创建和销毁线程,这些开销会累积。线程池解决了这个问题:预先创建一组线程,重复利用它们执行任务。

线程池的特点:

  • 线程池中的线程都是后台线程
  • 线程池会根据负载自动调整线程数量
  • 适合执行大量短小的任务

使用线程池

using System;
using System.Threading;

// 方式一:QueueUserWorkItem
ThreadPool.QueueUserWorkItem(state =>
{
Console.WriteLine($"线程池线程 ID: {Thread.CurrentThread.ManagedThreadId}");
Thread.Sleep(1000);
Console.WriteLine("任务完成");
});

// 方式二:Task.Run(推荐)
Task.Run(() =>
{
Console.WriteLine("使用 Task 执行任务");
});

Console.WriteLine("主线程继续执行");
Thread.Sleep(2000); // 等待线程池任务完成

Task.Run 是更现代的方式,它内部使用线程池,但提供了更丰富的功能(等待、取消、延续任务等)。

线程池的配置

// 获取线程池信息
ThreadPool.GetMinThreads(out int minWorker, out int minIO);
ThreadPool.GetMaxThreads(out int maxWorker, out int maxIO);

Console.WriteLine($"最小工作线程: {minWorker}, 最小IO线程: {minIO}");
Console.WriteLine($"最大工作线程: {maxWorker}, 最大IO线程: {maxIO}");

// 设置线程池大小(谨慎使用)
// 大多数情况下,让线程池自行管理即可
ThreadPool.SetMinThreads(8, 8);
ThreadPool.SetMaxThreads(1024, 1024);

线程池有两种线程:工作线程用于执行普通任务,IO 线程用于处理异步 IO 完成。调整线程池大小时要谨慎:太小会导致任务排队等待,太大会增加上下文切换开销。

何时使用 Thread vs ThreadPool?

场景推荐
需要前台线程Thread
需要设置线程优先级Thread
长时间运行的任务Thread(或 TaskCreationOptions.LongRunning
短小的后台任务线程池(Task.Run
大量并发任务线程池
需要精细控制线程生命周期Thread

同步机制:保护共享资源

当多个线程同时访问同一个变量时,可能产生竞态条件(Race Condition),导致不可预测的结果。同步机制用于协调线程对共享资源的访问。

竞态条件示例

先看一个没有同步的问题代码:

class Counter
{
public int Count = 0;

public void Increment()
{
for (int i = 0; i < 100000; i++)
{
Count++; // 不是原子操作!
}
}
}

var counter = new Counter();

// 两个线程同时增加计数器
Thread t1 = new Thread(counter.Increment);
Thread t2 = new Thread(counter.Increment);

t1.Start();
t2.Start();
t1.Join();
t2.Join();

Console.WriteLine($"最终计数: {counter.Count}");
// 期望: 200000,实际: 可能小于 200000

Count++ 看起来是一条语句,实际上包含三个操作:读取当前值、加一、写回。两个线程可能同时读取到相同的值,然后都加一并写回,导致少计了一次。

lock 关键字:最常用的同步方式

lock 是 C# 中最简单也是最常用的同步机制:

class SafeCounter
{
private int _count = 0;
private readonly object _lock = new object(); // 专用锁对象

public void Increment()
{
for (int i = 0; i < 100000; i++)
{
lock (_lock)
{
_count++;
}
}
}

public int GetCount()
{
lock (_lock)
{
return _count;
}
}
}

lock 的工作原理:当线程进入 lock 块时,会尝试获取锁对象。如果锁已被其他线程持有,当前线程会阻塞等待。同一时刻只有一个线程能持有锁,从而保证临界区代码的原子执行。

使用 lock 的最佳实践:

  1. 使用专用的锁对象,而不是 lock this 或 lock 类型本身
  2. 锁对象应该是 readonly 的,防止被重新赋值
  3. 尽量缩小 lock 的范围,减少持锁时间
  4. 避免在 lock 内调用外部方法,可能导致死锁

Monitor 类:lock 的底层实现

lock 语句实际上是 Monitor 类的语法糖:

// lock 语句
lock (_lock)
{
// 临界区
}

// 编译器展开后的等效代码
bool lockTaken = false;
try
{
Monitor.Enter(_lock, ref lockTaken);
// 临界区
}
finally
{
if (lockTaken)
Monitor.Exit(_lock);
}

直接使用 Monitor 可以实现更复杂的操作:

private readonly object _lock = new object();
private Queue<string> _queue = new Queue<string>();

// 尝试获取锁,超时返回 false
if (Monitor.TryEnter(_lock, TimeSpan.FromSeconds(5)))
{
try
{
// 获得锁,执行操作
_queue.Enqueue("item");
}
finally
{
Monitor.Exit(_lock);
}
}
else
{
Console.WriteLine("获取锁超时");
}

Monitor.Wait/Pulse:线程间通信

Monitor 还提供了 WaitPulsePulseAll 方法,用于线程间的协调:

class ProducerConsumer
{
private readonly object _lock = new object();
private Queue<int> _queue = new Queue<int>();
private const int MaxSize = 5;

public void Produce(int item)
{
lock (_lock)
{
// 队列满时等待
while (_queue.Count >= MaxSize)
{
Monitor.Wait(_lock); // 释放锁并等待
}

_queue.Enqueue(item);
Console.WriteLine($"生产: {item},队列大小: {_queue.Count}");

// 通知消费者
Monitor.Pulse(_lock);
}
}

public int Consume()
{
lock (_lock)
{
// 队列空时等待
while (_queue.Count == 0)
{
Monitor.Wait(_lock); // 释放锁并等待
}

int item = _queue.Dequeue();
Console.WriteLine($"消费: {item},队列大小: {_queue.Count}");

// 通知生产者
Monitor.Pulse(_lock);

return item;
}
}
}

// 使用示例
var pc = new ProducerConsumer();

var producer = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
pc.Produce(i);
Thread.Sleep(100);
}
});

var consumer = new Thread(() =>
{
for (int i = 0; i < 10; i++)
{
pc.Consume();
Thread.Sleep(200);
}
});

producer.Start();
consumer.Start();
producer.Join();
consumer.Join();

Wait 会释放锁并让线程等待,直到其他线程调用 PulsePulseAll。这是实现生产者-消费者模式的基础。

Mutex:跨进程同步

Mutex(互斥体)类似于 lock,但它可以跨进程工作:

// 创建命名 Mutex(在整个操作系统中可见)
using var mutex = new Mutex(false, "MyUniqueAppName_SingleInstance");

// 尝试获取 Mutex
if (mutex.WaitOne(TimeSpan.FromSeconds(3), false))
{
try
{
Console.WriteLine("程序运行中...");
Console.ReadLine();
}
finally
{
mutex.ReleaseMutex();
}
}
else
{
Console.WriteLine("另一个实例已在运行");
}

命名 Mutex 常用于确保程序只运行一个实例。由于 Mutex 涉及操作系统内核对象,开销比 lock 大得多(约 50 倍),在进程内同步时应优先使用 lock

Semaphore 和 SemaphoreSlim:限制并发数

信号量控制同时访问某个资源的线程数量:

using System;
using System.Threading;

// SemaphoreSlim 是轻量级版本,用于进程内同步
var semaphore = new SemaphoreSlim(3, 3); // 初始计数3,最大计数3

for (int i = 1; i <= 10; i++)
{
int id = i;
Task.Run(async () =>
{
Console.WriteLine($"任务 {id} 等待进入");

await semaphore.WaitAsync(); // 异步等待

try
{
Console.WriteLine($"任务 {id} 进入执行");
await Task.Delay(1000);
Console.WriteLine($"任务 {id} 完成");
}
finally
{
semaphore.Release(); // 释放信号量
}
});
}

Thread.Sleep(5000);

信号量就像一个夜店:最多容纳一定数量的人,满了之后其他人必须在外面等。每当有人离开,才能让新的人进入。

Semaphore 是跨进程的版本,可以用于系统级的并发控制:

// 命名 Semaphore 可以跨进程使用
var semaphore = new Semaphore(3, 3, "MyApp_Semaphore");

ReaderWriterLockSlim:读写分离锁

当读操作远多于写操作时,使用读写锁可以提高并发性能。读写锁允许多个线程同时读取,但写入时需要独占访问:

using System;
using System.Threading;

class ThreadSafeCache
{
private readonly ReaderWriterLockSlim _lock = new ReaderWriterLockSlim();
private readonly Dictionary<string, string> _cache = new Dictionary<string, string>();

public string Get(string key)
{
_lock.EnterReadLock();
try
{
_cache.TryGetValue(key, out string value);
return value;
}
finally
{
_lock.ExitReadLock();
}
}

public void Set(string key, string value)
{
_lock.EnterWriteLock();
try
{
_cache[key] = value;
}
finally
{
_lock.ExitWriteLock();
}
}

// 升级锁:读锁升级为写锁
public string GetOrAdd(string key, Func<string> factory)
{
_lock.EnterUpgradeableReadLock();
try
{
if (_cache.TryGetValue(key, out string value))
return value;

_lock.EnterWriteLock();
try
{
value = factory();
_cache[key] = value;
return value;
}
finally
{
_lock.ExitWriteLock();
}
}
finally
{
_lock.ExitUpgradeableReadLock();
}
}
}

读写锁的三种模式:

  • 读锁:多个线程可以同时持有读锁
  • 写锁:独占锁,同一时刻只有一个线程可以持有
  • 可升级读锁:允许在读的过程中升级为写锁

SpinLock:高并发场景的自旋锁

当锁的持有时间非常短时,线程等待时"自旋"(循环检查)比阻塞更高效:

SpinLock spinLock = new SpinLock();
int counter = 0;

void Increment()
{
bool lockTaken = false;
try
{
spinLock.Enter(ref lockTaken);
counter++;
}
finally
{
if (lockTaken)
spinLock.Exit();
}
}

// 多线程测试
Parallel.For(0, 100000, i => Increment());
Console.WriteLine($"计数: {counter}");

SpinLock 适合锁持有时间极短(微秒级)的高并发场景。如果锁持有时间较长,自旋会浪费 CPU 资源,此时应该使用阻塞式锁。

事件同步原语

事件同步原语用于线程间的信号通知。

AutoResetEvent:自动重置事件

AutoResetEvent 就像一个旋转门:每次信号只允许一个等待的线程通过,然后自动关闭:

using System;
using System.Threading;

var autoEvent = new AutoResetEvent(false); // 初始状态为无信号

var waiter = new Thread(() =>
{
Console.WriteLine("线程等待信号...");
autoEvent.WaitOne(); // 阻塞等待信号
Console.WriteLine("线程收到信号,继续执行");
});

waiter.Start();
Thread.Sleep(1000);

Console.WriteLine("发送信号");
autoEvent.Set(); // 发送信号,唤醒一个等待线程

waiter.Join();

ManualResetEvent:手动重置事件

ManualResetEvent 就像一扇门:打开后所有等待的线程都能通过,直到手动关闭:

using System;
using System.Threading;

var manualEvent = new ManualResetEvent(false); // 初始状态为关闭

// 启动多个等待线程
for (int i = 1; i <= 3; i++)
{
int id = i;
new Thread(() =>
{
Console.WriteLine($"线程 {id} 等待");
manualEvent.WaitOne();
Console.WriteLine($"线程 {id} 通过");
}).Start();
}

Thread.Sleep(500);
Console.WriteLine("打开门(发送信号)");
manualEvent.Set(); // 打开门,所有等待线程都能通过

Thread.Sleep(500);
Console.WriteLine("关闭门");
manualEvent.Reset(); // 关闭门,后续线程会被阻塞

CountdownEvent:倒计时事件

CountdownEvent 在计数归零时发出信号,适合等待多个任务完成:

using System;
using System.Threading;

int taskCount = 5;
using var countdown = new CountdownEvent(taskCount);

for (int i = 1; i <= taskCount; i++)
{
int id = i;
Task.Run(() =>
{
Console.WriteLine($"任务 {id} 开始");
Thread.Sleep(1000 * id);
Console.WriteLine($"任务 {id} 完成");
countdown.Signal(); // 计数减一
});
}

Console.WriteLine("等待所有任务完成...");
countdown.Wait(); // 阻塞直到计数归零
Console.WriteLine("所有任务已完成");

Barrier:阶段同步屏障

Barrier 用于多线程的分阶段同步,所有线程到达屏障后才能继续下一阶段:

using System;
using System.Threading;

int participantCount = 3;
var barrier = new Barrier(participantCount, b =>
{
// 每个阶段完成后执行
Console.WriteLine($"=== 阶段 {b.CurrentPhaseNumber} 完成 ===\n");
});

for (int i = 0; i < participantCount; i++)
{
int id = i;
new Thread(() =>
{
for (int phase = 0; phase < 3; phase++)
{
Console.WriteLine($"线程 {id} 执行阶段 {phase}");
Thread.Sleep(500 * (id + 1));

barrier.SignalAndWait(); // 到达屏障,等待其他线程
}
Console.WriteLine($"线程 {id} 完成");
}).Start();
}

Barrier 特别适合迭代算法,比如并行排序或矩阵运算,每轮迭代后需要同步所有线程。

原子操作:Interlocked 类

对于简单的数值操作,Interlocked 类提供了无锁的原子操作:

using System;
using System.Threading;

int counter = 0;

// 原子递增
Interlocked.Increment(ref counter); // counter++

// 原子递减
Interlocked.Decrement(ref counter); // counter--

// 原子加法
Interlocked.Add(ref counter, 10); // counter += 10

// 原子交换
int original = Interlocked.Exchange(ref counter, 0); // counter = 0, 返回原值

// 条件交换:只有当前值等于比较值时才交换
int comparand = 5;
int newValue = 10;
Interlocked.CompareExchange(ref counter, newValue, comparand);
// 如果 counter == 5,则 counter = 10

// 原子读取 64 位值(32位系统上需要)
long value = Interlocked.Read(ref someLongValue);

Interlocked 比锁更高效,但只能用于简单操作。复杂的逻辑仍需要锁来保护。

线程安全集合

.NET 提供了一组线程安全的集合类,位于 System.Collections.Concurrent 命名空间:

ConcurrentQueue:线程安全队列

using System.Collections.Concurrent;

var queue = new ConcurrentQueue<int>();

// 多线程入队
Parallel.For(0, 100, i => queue.Enqueue(i));

// 尝试出队
while (queue.TryDequeue(out int item))
{
Console.WriteLine(item);
}

ConcurrentStack:线程安全栈

var stack = new ConcurrentStack<int>();

// 入栈
stack.Push(1);
stack.PushRange(new[] { 2, 3, 4 });

// 尝试出栈
if (stack.TryPop(out int item))
{
Console.WriteLine($"弹出: {item}");
}

ConcurrentDictionary:线程安全字典

var dict = new ConcurrentDictionary<string, int>();

// 添加或更新
dict.AddOrUpdate("key", 1, (key, oldValue) => oldValue + 1);

// 获取或添加
int value = dict.GetOrAdd("key", 0);

// 尝试获取
if (dict.TryGetValue("key", out int val))
{
Console.WriteLine(val);
}

// 尝试移除
if (dict.TryRemove("key", out int removed))
{
Console.WriteLine($"移除: {removed}");
}

ConcurrentBag:无序集合

ConcurrentBag 适用于顺序无关的场景,性能更好:

var bag = new ConcurrentBag<int>();

Parallel.For(0, 100, i => bag.Add(i));

int count = bag.Count; // 获取元素数量

BlockingCollection:生产者-消费者集合

BlockingCollection 是最强大的线程安全集合,支持阻塞和容量限制:

using System.Collections.Concurrent;

// 创建有界集合(最大容量100)
var collection = new BlockingCollection<int>(100);

// 生产者任务
var producer = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
collection.Add(i); // 满时阻塞
Console.WriteLine($"生产: {i}");
}
collection.CompleteAdding(); // 标记添加完成
});

// 消费者任务
var consumer = Task.Run(() =>
{
// 方式一:使用 GetConsumingEnumerable
foreach (var item in collection.GetConsumingEnumerable())
{
Console.WriteLine($"消费: {item}");
Thread.Sleep(10);
}

// 方式二:手动循环
// while (!collection.IsCompleted)
// {
// if (collection.TryTake(out int item, 100))
// Console.WriteLine($"消费: {item}");
// }
});

await Task.WhenAll(producer, consumer);
Console.WriteLine("所有任务完成");

BlockingCollection 可以包装其他集合类型:

// 使用栈作为底层存储(后进先出)
var stackCollection = new BlockingCollection<int>(new ConcurrentStack<int>());

// 使用队列作为底层存储(先进先出,默认)
var queueCollection = new BlockingCollection<int>(new ConcurrentQueue<int>());

取消操作:CancellationToken

长时间运行的任务应该支持取消。CancellationToken 是 .NET 中标准的取消机制:

using System;
using System.Threading;

// 创建取消源
using var cts = new CancellationTokenSource();

// 启动可取消的任务
var task = Task.Run(() =>
{
for (int i = 0; i < 100; i++)
{
// 检查是否请求取消
if (cts.Token.IsCancellationRequested)
{
Console.WriteLine("任务被取消,正在清理...");
cts.Token.ThrowIfCancellationRequested(); // 抛出 OperationCanceledException
}

Console.WriteLine($"处理中: {i}");
Thread.Sleep(100);
}
Console.WriteLine("任务完成");
}, cts.Token);

// 3秒后取消
Thread.Sleep(3000);
cts.Cancel();

try
{
task.Wait();
}
catch (AggregateException ex)
{
Console.WriteLine($"任务异常: {ex.InnerException?.Message}");
}

取消超时

// 设置超时自动取消
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));

// 或者稍后设置
cts.CancelAfter(TimeSpan.FromSeconds(10));

组合多个取消令牌

var cts1 = new CancellationTokenSource();
var cts2 = new CancellationTokenSource();

// 任一令牌取消都会触发
var combined = CancellationTokenSource.CreateLinkedTokenSource(
cts1.Token, cts2.Token);

// 使用 combined.Token

取消异步操作

async Task DownloadWithCancelAsync(string url, CancellationToken ct)
{
using var client = new HttpClient();

// 传递取消令牌给异步方法
var response = await client.GetAsync(url, ct);
var content = await response.Content.ReadAsStringAsync(ct);

Console.WriteLine($"下载完成: {content.Length} 字节");
}

// 使用
using var cts = new CancellationTokenSource();
cts.CancelAfter(TimeSpan.FromSeconds(10));

await DownloadWithCancelAsync("https://example.com", cts.Token);

线程本地存储

ThreadLocal

ThreadLocal<T> 为每个线程提供独立的变量副本:

using System;
using System.Threading;

// 每个线程有独立的计数器
var threadLocal = new ThreadLocal<int>(() => Thread.CurrentThread.ManagedThreadId);

Parallel.For(0, 5, i =>
{
Console.WriteLine($"线程 {Thread.CurrentThread.ManagedThreadId}, 值: {threadLocal.Value}");
threadLocal.Value++; // 只影响当前线程的副本
});

// 获取所有线程的值
Console.WriteLine($"所有值: {string.Join(", ", threadLocal.Values)}");

threadLocal.Dispose();

AsyncLocal

AsyncLocal<T> 用于异步上下文中传递数据:

using System;
using System.Threading;

var asyncLocal = new AsyncLocal<string>();

asyncLocal.Value = "主线程数据";

await Task.Run(() =>
{
Console.WriteLine($"任务中: {asyncLocal.Value}"); // 可以访问
asyncLocal.Value = "任务中修改";
});

Console.WriteLine($"主线程: {asyncLocal.Value}"); // 仍然是 "主线程数据"

AsyncLocal 的值会沿着异步调用链传递,但不会反向传播(除非使用 AsyncLocalValueChangedContext)。

死锁:识别与预防

什么是死锁?

死锁是指两个或多个线程相互等待对方释放资源,导致所有线程都无法继续执行:

object lockA = new object();
object lockB = new object();

// 线程1:先锁A,再锁B
new Thread(() =>
{
lock (lockA)
{
Thread.Sleep(100);
lock (lockB) // 等待锁B
{
Console.WriteLine("线程1完成");
}
}
}).Start();

// 线程2:先锁B,再锁A
new Thread(() =>
{
lock (lockB)
{
Thread.Sleep(100);
lock (lockA) // 等待锁A - 死锁!
{
Console.WriteLine("线程2完成");
}
}
}).Start();

预防死锁的策略

1. 锁顺序

始终以相同的顺序获取多个锁:

// 正确做法:按特定顺序获取锁
void Transfer(Account from, Account to, decimal amount)
{
// 按 ID 排序确保锁顺序一致
var first = from.Id < to.Id ? from : to;
var second = from.Id < to.Id ? to : from;

lock (first)
{
lock (second)
{
from.Balance -= amount;
to.Balance += amount;
}
}
}

2. 使用 TryEnter 设置超时

if (Monitor.TryEnter(lockA, TimeSpan.FromSeconds(5)))
{
try
{
if (Monitor.TryEnter(lockB, TimeSpan.FromSeconds(5)))
{
try
{
// 执行操作
}
finally
{
Monitor.Exit(lockB);
}
}
}
finally
{
Monitor.Exit(lockA);
}
}

3. 减少锁的持有时间

// 错误:持锁时间过长
lock (_lock)
{
var data = GetData();
ProcessData(data); // 耗时操作
SaveData(data);
}

// 正确:只锁必要的部分
List<Data> data;
lock (_lock)
{
data = GetData(); // 快速复制
}
ProcessData(data); // 不持锁处理
lock (_lock)
{
SaveData(data);
}

4. 使用更高级的抽象

优先使用 BlockingCollectionChannel 等高级抽象,而不是手动加锁:

// 现代 C# 使用 Channel 实现生产者-消费者
var channel = System.Threading.Channels.Channel.CreateBounded<int>(100);

// 生产者
async Task ProduceAsync()
{
for (int i = 0; i < 100; i++)
{
await channel.Writer.WriteAsync(i);
}
channel.Writer.Complete();
}

// 消费者
async Task ConsumeAsync()
{
await foreach (var item in channel.Reader.ReadAllAsync())
{
Console.WriteLine(item);
}
}

并行编程

Parallel.For 和 Parallel.ForEach

using System;
using System.Threading.Tasks;

// 并行 for 循环
Parallel.For(0, 100, i =>
{
Console.WriteLine($"处理 {i}");
});

// 并行 foreach
var items = Enumerable.Range(0, 100).ToList();
Parallel.ForEach(items, item =>
{
Process(item);
});

// 带选项的并行循环
var options = new ParallelOptions
{
MaxDegreeOfParallelism = 4, // 最大并行度
CancellationToken = cts.Token
};

Parallel.For(0, 100, options, i =>
{
// 并行处理
});

PLINQ:并行 LINQ

var numbers = Enumerable.Range(1, 1000000);

// 并行处理
var result = numbers
.AsParallel()
.Where(n => n % 2 == 0)
.Select(n => n * n)
.OrderBy(n => n)
.ToList();

// 指定并行度
var result2 = numbers
.AsParallel()
.AsOrdered() // 保持原始顺序
.WithDegreeOfParallelism(4)
.Where(n => n % 2 == 0)
.ToList();

注意:并非所有操作都适合并行化。对于小数据集或简单操作,并行化的开销可能超过收益。

多线程最佳实践

1. 优先使用高级抽象

优先级从高到低:

  1. async/awaitTask
  2. Parallel.For/ForEach 和 PLINQ
  3. BlockingCollection 等线程安全集合
  4. ThreadPool.QueueUserWorkItem
  5. Thread 类(尽量避免直接使用)

2. 避免过度同步

不必要的同步会降低性能和可扩展性:

  • 使用不可变对象,避免共享状态
  • 使用线程安全集合代替手动加锁
  • 考虑使用 ThreadLocalAsyncLocal 避免共享

3. 正确处理异常

线程中未处理的异常会导致程序崩溃:

var task = Task.Run(() =>
{
throw new InvalidOperationException("任务中的异常");
});

try
{
task.Wait();
}
catch (AggregateException ex)
{
// 处理所有内部异常
foreach (var inner in ex.InnerExceptions)
{
Console.WriteLine($"异常: {inner.Message}");
}
}

// 或使用 await
try
{
await task;
}
catch (InvalidOperationException ex)
{
Console.WriteLine($"异常: {ex.Message}");
}

4. 避免线程亲和性问题

某些对象有线程亲和性(如 UI 控件),只能在创建它们的线程上访问:

// WPF 示例
Dispatcher.Invoke(() =>
{
// 在 UI 线程执行
textBox.Text = "更新文本";
});

// WinForms 示例
if (textBox.InvokeRequired)
{
textBox.Invoke(() => textBox.Text = "更新文本");
}

5. 使用诊断工具

  • lock 竞争多时考虑使用 ReaderWriterLockSlimSpinLock
  • 使用 Visual Studio 并行调试窗口分析线程状态
  • 使用性能分析器查找锁竞争热点

小结

多线程编程的核心要点:

概念用途选择建议
Thread精细控制线程需要前台线程或特定优先级时
ThreadPool短任务执行通过 Task.Run 使用
lock保护共享资源最常用的同步方式
Monitor复杂同步逻辑需要 TryEnterWait/Pulse
Mutex跨进程同步确保单实例运行
Semaphore限制并发数连接池、资源池
ReaderWriterLockSlim读写分离读多写少场景
Event线程信号AutoResetEventManualResetEvent
Barrier阶段同步并行迭代算法
Interlocked原子操作简单数值操作
ConcurrentDictionary线程安全字典并发读写场景
BlockingCollection生产者-消费者数据管道
CancellationToken取消操作长时间运行的任务

练习

  1. 实现一个线程安全的计数器类,支持递增、递减和获取当前值
  2. 使用 BlockingCollection 实现一个简单的任务队列
  3. 使用 Barrier 实现一个多线程并行排序算法
  4. 实现一个支持取消的异步下载器,同时下载多个文件
  5. 使用 ReaderWriterLockSlim 实现一个线程安全的缓存类