Go 并发编程
本章将介绍 Go 语言强大的并发编程能力。Go 从语言层面原生支持并发,通过 goroutine 和 channel 使得并发编程变得简单而高效。
并发与并行
在开始学习 Go 的并发特性之前,我们需要理解两个重要概念:
- 并发(Concurrency):是指同时处理多个任务的能力。在单核 CPU 上,通过时间片轮转的方式交替执行多个任务,从宏观上看像是同时进行的。
- 并行(Parallelism):是指同时执行多个任务的能力。需要多核 CPU 支持,每个核心真正同时执行不同的任务。
并发关注的是结构,即如何组织程序来处理多个任务;并行关注的是执行,即如何同时运行多个任务。Go 的并发模型可以让程序在单核上实现并发,在多核上实现并行。
Go 的并发模型
Go 采用了 CSP(Communicating Sequential Processes,通信顺序进程) 模型,核心理念是:
"不要通过共享内存来通信,而要通过通信来共享内存。"
这意味着在 Go 中,goroutine 之间通过 channel 传递数据来通信,而不是通过共享变量并加锁来同步。
Goroutine
Goroutine 是 Go 中轻量级的并发执行单元。它比操作系统线程更轻量,初始栈大小只有 2KB(可以动态伸缩),而线程通常需要 1MB 以上的栈空间。
创建 Goroutine
使用 go 关键字启动一个 goroutine:
package main
import (
"fmt"
"time"
)
func sayHello() {
fmt.Println("Hello from goroutine!")
}
func main() {
// 启动 goroutine
go sayHello()
fmt.Println("Hello from main!")
// 等待 goroutine 执行完成
time.Sleep(time.Second)
}
输出顺序可能为:
Hello from main!
Hello from goroutine!
main 函数本身也是一个 goroutine。当 main 函数返回时,所有其他 goroutine 都会被立即终止,不管它们是否执行完成。这就是为什么上面代码需要 time.Sleep 来等待 goroutine 完成。
匿名函数 Goroutine
可以使用匿名函数创建 goroutine:
func main() {
// 方式一:立即执行的匿名函数
go func() {
fmt.Println("匿名 goroutine")
}()
// 方式二:带参数的匿名函数
message := "Hello"
go func(msg string) {
fmt.Println(msg)
}(message)
time.Sleep(time.Second)
}
闭包变量捕获问题
在使用闭包时需要注意变量捕获的问题:
// ❌ 错误示例:所有 goroutine 可能都打印 3
for i := 0; i < 3; i++ {
go func() {
fmt.Println(i) // 捕获的是变量 i 的引用
}()
}
// ✅ 正确示例:将 i 作为参数传递
for i := 0; i < 3; i++ {
go func(n int) {
fmt.Println(n) // 每个 goroutine 有自己的 n 副本
}(i)
}
使用 sync.WaitGroup 等待
使用 time.Sleep 等待 goroutine 完成是不推荐的,应该使用 sync.WaitGroup:
package main
import (
"fmt"
"sync"
"time"
)
func worker(id int, wg *sync.WaitGroup) {
// 延迟调用 Done,确保函数退出时通知 WaitGroup
defer wg.Done()
fmt.Printf("Worker %d starting\n", id)
time.Sleep(time.Second)
fmt.Printf("Worker %d done\n", id)
}
func main() {
var wg sync.WaitGroup
// 启动 5 个 goroutine
for i := 1; i <= 5; i++ {
// 在启动 goroutine 前增加计数器
wg.Add(1)
go worker(i, &wg)
}
// 等待所有 goroutine 完成
wg.Wait()
fmt.Println("All workers done")
}
WaitGroup 的三个方法:
Add(delta int):增加计数器,通常在启动 goroutine 前调用Done():减少计数器,等价于Add(-1)Wait():阻塞直到计数器归零
wg.Add(1) 应该在启动 goroutine 之前调用,而不是在 goroutine 内部调用,以避免竞态条件。
Channel
Channel(通道)是 goroutine 之间通信的管道。通过 channel,一个 goroutine 可以发送值给另一个 goroutine。
创建 Channel
使用 make 创建 channel:
// 无缓冲 channel
ch := make(chan int)
// 有缓冲 channel,容量为 10
ch := make(chan int, 10)
发送和接收
使用 <- 操作符发送和接收数据:
ch := make(chan int)
// 发送数据
ch <- 42
// 接收数据
value := <-ch
// 接收数据并检查 channel 是否已关闭
value, ok := <-ch
// 如果 ok 为 false,表示 channel 已关闭且没有更多数据
无缓冲 Channel
无缓冲 channel 也称为同步 channel,发送操作会阻塞直到另一个 goroutine 执行接收操作:
func main() {
ch := make(chan string)
// 发送 goroutine
go func() {
ch <- "Hello" // 阻塞,直到有人接收
fmt.Println("消息已发送")
}()
// 接收
msg := <-ch // 阻塞,直到有数据可接收
fmt.Println(msg)
}
特点:
- 发送和接收操作是同步的
- 每次发送必须有对应的接收,否则会死锁
- 适合用于 goroutine 间的同步
有缓冲 Channel
有缓冲 channel 有一个固定大小的缓冲区,发送操作只有在缓冲区满时才会阻塞:
func main() {
// 创建容量为 2 的缓冲 channel
ch := make(chan int, 2)
// 可以连续发送两个值而不阻塞
ch <- 1
ch <- 2
// 第三个发送会阻塞(缓冲区已满)
// ch <- 3 // 死锁!
// 接收数据
fmt.Println(<-ch) // 1
fmt.Println(<-ch) // 2
}
特点:
- 发送操作在缓冲区未满时不会阻塞
- 接收操作在缓冲区不为空时不会阻塞
- 适合用于生产者-消费者模式
关闭 Channel
使用 close 函数关闭 channel:
ch := make(chan int, 3)
ch <- 1
ch <- 2
close(ch) // 关闭 channel
// 从已关闭的 channel 接收
value, ok := <-ch
fmt.Println(value, ok) // 1 true
value, ok = <-ch
fmt.Println(value, ok) // 2 true
value, ok = <-ch
fmt.Println(value, ok) // 0 false(channel 已空且关闭)
关闭 channel 的规则:
- 只有发送者应该关闭 channel,接收者不应该关闭
- 向已关闭的 channel 发送数据会 panic
- 从已关闭的 channel 接收会返回零值和
false - 关闭一个已关闭的 channel 会 panic
- 关闭 nil channel 会 panic
使用 range 遍历 Channel
可以使用 for range 持续从 channel 接收数据,直到 channel 关闭:
func main() {
ch := make(chan int, 5)
// 发送数据
go func() {
for i := 0; i < 5; i++ {
ch <- i
}
close(ch) // 发送完毕后关闭
}()
// 接收数据直到 channel 关闭
for value := range ch {
fmt.Println(value)
}
fmt.Println("Channel 已关闭")
}
单向 Channel
Go 支持单向 channel 类型,用于限制 channel 的操作方向:
// 只发送 channel
func sender(ch chan<- int) {
ch <- 42
// <-ch // 编译错误!不能从只发送 channel 接收
}
// 只接收 channel
func receiver(ch <-chan int) {
value := <-ch
fmt.Println(value)
// ch <- 1 // 编译错误!不能向只接收 channel 发送
}
func main() {
ch := make(chan int)
go sender(ch)
go receiver(ch)
}
类型转换:
ch := make(chan int) // 双向 channel
var sendCh chan<- int = ch // 转换为只发送
var recvCh <-chan int = ch // 转换为只接收
Select
select 语句用于同时等待多个 channel 操作,类似于网络编程中的 select/poll/epoll。
基本用法
func main() {
ch1 := make(chan string)
ch2 := make(chan string)
go func() {
time.Sleep(time.Second)
ch1 <- "from ch1"
}()
go func() {
time.Sleep(500 * time.Millisecond)
ch2 <- "from ch2"
}()
// 等待最先到达的数据
select {
case msg1 := <-ch1:
fmt.Println("Received", msg1)
case msg2 := <-ch2:
fmt.Println("Received", msg2)
}
}
超时处理
使用 time.After 实现超时:
func main() {
ch := make(chan string)
go func() {
time.Sleep(2 * time.Second)
ch <- "result"
}()
select {
case res := <-ch:
fmt.Println(res)
case <-time.After(time.Second):
fmt.Println("Timeout!")
}
}
非阻塞操作
使用 default 实现非阻塞操作:
func main() {
ch := make(chan int, 1)
select {
case ch <- 42:
fmt.Println("发送成功")
default:
fmt.Println("发送失败(channel 满)")
}
select {
case value := <-ch:
fmt.Println("接收:", value)
default:
fmt.Println("接收失败(channel 空)")
}
}
检查 Channel 是否已关闭
func main() {
ch := make(chan int, 1)
close(ch)
select {
case value, ok := <-ch:
if ok {
fmt.Println("接收到:", value)
} else {
fmt.Println("Channel 已关闭")
}
default:
fmt.Println("没有数据")
}
}
并发安全
当多个 goroutine 同时访问共享资源时,需要使用同步机制来保证数据安全。
sync.Mutex
互斥锁用于保护临界区,同一时间只有一个 goroutine 可以访问:
package main
import (
"fmt"
"sync"
)
type Counter struct {
mu sync.Mutex
value int
}
func (c *Counter) Increment() {
c.mu.Lock() // 获取锁
defer c.mu.Unlock() // 释放锁
c.value++
}
func (c *Counter) Value() int {
c.mu.Lock()
defer c.mu.Unlock()
return c.value
}
func main() {
counter := Counter{}
var wg sync.WaitGroup
// 启动 1000 个 goroutine
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Increment()
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Value()) // 1000
}
sync.RWMutex
读写锁允许多个读者同时读取,但写者需要独占访问:
package main
import (
"fmt"
"sync"
"time"
)
type Cache struct {
mu sync.RWMutex
data map[string]string
}
func (c *Cache) Get(key string) string {
c.mu.RLock() // 读锁
defer c.mu.RUnlock()
return c.data[key]
}
func (c *Cache) Set(key, value string) {
c.mu.Lock() // 写锁
defer c.mu.Unlock()
c.data[key] = value
}
func main() {
cache := &Cache{
data: make(map[string]string),
}
// 写操作
go func() {
cache.Set("key", "value")
}()
// 多个读操作可以同时进行
for i := 0; i < 3; i++ {
go func() {
fmt.Println(cache.Get("key"))
}()
}
}
读写锁的规则:
- 读锁可以同时被多个 goroutine 获取
- 写锁是独占的,只能被一个 goroutine 获取
- 如果有写锁存在,读锁会被阻塞
- 如果有读锁存在,写锁会被阻塞
sync/atomic
Go 提供了原子操作包,用于对基本类型进行无锁并发操作:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
atomic.AddInt64(&counter, 1) // 原子加操作
}()
}
wg.Wait()
fmt.Println("Counter:", counter) // 1000
// 比较并交换(Compare And Swap)
old := atomic.LoadInt64(&counter)
swapped := atomic.CompareAndSwapInt64(&counter, old, 0)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Counter:", counter) // 0
}
常用的原子操作:
Add:原子加法Load:原子读取Store:原子存储Swap:原子交换CompareAndSwap:比较并交换
atomic.Int64(Go 1.19+)
Go 1.19 引入了新的原子类型,使用更加方便:
package main
import (
"fmt"
"sync"
"sync/atomic"
)
func main() {
var counter atomic.Int64
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter.Add(1)
}()
}
wg.Wait()
fmt.Println("Counter:", counter.Load()) // 1000
// 原子交换
old := counter.Swap(0)
fmt.Println("Old value:", old) // 1000
fmt.Println("New value:", counter.Load()) // 0
// 比较并交换
swapped := counter.CompareAndSwap(0, 42)
fmt.Println("Swapped:", swapped) // true
fmt.Println("Value:", counter.Load()) // 42
}
Context
Context 包用于在 API 边界和进程之间传递截止时间、取消信号和其他请求范围的值。
创建 Context
import "context"
// 背景 context,作为根 context
ctx := context.Background()
// TODO context,用于不确定使用哪个 context 时
ctx := context.TODO()
取消信号
package main
import (
"context"
"fmt"
"time"
)
func worker(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Println("Worker stopped:", ctx.Err())
return
default:
fmt.Println("Working...")
time.Sleep(500 * time.Millisecond)
}
}
}
func main() {
// 创建可取消的 context
ctx, cancel := context.WithCancel(context.Background())
go worker(ctx)
time.Sleep(2 * time.Second)
cancel() // 取消 context
time.Sleep(time.Second) // 等待 worker 退出
}
超时控制
func main() {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel() // 释放资源
select {
case <-time.After(2 * time.Second):
fmt.Println("操作完成")
case <-ctx.Done():
fmt.Println("超时:", ctx.Err()) // context deadline exceeded
}
}
截止时间
func main() {
// 设置截止时间
deadline := time.Now().Add(2 * time.Second)
ctx, cancel := context.WithDeadline(context.Background(), deadline)
defer cancel()
// 检查截止时间
d, ok := ctx.Deadline()
fmt.Println("Deadline:", d, "set:", ok)
select {
case <-ctx.Done():
fmt.Println("已到达截止时间")
}
}
传递值
func main() {
// 创建带值的 context
ctx := context.WithValue(context.Background(), "userID", 12345)
ctx = context.WithValue(ctx, "requestID", "abc-123")
// 获取值
if userID, ok := ctx.Value("userID").(int); ok {
fmt.Println("UserID:", userID)
}
if requestID, ok := ctx.Value("requestID").(string); ok {
fmt.Println("RequestID:", requestID)
}
}
Context 的 Value 应该用于传递请求范围的数据,如 trace ID、用户 ID 等,不要用于传递可选参数或业务数据。
HTTP 请求中的 Context
func main() {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", "https://example.com", nil)
if err != nil {
log.Fatal(err)
}
resp, err := http.DefaultClient.Do(req)
if err != nil {
log.Fatal(err)
}
defer resp.Body.Close()
fmt.Println("Status:", resp.Status)
}
并发模式
Pipeline 模式
Pipeline 是一系列通过 channel 连接的处理阶段:
package main
import "fmt"
// 第一阶段:生成数据
func generate(nums ...int) <-chan int {
out := make(chan int)
go func() {
for _, n := range nums {
out <- n
}
close(out)
}()
return out
}
// 第二阶段:处理数据(平方)
func square(in <-chan int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
out <- n * n
}
close(out)
}()
return out
}
func main() {
// 构建 pipeline
c := generate(2, 3)
out := square(c)
// 消费结果
for n := range out {
fmt.Println(n) // 4, 9
}
// 可以链式调用
for n := range square(square(generate(2, 3))) {
fmt.Println(n) // 16, 81
}
}
Fan-out / Fan-in 模式
Fan-out:多个 goroutine 从同一个 channel 读取数据,实现并行处理。 Fan-in:将多个 channel 的结果合并到一个 channel。
package main
import (
"fmt"
"sync"
)
// Fan-out:启动多个 worker 处理同一个输入 channel
func worker(id int, in <-chan int, out chan<- int) {
for n := range in {
fmt.Printf("Worker %d: processing %d\n", id, n)
out <- n * n
}
}
// Fan-in:合并多个 channel 到一个 channel
func merge(channels ...<-chan int) <-chan int {
var wg sync.WaitGroup
out := make(chan int)
// 为每个输入 channel 启动一个 goroutine
output := func(c <-chan int) {
for n := range c {
out <- n
}
wg.Done()
}
wg.Add(len(channels))
for _, c := range channels {
go output(c)
}
// 等待所有输入 channel 关闭后,关闭输出 channel
go func() {
wg.Wait()
close(out)
}()
return out
}
func main() {
// 输入 channel
in := make(chan int, 10)
// 启动 3 个 worker
out1 := make(chan int)
out2 := make(chan int)
out3 := make(chan int)
go worker(1, in, out1)
go worker(2, in, out2)
go worker(3, in, out3)
// 发送数据
for i := 0; i < 10; i++ {
in <- i
}
close(in)
// 关闭输出 channel
go func() {
// 等待一段时间让 worker 处理完
time.Sleep(time.Second)
close(out1)
close(out2)
close(out3)
}()
// 合并结果
for n := range merge(out1, out2, out3) {
fmt.Println(n)
}
}
Worker Pool 模式
创建固定数量的 worker 来处理任务:
package main
import (
"fmt"
"sync"
"time"
)
type Job struct {
ID int
Data string
}
type Result struct {
Job Job
Result string
}
func worker(id int, jobs <-chan Job, results chan<- Result, wg *sync.WaitGroup) {
defer wg.Done()
for job := range jobs {
fmt.Printf("Worker %d: processing job %d\n", id, job.ID)
time.Sleep(time.Second) // 模拟处理
results <- Result{
Job: job,
Result: fmt.Sprintf("Processed: %s", job.Data),
}
}
}
func main() {
const numWorkers = 3
const numJobs = 10
jobs := make(chan Job, numJobs)
results := make(chan Result, numJobs)
var wg sync.WaitGroup
// 启动 worker
for i := 1; i <= numWorkers; i++ {
wg.Add(1)
go worker(i, jobs, results, &wg)
}
// 发送任务
for i := 1; i <= numJobs; i++ {
jobs <- Job{ID: i, Data: fmt.Sprintf("Task %d", i)}
}
close(jobs)
// 等待所有 worker 完成
go func() {
wg.Wait()
close(results)
}()
// 收集结果
for result := range results {
fmt.Printf("Result: Job %d -> %s\n", result.Job.ID, result.Result)
}
}
生产者-消费者模式
package main
import (
"fmt"
"math/rand"
"sync"
"time"
)
func producer(id int, ch chan<- int, wg *sync.WaitGroup) {
defer wg.Done()
for i := 0; i < 5; i++ {
num := rand.Intn(100)
ch <- num
fmt.Printf("Producer %d: produced %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(500)) * time.Millisecond)
}
}
func consumer(id int, ch <-chan int, wg *sync.WaitGroup) {
defer wg.Done()
for num := range ch {
fmt.Printf("Consumer %d: consumed %d\n", id, num)
time.Sleep(time.Duration(rand.Intn(300)) * time.Millisecond)
}
}
func main() {
ch := make(chan int, 5)
var wg sync.WaitGroup
// 启动生产者
for i := 1; i <= 2; i++ {
wg.Add(1)
go producer(i, ch, &wg)
}
// 启动消费者
for i := 1; i <= 3; i++ {
wg.Add(1)
go consumer(i, ch, &wg)
}
// 等待生产者完成后关闭 channel
go func() {
wg.Wait()
close(ch)
}()
time.Sleep(5 * time.Second)
}
速率限制模式
使用 ticker 实现速率限制:
package main
import (
"fmt"
"time"
)
func main() {
requests := make(chan int, 10)
for i := 0; i < 10; i++ {
requests <- i
}
close(requests)
// 每 200ms 处理一个请求
limiter := time.Tick(200 * time.Millisecond)
for req := range requests {
<-limiter // 等待 ticker
fmt.Println("Request", req, time.Now().Format("15:04:05.000"))
}
}
等待超时模式
func waitForResult() (string, error) {
result := make(chan string, 1)
go func() {
time.Sleep(2 * time.Second) // 模拟耗时操作
result <- "success"
}()
select {
case res := <-result:
return res, nil
case <-time.After(time.Second):
return "", fmt.Errorf("timeout")
}
}
其他同步原语
sync.Once
确保某个操作只执行一次:
package main
import (
"fmt"
"sync"
)
type Singleton struct {
name string
}
var (
instance *Singleton
once sync.Once
)
func GetInstance() *Singleton {
once.Do(func() {
fmt.Println("Creating instance...")
instance = &Singleton{name: "Singleton"}
})
return instance
}
func main() {
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
wg.Add(1)
go func() {
defer wg.Done()
inst := GetInstance()
fmt.Printf("Instance: %p\n", inst)
}()
}
wg.Wait()
// 输出:Creating instance... 只出现一次
// 所有 goroutine 获取的是同一个实例
}
sync.Cond
条件变量,用于等待某个条件成立:
package main
import (
"fmt"
"sync"
"time"
)
type Queue struct {
items []int
cond *sync.Cond
}
func NewQueue() *Queue {
return &Queue{
cond: sync.NewCond(&sync.Mutex{}),
}
}
func (q *Queue) Put(item int) {
q.cond.L.Lock()
defer q.cond.L.Unlock()
q.items = append(q.items, item)
fmt.Println("Produced:", item)
q.cond.Signal() // 通知等待的消费者
}
func (q *Queue) Get() int {
q.cond.L.Lock()
defer q.cond.L.Unlock()
// 等待队列不为空
for len(q.items) == 0 {
q.cond.Wait()
}
item := q.items[0]
q.items = q.items[1:]
fmt.Println("Consumed:", item)
return item
}
func main() {
queue := NewQueue()
// 消费者
go func() {
for i := 0; i < 5; i++ {
queue.Get()
}
}()
// 生产者
for i := 0; i < 5; i++ {
queue.Put(i)
time.Sleep(100 * time.Millisecond)
}
time.Sleep(time.Second)
}
sync.Pool
对象池,用于复用临时对象,减少内存分配:
package main
import (
"fmt"
"sync"
)
var pool = sync.Pool{
New: func() interface{} {
fmt.Println("Creating new object")
return make([]byte, 1024)
},
}
func main() {
// 从池中获取对象
data := pool.Get().([]byte)
fmt.Println("Got object, len:", len(data))
// 使用对象...
// 将对象放回池中
pool.Put(data)
// 再次获取,可能复用之前的对象
data2 := pool.Get().([]byte)
fmt.Println("Got object again, len:", len(data2))
// 输出:
// Creating new object(只有第一次)
// Got object, len: 1024
// Got object again, len: 1024
}
sync.Map
并发安全的 map,适用于读多写少的场景:
package main
import (
"fmt"
"sync"
)
func main() {
var m sync.Map
// 存储
m.Store("name", "张三")
m.Store("age", 25)
// 读取
if value, ok := m.Load("name"); ok {
fmt.Println("Name:", value)
}
// 读取或存储(如果不存在则存储)
value, loaded := m.LoadOrStore("city", "北京")
fmt.Println("City:", value, "loaded:", loaded)
// 遍历
m.Range(func(key, value interface{}) bool {
fmt.Printf("%v: %v\n", key, value)
return true
})
// 删除
m.Delete("age")
}
常见问题与陷阱
死锁
死锁是指两个或多个 goroutine 相互等待,导致都无法继续执行:
// ❌ 死锁示例 1:无缓冲 channel 只有发送没有接收
func deadlock1() {
ch := make(chan int)
ch <- 42 // 阻塞,没有接收者
}
// ❌ 死锁示例 2:循环等待
func deadlock2() {
ch1 := make(chan int)
ch2 := make(chan int)
go func() {
<-ch1 // 等待 ch1
ch2 <- 1
}()
ch1 <- 1 // 等待被接收
<-ch2 // 等待 ch2
}
// ✅ 正确:使用缓冲 channel
func correct() {
ch := make(chan int, 1)
ch <- 42
fmt.Println(<-ch)
}
Goroutine 泄露
Goroutine 泄露是指 goroutine 永远不会退出,持续占用资源:
// ❌ 泄露示例:goroutine 永远阻塞
func leak() {
ch := make(chan int)
go func() {
<-ch // 永远没有人发送数据
}()
}
// ✅ 正确:使用 context 或 done channel
func noLeak() {
ch := make(chan int)
done := make(chan struct{})
go func() {
select {
case <-ch:
// 处理数据
case <-done:
return // 可以退出
}
}()
close(done) // 通知 goroutine 退出
}
竞态条件
多个 goroutine 同时访问共享数据且至少有一个是写操作:
// ❌ 竞态条件
var counter int
func raceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
counter++ // 竞态!
}()
}
wg.Wait()
fmt.Println(counter) // 可能不是 1000
}
// ✅ 正确:使用 mutex 或 atomic
var (
counterSafe int
mu sync.Mutex
)
func noRaceCondition() {
var wg sync.WaitGroup
for i := 0; i < 1000; i++ {
wg.Add(1)
go func() {
defer wg.Done()
mu.Lock()
counterSafe++
mu.Unlock()
}()
}
wg.Wait()
fmt.Println(counterSafe) // 一定是 1000
}
可以使用 go run -race 检测竞态条件。
最佳实践
1. 使用 defer 关闭资源
func processFile(filename string) error {
file, err := os.Open(filename)
if err != nil {
return err
}
defer file.Close() // 确保文件关闭
// 处理文件...
return nil
}
2. 正确处理 goroutine 启动
// ❌ 错误:在 goroutine 内部调用 wg.Add
go func() {
wg.Add(1) // 可能在 wg.Wait() 之后才执行
defer wg.Done()
// ...
}()
// ✅ 正确:在启动 goroutine 前调用 wg.Add
wg.Add(1)
go func() {
defer wg.Done()
// ...
}()
3. 使用 context 进行取消
func operation(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
default:
// 执行操作
}
return nil
}
4. 避免 goroutine 泄露
func worker(ctx context.Context, jobs <-chan Job) {
for {
select {
case job, ok := <-jobs:
if !ok {
return // channel 已关闭
}
process(job)
case <-ctx.Done():
return // 收到取消信号
}
}
}
5. 合理使用缓冲 channel
// 已知数量的数据:使用适当大小的缓冲
ch := make(chan int, 100)
// 未知数量的数据:使用无缓冲或小缓冲
ch := make(chan int) // 或 make(chan int, 1)
小结
本章我们学习了:
- Goroutine:轻量级并发执行单元,使用
go关键字启动 - Channel:goroutine 之间的通信机制,分为有缓冲和无缓冲
- Select:多路复用,同时监听多个 channel
- sync 包:Mutex、RWMutex、WaitGroup、Once、Cond、Pool、Map
- atomic 包:原子操作,用于无锁并发编程
- Context:传递取消信号、超时和请求范围的值
- 并发模式:Pipeline、Fan-out/Fan-in、Worker Pool、生产者-消费者
- 常见问题:死锁、泄露、竞态条件
练习
- 创建一个并发的下载器,使用 Worker Pool 模式限制并发数
- 使用 Pipeline 模式实现数据处理管道(过滤 -> 转换 -> 汇总)
- 使用 Context 实现一个带超时的 HTTP 请求客户端
- 实现"哲学家就餐"问题,避免死锁
- 使用 sync.Map 实现一个并发安全的缓存系统