Go 数据库编程
数据库是现代应用程序的核心组件。Go 通过 database/sql 包提供了统一的数据库访问接口,支持各种关系型数据库。本章将深入介绍如何使用 Go 进行数据库操作,包括连接管理、查询执行、事务处理以及性能优化。
database/sql 包概述
database/sql 是 Go 标准库中用于数据库操作的包,它提供了以下核心特性:
- 统一的接口:相同的 API 可以操作不同的数据库
- 连接池管理:自动管理数据库连接的创建、复用和销毁
- 预处理语句:支持参数化查询,防止 SQL 注入
- 事务支持:提供完整的事务管理能力
架构设计
Go 的数据库访问采用"驱动+接口"的分层设计:
应用程序
│
▼
database/sql 包(提供统一接口)
│
▼
数据库驱动(实现 driver.Driver 接口)
│
▼
数据库(MySQL、PostgreSQL、SQLite 等)
这种设计让你可以轻松切换底层数据库,而不需要修改应用代码。你只需要导入不同的驱动,使用相同的 database/sql API 即可。
sql.DB 不是连接
很多初学者会误解 sql.DB 的含义。sql.DB 不是单个数据库连接,而是一个数据库句柄,它内部管理着一个连接池。
// sql.DB 内部结构示意
type DB struct {
// 连接池
freeConn []*driverConn // 空闲连接
numOpen int // 已打开的连接数
maxOpen int // 最大连接数
maxIdle int // 最大空闲连接数
// ...
}
理解这一点很重要:
- 你不需要手动打开和关闭连接
- 连接会根据需要自动创建和复用
- 应该在整个应用生命周期内复用同一个
sql.DB实例
导入数据库驱动
使用 database/sql 包之前,需要先导入对应数据库的驱动。驱动实现了 database/sql/driver 接口,负责与具体数据库通信。
常用数据库驱动
| 数据库 | 驱动包 | 导入方式 |
|---|---|---|
| MySQL | github.com/go-sql-driver/mysql | _ "github.com/go-sql-driver/mysql" |
| PostgreSQL | github.com/lib/pq | _ "github.com/lib/pq" |
| SQLite | github.com/mattn/go-sqlite3 | _ "github.com/mattn/go-sqlite3" |
| SQL Server | github.com/denisenkom/go-mssqldb | _ "github.com/denisenkom/go-mssqldb" |
使用空白导入
驱动通常使用空白导入(blank import),即用下划线 _ 作为包别名:
import (
"database/sql"
// 空白导入:执行驱动的 init 函数,注册驱动
_ "github.com/go-sql-driver/mysql"
)
为什么要使用空白导入?驱动包的 init 函数会调用 sql.Register 将自己注册到 database/sql 包中。空白导入确保驱动的 init 函数被执行,但我们不需要直接调用驱动包的任何函数。
注册驱动的过程
// 驱动包内部的代码(简化版)
package mysql
import "database/sql"
func init() {
// 注册驱动,名称为 "mysql"
sql.Register("mysql", &MySQLDriver{})
}
连接数据库
连接数据库使用 sql.Open 函数,它返回一个 *sql.DB 句柄。
基本连接
package main
import (
"database/sql"
"fmt"
"log"
_ "github.com/go-sql-driver/mysql"
)
func main() {
// 打开数据库句柄
// 参数:驱动名称,数据源名称(Data Source Name)
db, err := sql.Open("mysql", "user:password@tcp(127.0.0.1:3306)/dbname")
if err != nil {
log.Fatal(err)
}
// 程序结束时关闭连接池
defer db.Close()
// 验证连接是否成功
err = db.Ping()
if err != nil {
log.Fatal(err)
}
fmt.Println("数据库连接成功!")
}
重要说明:
sql.Open不会立即建立连接,它只是准备数据源db.Ping()才会真正尝试连接,用于验证配置是否正确db.Close()关闭整个连接池,通常在程序退出时调用
数据源名称(DSN)格式
DSN 格式因驱动而异,MySQL 的格式如下:
[用户名[:密码]@][协议(地址)]/数据库名[?参数1=值1&参数2=值2]
// 完整示例
dsn := "root:password@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=True&loc=Local"
// 常用参数说明:
// charset=utf8mb4 - 字符集
// parseTime=True - 自动解析 DATE/DATETIME 到 time.Time
// loc=Local - 时区设置
使用配置对象
对于复杂的连接配置,可以使用驱动提供的配置对象:
import (
"database/sql"
"os"
"github.com/go-sql-driver/mysql"
)
func connectDB() (*sql.DB, error) {
// 使用 MySQL 驱动的配置对象
cfg := mysql.NewConfig()
cfg.User = os.Getenv("DB_USER") // 从环境变量读取
cfg.Passwd = os.Getenv("DB_PASS")
cfg.Net = "tcp"
cfg.Addr = "127.0.0.1:3306"
cfg.DBName = "myapp"
cfg.ParseTime = true
cfg.Loc = time.Local
// 生成 DSN 并连接
db, err := sql.Open("mysql", cfg.FormatDSN())
if err != nil {
return nil, err
}
return db, nil
}
安全存储凭证
永远不要在代码中硬编码数据库凭证!推荐的做法:
// 方式一:使用环境变量
user := os.Getenv("DB_USER")
pass := os.Getenv("DB_PASS")
// 方式二:使用配置文件
type Config struct {
Database struct {
Host string `yaml:"host"`
Port int `yaml:"port"`
User string `yaml:"user"`
Password string `yaml:"password"`
Name string `yaml:"name"`
} `yaml:"database"`
}
// 方式三:使用密钥管理服务(生产环境推荐)
// 如 AWS Secrets Manager、HashiCorp Vault 等
连接池配置
sql.DB 内置连接池,合理的配置对性能至关重要。
连接池参数
// 设置最大打开连接数(包括空闲和在用)
// 默认 0 表示无限制
db.SetMaxOpenConns(25)
// 设置最大空闲连接数
// 默认 2,建议设置为与 MaxOpenConns 相同或略小
db.SetMaxIdleConns(10)
// 设置连接的最大可复用时间
// 超过此时间的连接会被关闭
// 默认 0 表示无限制
db.SetConnMaxLifetime(5 * time.Minute)
// 设置空闲连接的最大存活时间
// Go 1.15+ 新增
db.SetConnMaxIdleTime(2 * time.Minute)
参数详解
SetMaxOpenConns
限制同时打开的最大连接数,包括空闲连接和正在使用的连接。
// 为什么需要限制?
// 1. 数据库服务器有最大连接数限制
// 2. 每个连接消耗内存和资源
// 3. 防止连接泄漏导致资源耗尽
db.SetMaxOpenConns(25)
SetMaxIdleConns
限制连接池中空闲连接的最大数量。
// 空闲连接的作用:
// - 避免频繁创建和销毁连接的开销
// - 提高响应速度
// 设置建议:
// - 高并发场景:设置为 MaxOpenConns 的 50%-75%
// - 低并发场景:可以设置较小值
db.SetMaxIdleConns(10)
SetConnMaxLifetime
设置连接的最大可复用时间。连接被复用的时间超过此值后会被关闭。
// 为什么需要设置?
// 1. 数据库服务器可能会主动断开长时间空闲的连接
// 2. 定期刷新连接可以避免潜在的问题
// 3. 数据库维护窗口期间可能需要重建连接
// 设置建议:
// - 通常设置为 5-30 分钟
// - 应小于数据库服务器的 wait_timeout
db.SetConnMaxLifetime(5 * time.Minute)
SetConnMaxIdleTime
设置空闲连接在池中的最大存活时间。
// 与 SetConnMaxLifetime 的区别:
// - SetConnMaxLifetime:连接从创建开始计时
// - SetConnMaxIdleTime:连接从变为空闲开始计时
// 使用场景:逐渐减少空闲连接数,释放资源
db.SetConnMaxIdleTime(2 * time.Minute)
生产环境推荐配置
func setupDB() (*sql.DB, error) {
db, err := sql.Open("mysql", dsn)
if err != nil {
return nil, err
}
// 生产环境推荐配置
db.SetMaxOpenConns(25) // 根据数据库服务器配置调整
db.SetMaxIdleConns(10) // 约为 MaxOpenConns 的 40%-50%
db.SetConnMaxLifetime(5 * time.Minute)
db.SetConnMaxIdleTime(2 * time.Minute)
// 验证连接
if err := db.Ping(); err != nil {
return nil, err
}
return db, nil
}
监控连接池状态
// 获取连接池统计信息
stats := db.Stats()
fmt.Printf("最大打开连接数: %d\n", stats.MaxOpenConnections)
fmt.Printf("当前打开连接数: %d\n", stats.OpenConnections)
fmt.Printf("正在使用: %d\n", stats.InUse)
fmt.Printf("空闲连接: %d\n", stats.Idle)
fmt.Printf("等待获取连接: %d\n", stats.WaitCount)
fmt.Printf("等待总时长: %v\n", stats.WaitDuration)
查询数据
database/sql 提供了多种查询方法,根据返回结果的数量选择合适的方法。
查询单行:QueryRow
当确定只返回一行或零行时使用 QueryRow:
func getUserByID(db *sql.DB, id int64) (string, error) {
var name string
// QueryRow 返回 *sql.Row,不返回 error
// 错误会延迟到 Scan 时返回
err := db.QueryRow("SELECT name FROM users WHERE id = ?", id).Scan(&name)
if err != nil {
if err == sql.ErrNoRows {
// 没有找到记录
return "", fmt.Errorf("用户 %d 不存在", id)
}
// 其他错误
return "", fmt.Errorf("查询失败: %w", err)
}
return name, nil
}
QueryRow 的特点:
- 不返回 error,简化了错误处理
- 错误在
Scan时才返回 - 如果查询没有结果,
Scan返回sql.ErrNoRows
查询多行:Query
当可能返回多行时使用 Query:
func getUsersByAge(db *sql.DB, minAge int) ([]User, error) {
// Query 返回 *sql.Rows 和可能的 error
rows, err := db.Query("SELECT id, name, email, age FROM users WHERE age >= ?", minAge)
if err != nil {
return nil, fmt.Errorf("查询失败: %w", err)
}
// 重要:确保关闭 rows,释放连接
defer rows.Close()
var users []User
for rows.Next() {
var u User
// Scan 将当前行的值读取到变量中
if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age); err != nil {
return nil, fmt.Errorf("读取数据失败: %w", err)
}
users = append(users, u)
}
// 检查遍历过程中是否发生错误
if err := rows.Err(); err != nil {
return nil, fmt.Errorf("遍历数据失败: %w", err)
}
return users, nil
}
type User struct {
ID int64
Name string
Email string
Age int
}
Query 的使用要点:
- 必须调用
rows.Close()释放资源,通常使用defer - 使用
rows.Next()遍历结果 - 最后检查
rows.Err()捕获遍历过程中的错误
使用 Context 控制超时
对于可能耗时的查询,使用 Context 设置超时:
import (
"context"
"time"
)
func getUserWithTimeout(db *sql.DB, id int64) (string, error) {
// 创建带超时的 context
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
var name string
err := db.QueryRowContext(ctx, "SELECT name FROM users WHERE id = ?", id).Scan(&name)
if err != nil {
return "", err
}
return name, nil
}
// 多行查询也可以使用 Context
func getUsersWithContext(db *sql.DB, ctx context.Context) ([]User, error) {
rows, err := db.QueryContext(ctx, "SELECT id, name FROM users")
if err != nil {
return nil, err
}
defer rows.Close()
// ... 遍历 rows
}
命名参数
除了使用 ? 占位符外,Go 还支持命名参数,使 SQL 语句更具可读性:
import "database/sql"
func queryWithNamedParams(db *sql.DB, minAge int, active bool) ([]User, error) {
rows, err := db.Query(
"SELECT id, name, email FROM users WHERE age >= :minAge AND active = :active",
sql.Named("minAge", minAge),
sql.Named("active", active),
)
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name, &u.Email); err != nil {
return nil, err
}
users = append(users, u)
}
return users, nil
}
命名参数的优势:
- 可读性更高,参数名直接表达含义
- 参数顺序无关,不易出错
- 对于参数较多的复杂查询特别有用
占位符差异:
不同数据库驱动的占位符语法不同:
| 数据库 | 占位符语法 | 示例 |
|---|---|---|
| MySQL | ? | WHERE id = ? |
| PostgreSQL | $1, $2, ... | WHERE id = $1 |
| SQLite | ? 或 $1 | WHERE id = ? |
| SQL Server | @p1, @p2 | WHERE id = @p1 |
使用命名参数可以避免这种差异带来的困扰。
处理多个结果集
当执行的 SQL 返回多个结果集时,可以使用 NextResultSet 方法:
func queryMultipleResultSets(db *sql.DB) error {
// 执行多条 SQL,返回多个结果集
rows, err := db.Query("SELECT id, name FROM users; SELECT id, title FROM posts;")
if err != nil {
return err
}
defer rows.Close()
// 处理第一个结果集
fmt.Println("=== 用户列表 ===")
for rows.Next() {
var id int
var name string
if err := rows.Scan(&id, &name); err != nil {
return err
}
fmt.Printf("用户: %d - %s\n", id, name)
}
// 切换到下一个结果集
if !rows.NextResultSet() {
return fmt.Errorf("期望更多结果集")
}
// 处理第二个结果集
fmt.Println("=== 文章列表 ===")
for rows.Next() {
var id int
var title string
if err := rows.Scan(&id, &title); err != nil {
return err
}
fmt.Printf("文章: %d - %s\n", id, title)
}
// 检查整个过程中是否有错误
return rows.Err()
}
使用场景:
- 存储过程返回多个结果集
- 批量执行多条查询语句
- 减少数据库往返次数
获取列类型信息
使用 Rows.ColumnTypes 可以获取结果集中每一列的类型信息:
func inspectColumnTypes(db *sql.DB, query string, args ...any) error {
rows, err := db.Query(query, args...)
if err != nil {
return err
}
defer rows.Close()
// 获取列类型信息
columnTypes, err := rows.ColumnTypes()
if err != nil {
return err
}
for i, ct := range columnTypes {
fmt.Printf("列 %d:\n", i+1)
fmt.Printf(" 名称: %s\n", ct.Name())
fmt.Printf(" 数据库类型: %s\n", ct.DatabaseTypeName())
// 获取长度(适用于变长类型)
if length, ok := ct.Length(); ok {
fmt.Printf(" 长度: %d\n", length)
}
// 获取精度和小数位数(适用于数值类型)
if precision, scale, ok := ct.DecimalSize(); ok {
fmt.Printf(" 精度: %d, 小数位数: %d\n", precision, scale)
}
// 检查是否可为 NULL
if nullable, ok := ct.Nullable(); ok {
fmt.Printf(" 可为NULL: %v\n", nullable)
}
// 获取扫描类型
fmt.Printf(" 扫描类型: %v\n", ct.ScanType())
}
return nil
}
修改数据
对于不返回数据的操作(INSERT、UPDATE、DELETE),使用 Exec 方法。
插入数据
func insertUser(db *sql.DB, name, email string) (int64, error) {
// Exec 返回 sql.Result 和 error
result, err := db.Exec(
"INSERT INTO users (name, email, created_at) VALUES (?, ?, NOW())",
name, email,
)
if err != nil {
return 0, fmt.Errorf("插入失败: %w", err)
}
// 获取自增 ID
id, err := result.LastInsertId()
if err != nil {
return 0, fmt.Errorf("获取ID失败: %w", err)
}
return id, nil
}
更新数据
func updateUserEmail(db *sql.DB, id int64, newEmail string) error {
result, err := db.Exec(
"UPDATE users SET email = ? WHERE id = ?",
newEmail, id,
)
if err != nil {
return fmt.Errorf("更新失败: %w", err)
}
// 获取影响的行数
rows, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("获取影响行数失败: %w", err)
}
if rows == 0 {
return fmt.Errorf("用户 %d 不存在", id)
}
return nil
}
删除数据
func deleteUser(db *sql.DB, id int64) error {
result, err := db.Exec("DELETE FROM users WHERE id = ?", id)
if err != nil {
return fmt.Errorf("删除失败: %w", err)
}
rows, err := result.RowsAffected()
if err != nil {
return err
}
if rows == 0 {
return fmt.Errorf("用户 %d 不存在", id)
}
return nil
}
Result 接口
Exec 返回的 sql.Result 接口定义:
type Result interface {
// 返回插入生成的自增 ID
LastInsertId() (int64, error)
// 返回受影响的行数
RowsAffected() (int64, error)
}
预处理语句
预处理语句(Prepared Statement)是预编译的 SQL 语句,可以多次执行,只需要替换参数。它有两个主要优势:
- 性能提升:SQL 只需解析和编译一次
- 安全性:自动处理参数转义,防止 SQL 注入
基本使用
func getUserStmt(db *sql.DB, id int64) (string, error) {
// 准备语句
stmt, err := db.Prepare("SELECT name FROM users WHERE id = ?")
if err != nil {
return "", fmt.Errorf("准备语句失败: %w", err)
}
// 重要:关闭语句,释放资源
defer stmt.Close()
var name string
// 执行预处理语句
err = stmt.QueryRow(id).Scan(&name)
if err != nil {
return "", err
}
return name, nil
}
批量插入示例
预处理语句在批量操作时性能优势明显:
func batchInsertUsers(db *sql.DB, users []User) error {
// 准备插入语句
stmt, err := db.Prepare("INSERT INTO users (name, email) VALUES (?, ?)")
if err != nil {
return err
}
defer stmt.Close()
// 批量执行
for _, u := range users {
_, err := stmt.Exec(u.Name, u.Email)
if err != nil {
return err
}
}
return nil
}
事务中使用预处理语句
在事务中使用预处理语句时,需要使用事务的 Stmt 方法:
func transferWithStmt(db *sql.DB, from, to int64, amount float64) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
// 在事务外准备语句
debitStmt, err := db.Prepare("UPDATE accounts SET balance = balance - ? WHERE id = ?")
if err != nil {
return err
}
defer debitStmt.Close()
// 将语句绑定到事务
txDebitStmt := tx.Stmt(debitStmt)
// 在事务中执行
_, err = txDebitStmt.Exec(amount, from)
if err != nil {
return err
}
return tx.Commit()
}
事务处理
事务将多个数据库操作组合成一个原子单元,要么全部成功,要么全部失败。
基本事务
func transferMoney(db *sql.DB, fromID, toID int64, amount float64) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
// 确保在函数退出时处理事务
// 如果已经 Commit,Rollback 不会有任何效果
defer tx.Rollback()
// 从转出账户扣款
result, err := tx.Exec(
"UPDATE accounts SET balance = balance - ? WHERE id = ?",
amount, fromID,
)
if err != nil {
return fmt.Errorf("扣款失败: %w", err)
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("账户 %d 不存在", fromID)
}
// 向转入账户加款
result, err = tx.Exec(
"UPDATE accounts SET balance = balance + ? WHERE id = ?",
amount, toID,
)
if err != nil {
return fmt.Errorf("入账失败: %w", err)
}
rows, _ = result.RowsAffected()
if rows == 0 {
return fmt.Errorf("账户 %d 不存在", toID)
}
// 提交事务
if err := tx.Commit(); err != nil {
return fmt.Errorf("提交事务失败: %w", err)
}
return nil
}
事务模式详解
Go 的事务处理有一个经典的模式:
func doTransaction(db *sql.DB) error {
// 1. 开始事务
tx, err := db.Begin()
if err != nil {
return err
}
// 2. 使用 defer 确保在出错时回滚
// 这是一个"防御性"回滚:即使已经 Commit,调用 Rollback 也是安全的
defer func() {
if err != nil {
tx.Rollback()
}
}()
// 3. 执行事务操作
if _, err = tx.Exec("..."); err != nil {
return err // defer 会自动回滚
}
if _, err = tx.Exec("..."); err != nil {
return err // defer 会自动回滚
}
// 4. 提交事务
err = tx.Commit()
return err // 如果 Commit 成功,err 为 nil,defer 不会回滚
}
使用 Context 和隔离级别
import (
"context"
"database/sql"
"time"
)
func transactionWithContext(db *sql.DB) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// BeginTx 允许指定隔离级别和上下文
tx, err := db.BeginTx(ctx, &sql.TxOptions{
Isolation: sql.LevelSerializable, // 可序列化隔离级别
ReadOnly: false, // 非只读事务
})
if err != nil {
return err
}
defer tx.Rollback()
// 执行操作...
return tx.Commit()
}
隔离级别
Go 支持标准的事务隔离级别:
const (
LevelDefault IsolationLevel = iota // 使用数据库默认级别
LevelReadUncommitted // 读未提交
LevelReadCommitted // 读已提交
LevelWriteCommitted // 写已提交
LevelRepeatableRead // 可重复读
LevelSnapshot // 快照
LevelSerializable // 可序列化
LevelLinearizable // 线性化
)
选择隔离级别的建议:
| 隔离级别 | 脏读 | 不可重复读 | 幻读 | 性能 | 使用场景 |
|---|---|---|---|---|---|
| Read Uncommitted | 可能 | 可能 | 可能 | 最高 | 几乎不用 |
| Read Committed | 不会 | 可能 | 可能 | 高 | 大多数场景默认 |
| Repeatable Read | 不会 | 不会 | 可能 | 中 | 需要一致性读取 |
| Serializable | 不会 | 不会 | 不会 | 最低 | 严格一致性要求 |
单独连接(Conn)
通常情况下,使用 sql.DB 的方法就足够了,因为它会自动管理连接池。但在某些特殊场景下,需要获取单独的数据库连接。
使用场景
什么时候需要单独连接?
- 需要使用连接特定的功能(如临时表、会话变量)
- 需要执行一系列必须在同一连接上执行的操作
- 需要使用驱动特定的功能
获取和使用单独连接
func useSingleConnection(db *sql.DB, ctx context.Context) error {
// 从连接池获取一个单独的连接
conn, err := db.Conn(ctx)
if err != nil {
return fmt.Errorf("获取连接失败: %w", err)
}
// 必须关闭连接,将其归还到连接池
defer conn.Close()
// 在同一连接上执行多个操作
// 创建临时表
_, err = conn.ExecContext(ctx, "CREATE TEMPORARY TABLE temp_results AS SELECT * FROM users WHERE active = true")
if err != nil {
return err
}
// 使用临时表
rows, err := conn.QueryContext(ctx, "SELECT COUNT(*) FROM temp_results")
if err != nil {
return err
}
defer rows.Close()
var count int
if rows.Next() {
rows.Scan(&count)
}
fmt.Printf("活跃用户数: %d\n", count)
// 连接关闭时,临时表会自动删除
return nil
}
在单独连接上使用事务
func transactionOnConnection(db *sql.DB, ctx context.Context) error {
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
// 在单独连接上开始事务
tx, err := conn.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
// 执行事务操作...
return tx.Commit()
}
使用驱动特定功能
通过 Conn.Raw 方法可以访问底层驱动连接,使用驱动特定的功能:
import "github.com/go-sql-driver/mysql"
func useDriverSpecificFeature(db *sql.DB, ctx context.Context) error {
conn, err := db.Conn(ctx)
if err != nil {
return err
}
defer conn.Close()
// 访问底层驱动连接
err = conn.Raw(func(driverConn any) error {
// 类型断言为具体的驱动连接类型
mysqlConn, ok := driverConn.(*mysql.Conn)
if !ok {
return fmt.Errorf("不是 MySQL 连接")
}
// 使用 MySQL 特有的功能
// 例如:获取连接 ID
fmt.Printf("连接 ID: %d\n", mysqlConn.ConnectionID)
return nil
})
return err
}
使用 Raw 方法访问底层驱动连接是高级用法,需要谨慎使用。不当使用可能导致连接状态不一致或其他问题。
NULL 值处理
数据库中的 NULL 值在 Go 中需要特殊处理,因为 Go 的基本类型不支持 NULL。
使用 sql.Null 类型
database/sql 提供了专门的 NULL 处理类型:
import "database/sql"
// 可用的 Null 类型
var (
s sql.NullString // 字符串
i sql.NullInt64 // 整数
f sql.NullFloat64 // 浮点数
b sql.NullBool // 布尔值
t sql.NullTime // 时间(Go 1.13+)
)
func getUserEmail(db *sql.DB, id int64) (string, error) {
var email sql.NullString
err := db.QueryRow("SELECT email FROM users WHERE id = ?", id).Scan(&email)
if err != nil {
return "", err
}
// 检查是否为 NULL
if !email.Valid {
return "", nil // 或者返回特定的错误/默认值
}
return email.String, nil
}
Null 类型的工作方式
每个 sql.Null* 类型都有两个字段:
type NullString struct {
String string // 实际值
Valid bool // 是否非 NULL
}
// 使用示例
func demoNullTypes(db *sql.DB) {
var (
name sql.NullString
age sql.NullInt64
price sql.NullFloat64
active sql.NullBool
)
err := db.QueryRow(`
SELECT name, age, price, active
FROM products
WHERE id = ?
`, 1).Scan(&name, &age, &price, &active)
if err != nil {
log.Fatal(err)
}
// 安全地访问值
if name.Valid {
fmt.Println("名称:", name.String)
} else {
fmt.Println("名称: 未设置")
}
if age.Valid {
fmt.Println("年龄:", age.Int64)
}
}
自定义 NULL 处理类型
可以创建更友好的自定义 NULL 类型:
// 可空字符串,提供默认值
type NullString struct {
sql.NullString
}
func (ns NullString) Value() string {
if ns.Valid {
return ns.String
}
return "" // 默认值
}
// 或者使用指针类型
type User struct {
ID int64
Name string
Email *string // 使用指针,nil 表示 NULL
}
func getUser(db *sql.DB, id int64) (*User, error) {
var u User
err := db.QueryRow("SELECT id, name, email FROM users WHERE id = ?", id).Scan(
&u.ID,
&u.Name,
&u.Email, // 如果 email 为 NULL,则为 nil
)
return &u, err
}
错误处理
常见错误类型
import (
"database/sql"
"errors"
)
func handleErrors(db *sql.DB, id int64) error {
var name string
err := db.QueryRow("SELECT name FROM users WHERE id = ?", id).Scan(&name)
if err != nil {
// 特定错误处理
if errors.Is(err, sql.ErrNoRows) {
// 没有找到记录
return fmt.Errorf("用户不存在: %d", id)
}
// 其他错误
return fmt.Errorf("查询失败: %w", err)
}
return nil
}
错误类型列表
// database/sql 定义的标准错误
var (
ErrTxDone = errors.New("sql: transaction has already been committed or rolled back")
ErrConnDone = errors.New("sql: connection is already closed")
ErrNoRows = errors.New("sql: no rows in result set")
)
错误包装与传递
// 使用 %w 包装错误,保留原始错误链
func processOrder(db *sql.DB, orderID int64) error {
tx, err := db.Begin()
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 更新库存
if err := updateInventory(tx, orderID); err != nil {
return fmt.Errorf("更新库存失败: %w", err)
}
// 创建订单
if err := createOrder(tx, orderID); err != nil {
return fmt.Errorf("创建订单失败: %w", err)
}
return tx.Commit()
}
// 检查错误链
func main() {
err := processOrder(db, 1)
if err != nil {
// 检查是否包含特定错误
var sqlErr *mysql.MySQLError
if errors.As(err, &sqlErr) {
fmt.Printf("MySQL 错误码: %d\n", sqlErr.Number)
}
log.Fatal(err)
}
}
最佳实践
1. 复用 sql.DB 实例
// 错误:每次请求都创建新实例
func handleRequest(w http.ResponseWriter, r *http.Request) {
db, _ := sql.Open("mysql", dsn) // 错误!
defer db.Close()
// ...
}
// 正确:全局复用
var db *sql.DB
func init() {
var err error
db, err = sql.Open("mysql", dsn)
if err != nil {
log.Fatal(err)
}
// 配置连接池...
}
func handleRequest(w http.ResponseWriter, r *http.Request) {
// 使用全局 db
rows, _ := db.Query("...")
// ...
}
2. 总是关闭 Rows 和 Stmt
func queryUsers(db *sql.DB) ([]User, error) {
rows, err := db.Query("SELECT * FROM users")
if err != nil {
return nil, err
}
defer rows.Close() // 必须关闭!
// ...
}
func preparedQuery(db *sql.DB, id int64) (string, error) {
stmt, err := db.Prepare("SELECT name FROM users WHERE id = ?")
if err != nil {
return "", err
}
defer stmt.Close() // 必须关闭!
// ...
}
3. 使用参数化查询防止注入
// 危险:SQL 注入风险
func dangerous(db *sql.DB, userInput string) {
query := fmt.Sprintf("SELECT * FROM users WHERE name = '%s'", userInput)
db.Query(query) // 用户输入可能包含恶意 SQL
}
// 安全:使用参数化查询
func safe(db *sql.DB, userInput string) {
db.Query("SELECT * FROM users WHERE name = ?", userInput)
}
4. 正确处理事务
// 推荐的事务处理模式
func doTransaction(db *sql.DB) (err error) {
tx, err := db.Begin()
if err != nil {
return err
}
defer func() {
if err != nil {
tx.Rollback()
}
}()
// 执行操作...
if _, err = tx.Exec("..."); err != nil {
return err // defer 会回滚
}
err = tx.Commit()
return err
}
5. 使用 Context 处理超时
func queryWithTimeout(db *sql.DB) ([]User, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
rows, err := db.QueryContext(ctx, "SELECT * FROM users")
if err != nil {
return nil, err
}
defer rows.Close()
// ...
}
6. 使用连接池统计监控
func monitorDB(db *sql.DB) {
ticker := time.NewTicker(30 * time.Second)
for range ticker.C {
stats := db.Stats()
log.Printf(
"DB Stats - Open: %d, InUse: %d, Idle: %d, Wait: %d",
stats.OpenConnections,
stats.InUse,
stats.Idle,
stats.WaitCount,
)
}
}
7. 合理配置连接池参数
根据应用负载和数据库服务器能力配置连接池:
func setupConnectionPool(db *sql.DB) {
// 获取 CPU 核心数作为参考
cpuCount := runtime.GOMAXPROCS(0)
// 最大打开连接数:通常设置为 CPU 核心数的 2-4 倍
// 或者根据数据库服务器的最大连接数限制
maxOpen := cpuCount * 4
if maxOpen > 50 {
maxOpen = 50 // 设置上限
}
db.SetMaxOpenConns(maxOpen)
// 最大空闲连接数:设置为 MaxOpenConns 的一半左右
db.SetMaxIdleConns(maxOpen / 2)
// 连接最大生命周期:5-30 分钟
// 应小于数据库服务器的 wait_timeout
db.SetConnMaxLifetime(10 * time.Minute)
// 空闲连接最大生命周期:连接变为空闲后多久关闭
db.SetConnMaxIdleTime(5 * time.Minute)
}
参数调优建议:
| 参数 | 低并发场景 | 高并发场景 | 说明 |
|---|---|---|---|
| MaxOpenConns | 5-10 | 20-50 | 根据数据库能力设置 |
| MaxIdleConns | 2-5 | 10-25 | 约为 MaxOpenConns 的一半 |
| ConnMaxLifetime | 10-30 分钟 | 5-15 分钟 | 定期刷新连接 |
| ConnMaxIdleTime | 5-10 分钟 | 2-5 分钟 | 释放空闲连接 |
8. 使用连接池健康检查
func healthCheck(db *sql.DB) health.Checker {
return health.Checker{
Check: func(ctx context.Context) error {
// 设置超时
ctx, cancel := context.WithTimeout(ctx, 2*time.Second)
defer cancel()
// 尝试 ping 数据库
return db.PingContext(ctx)
},
}
}
// 在 HTTP 服务中使用
func healthHandler(db *sql.DB) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
ctx, cancel := context.WithTimeout(r.Context(), 2*time.Second)
defer cancel()
if err := db.PingContext(ctx); err != nil {
http.Error(w, "database unhealthy", http.StatusServiceUnavailable)
return
}
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, "OK")
}
}
9. 批量操作优化
对于大量数据插入,使用批量操作而非逐条插入:
// 低效:逐条插入
func insertOneByOne(db *sql.DB, users []User) error {
for _, u := range users {
_, err := db.Exec("INSERT INTO users (name, email) VALUES (?, ?)", u.Name, u.Email)
if err != nil {
return err
}
}
return nil
}
// 高效:批量插入
func batchInsert(db *sql.DB, users []User) error {
// 构建批量插入语句
valueStrings := make([]string, 0, len(users))
valueArgs := make([]any, 0, len(users)*2)
for _, u := range users {
valueStrings = append(valueStrings, "(?, ?)")
valueArgs = append(valueArgs, u.Name)
valueArgs = append(valueArgs, u.Email)
}
query := fmt.Sprintf("INSERT INTO users (name, email) VALUES %s",
strings.Join(valueStrings, ","))
_, err := db.Exec(query, valueArgs...)
return err
}
// 更高效:使用事务 + 预处理语句
func batchInsertOptimized(db *sql.DB, users []User) error {
tx, err := db.Begin()
if err != nil {
return err
}
defer tx.Rollback()
stmt, err := tx.Prepare("INSERT INTO users (name, email) VALUES (?, ?)")
if err != nil {
return err
}
defer stmt.Close()
for _, u := range users {
if _, err := stmt.Exec(u.Name, u.Email); err != nil {
return err
}
}
return tx.Commit()
}
10. 避免 SELECT *
// 不推荐:SELECT *
rows, err := db.Query("SELECT * FROM users WHERE id = ?", id)
// 推荐:明确指定需要的列
rows, err := db.Query("SELECT id, name, email FROM users WHERE id = ?", id)
原因:
- 减少数据传输量
- 避免读取不需要的列
- 表结构变化时更稳定
- 更好的查询性能
完整示例
下面是一个综合示例,展示数据库操作的完整流程:
package main
import (
"context"
"database/sql"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql"
)
// User 用户模型
type User struct {
ID int64
Name string
Email string
Age int
CreatedAt time.Time
}
// DB 全局数据库句柄
var DB *sql.DB
// InitDB 初始化数据库连接
func InitDB(dsn string) error {
var err error
DB, err = sql.Open("mysql", dsn)
if err != nil {
return fmt.Errorf("打开数据库失败: %w", err)
}
// 配置连接池
DB.SetMaxOpenConns(25)
DB.SetMaxIdleConns(10)
DB.SetConnMaxLifetime(5 * time.Minute)
// 验证连接
if err = DB.Ping(); err != nil {
return fmt.Errorf("连接数据库失败: %w", err)
}
return nil
}
// CreateUser 创建用户
func CreateUser(ctx context.Context, user *User) (int64, error) {
result, err := DB.ExecContext(ctx,
"INSERT INTO users (name, email, age, created_at) VALUES (?, ?, ?, ?)",
user.Name, user.Email, user.Age, time.Now(),
)
if err != nil {
return 0, fmt.Errorf("创建用户失败: %w", err)
}
id, err := result.LastInsertId()
if err != nil {
return 0, fmt.Errorf("获取ID失败: %w", err)
}
return id, nil
}
// GetUserByID 根据ID查询用户
func GetUserByID(ctx context.Context, id int64) (*User, error) {
var user User
err := DB.QueryRowContext(ctx,
"SELECT id, name, email, age, created_at FROM users WHERE id = ?",
id,
).Scan(&user.ID, &user.Name, &user.Email, &user.Age, &user.CreatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("用户不存在: %d", id)
}
return nil, fmt.Errorf("查询失败: %w", err)
}
return &user, nil
}
// ListUsers 查询所有用户
func ListUsers(ctx context.Context) ([]User, error) {
rows, err := DB.QueryContext(ctx, "SELECT id, name, email, age, created_at FROM users")
if err != nil {
return nil, fmt.Errorf("查询失败: %w", err)
}
defer rows.Close()
var users []User
for rows.Next() {
var u User
if err := rows.Scan(&u.ID, &u.Name, &u.Email, &u.Age, &u.CreatedAt); err != nil {
return nil, fmt.Errorf("读取数据失败: %w", err)
}
users = append(users, u)
}
if err = rows.Err(); err != nil {
return nil, fmt.Errorf("遍历数据失败: %w", err)
}
return users, nil
}
// UpdateUserAge 更新用户年龄(事务示例)
func UpdateUserAge(ctx context.Context, userID int64, newAge int) error {
tx, err := DB.BeginTx(ctx, nil)
if err != nil {
return fmt.Errorf("开始事务失败: %w", err)
}
defer tx.Rollback()
// 检查用户是否存在
var exists bool
err = tx.QueryRowContext(ctx,
"SELECT EXISTS(SELECT 1 FROM users WHERE id = ?)",
userID,
).Scan(&exists)
if err != nil {
return fmt.Errorf("检查用户失败: %w", err)
}
if !exists {
return fmt.Errorf("用户不存在: %d", userID)
}
// 更新年龄
result, err := tx.ExecContext(ctx,
"UPDATE users SET age = ? WHERE id = ?",
newAge, userID,
)
if err != nil {
return fmt.Errorf("更新失败: %w", err)
}
affected, _ := result.RowsAffected()
log.Printf("更新了 %d 行", affected)
return tx.Commit()
}
// DeleteUser 删除用户
func DeleteUser(ctx context.Context, id int64) error {
result, err := DB.ExecContext(ctx, "DELETE FROM users WHERE id = ?", id)
if err != nil {
return fmt.Errorf("删除失败: %w", err)
}
affected, err := result.RowsAffected()
if err != nil {
return fmt.Errorf("获取影响行数失败: %w", err)
}
if affected == 0 {
return fmt.Errorf("用户不存在: %d", id)
}
return nil
}
func main() {
// 初始化数据库
dsn := "root:password@tcp(127.0.0.1:3306)/mydb?parseTime=true"
if err := InitDB(dsn); err != nil {
log.Fatal(err)
}
defer DB.Close()
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// 创建用户
user := &User{Name: "张三", Email: "[email protected]", Age: 25}
id, err := CreateUser(ctx, user)
if err != nil {
log.Fatal(err)
}
fmt.Printf("创建用户成功,ID: %d\n", id)
// 查询用户
u, err := GetUserByID(ctx, id)
if err != nil {
log.Fatal(err)
}
fmt.Printf("查询用户: %+v\n", u)
// 更新用户
if err := UpdateUserAge(ctx, id, 26); err != nil {
log.Fatal(err)
}
fmt.Println("更新成功")
// 列出所有用户
users, err := ListUsers(ctx)
if err != nil {
log.Fatal(err)
}
fmt.Printf("用户列表: %d 人\n", len(users))
// 删除用户
if err := DeleteUser(ctx, id); err != nil {
log.Fatal(err)
}
fmt.Println("删除成功")
}
小结
本章全面介绍了 Go 的数据库编程:
- database/sql 包:统一接口、驱动分离、连接池管理
- 连接管理:sql.DB 句柄、连接池配置、资源释放
- 查询操作:QueryRow 单行查询、Query 多行查询、Context 超时控制
- 修改操作:Exec 执行、Result 获取结果
- 预处理语句:性能优化、安全防护、批量操作
- 事务处理:原子操作、隔离级别、错误回滚
- NULL 处理:sql.Null 类型、自定义处理
- 最佳实践:资源管理、安全防护、性能优化
关键要点:
sql.DB是连接池,应全局复用- 始终使用参数化查询防止 SQL 注入
- 及时关闭
Rows和Stmt释放资源 - 合理配置连接池参数
- 使用 Context 控制超时
练习
- 实现一个简单的 CRUD API,支持用户的增删改查
- 使用事务实现银行转账功能
- 实现一个批量导入功能,使用预处理语句优化性能
- 添加数据库连接池监控,输出统计信息
- 实现一个带重试机制的数据库操作函数