MySQL 编程集成
MySQL 能与各种编程语言轻松集成。本章将详细介绍如何使用主流编程语言连接和操作 MySQL 数据库,包括连接管理、事务处理、批量操作和性能优化等核心内容。
概述
为什么需要编程集成?
数据库编程是后端开发的核心技能。无论是 Web 应用、数据分析还是微服务架构,都需要通过程序代码与数据库交互。良好的数据库集成需要关注:
- 连接管理:高效创建、复用和释放数据库连接
- SQL 执行:安全地构建和执行 SQL 语句
- 事务控制:保证数据一致性和完整性
- 性能优化:批量操作、预处理语句、连接池
- 错误处理:优雅处理数据库异常
常用连接器对比
| 语言 | 官方连接器 | 第三方库 | 推荐场景 |
|---|---|---|---|
| Python | mysql-connector-python | PyMySQL, mysqlclient | 数据分析、Web 后端 |
| Java | MySQL Connector/J | HikariCP(连接池) | 企业级应用 |
| Go | go-sql-driver/mysql | GORM(ORM) | 微服务、高并发 |
| Node.js | mysql2 | Sequelize, TypeORM | 全栈 Web 应用 |
| C# | MySqlConnector | Dapper, EF Core | .NET 应用 |
Python 集成
Python 连接 MySQL 有多种选择,最常用的是官方的 mysql-connector-python 和第三方的 PyMySQL。
安装驱动
# 官方驱动(纯 Python 实现,兼容性好)
pip install mysql-connector-python
# 第三方驱动(API 兼容,性能略好)
pip install pymysql
# C 扩展版本(性能最好,需要编译环境)
pip install mysqlclient
基本连接与查询
使用 mysql-connector-python
import mysql.connector
from mysql.connector import Error
def query_user(user_id):
"""查询单个用户"""
connection = None
cursor = None
try:
# 建立连接
connection = mysql.connector.connect(
host='localhost',
port=3306,
user='root',
password='your_password',
database='mydb',
charset='utf8mb4'
)
# 创建游标(字典格式返回结果)
cursor = connection.cursor(dictionary=True)
# 执行参数化查询(防止 SQL 注入)
sql = "SELECT id, username, email, created_at FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))
# 获取结果
result = cursor.fetchone()
if result:
print(f"用户: {result['username']}")
print(f"邮箱: {result['email']}")
return result
else:
print("用户不存在")
return None
except Error as e:
print(f"数据库错误: {e}")
return None
finally:
# 确保资源释放
if cursor:
cursor.close()
if connection and connection.is_connected():
connection.close()
使用 PyMySQL
import pymysql
from pymysql.cursors import DictCursor
def query_with_pymysql():
"""PyMySQL 基本用法"""
# 使用上下文管理器自动关闭连接
with pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='mydb',
charset='utf8mb4',
cursorclass=DictCursor
) as connection:
with connection.cursor() as cursor:
# 执行查询
cursor.execute("SELECT * FROM users LIMIT 10")
# 获取所有结果
users = cursor.fetchall()
for user in users:
print(f"{user['id']}: {user['username']}")
return users
连接池
在高并发场景下,频繁创建和销毁连接会严重影响性能。连接池通过预先创建并复用连接来解决这个问题。
mysql-connector-python 连接池
from mysql.connector import pooling
import mysql.connector
# 创建连接池
dbconfig = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "your_password",
"database": "mydb",
"charset": "utf8mb4"
}
# 创建连接池(默认池大小为 5)
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=10, # 连接池大小
pool_reset_session=True, # 重置会话状态
**dbconfig
)
def get_connection():
"""从连接池获取连接"""
try:
return connection_pool.get_connection()
except pooling.PoolError as e:
print(f"连接池耗尽: {e}")
return None
# 使用连接池
def query_users():
conn = get_connection()
if conn is None:
return []
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE status = %s", ('active',))
return cursor.fetchall()
finally:
cursor.close()
conn.close() # 实际上是归还连接池,不是真正关闭
# Flask 应用示例
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/users/<int:user_id>')
def get_user(user_id):
conn = get_connection()
if conn is None:
return jsonify({"error": "数据库连接失败"}), 503
try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()
if user:
# 处理 datetime 等特殊类型
if 'created_at' in user and user['created_at']:
user['created_at'] = user['created_at'].isoformat()
return jsonify(user)
return jsonify({"error": "用户不存在"}), 404
finally:
cursor.close()
conn.close()
PyMySQL 配合 DBUtils 连接池
# 安装: pip install DBUtils
import pymysql
from dbutils.pooled_db import PooledDB
# 创建连接池
pool = PooledDB(
creator=pymysql, # 使用 pymysql 作为连接
maxconnections=20, # 最大连接数
mincached=2, # 初始空闲连接数
maxcached=5, # 最大空闲连接数
maxshared=3, # 最大共享连接数
blocking=True, # 连接池耗尽时阻塞等待
host='localhost',
port=3306,
user='root',
password='your_password',
database='mydb',
charset='utf8mb4'
)
def get_db_connection():
"""获取数据库连接"""
return pool.connection()
# 使用示例
def get_all_products():
conn = get_db_connection()
try:
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute("SELECT * FROM products WHERE status = 1")
return cursor.fetchall()
finally:
conn.close()
事务管理
事务保证多个数据库操作的原子性,要么全部成功,要么全部回滚。
import mysql.connector
from mysql.connector import Error
def transfer_money(from_account, to_account, amount):
"""转账事务示例"""
conn = None
try:
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='bank_db'
)
conn.autocommit = False # 关闭自动提交,开启事务
cursor = conn.cursor(dictionary=True)
# 1. 检查转出账户余额
cursor.execute(
"SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
(from_account,)
)
from_balance = cursor.fetchone()
if not from_balance:
raise ValueError("转出账户不存在")
if from_balance['balance'] < amount:
raise ValueError("余额不足")
# 2. 扣除转出账户金额
cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account)
)
# 3. 增加转入账户金额
cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account)
)
# 4. 记录转账日志
cursor.execute(
"""INSERT INTO transfer_logs
(from_account, to_account, amount, created_at)
VALUES (%s, %s, %s, NOW())""",
(from_account, to_account, amount)
)
# 提交事务
conn.commit()
print("转账成功")
return True
except ValueError as e:
# 业务错误,回滚
if conn:
conn.rollback()
print(f"转账失败: {e}")
return False
except Error as e:
# 数据库错误,回滚
if conn:
conn.rollback()
print(f"数据库错误: {e}")
return False
finally:
if conn and conn.is_connected():
cursor.close()
conn.close()
# 使用上下文管理器简化事务
from contextlib import contextmanager
@contextmanager
def transaction(connection_params):
"""事务上下文管理器"""
conn = mysql.connector.connect(**connection_params)
conn.autocommit = False
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
# 使用示例
def update_user_profile(user_id, new_name, new_email):
with transaction({
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'mydb'
}) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET name = %s, email = %s WHERE id = %s",
(new_name, new_email, user_id)
)
cursor.execute(
"INSERT INTO profile_history (user_id, field, old_value, new_value, changed_at) "
"SELECT %s, 'name', name, %s, NOW() FROM users WHERE id = %s",
(user_id, new_name, user_id)
)
批量操作
批量操作可以显著提高大量数据插入/更新的性能。
import mysql.connector
from datetime import datetime
def batch_insert_users(users_data):
"""批量插入用户数据"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)
try:
cursor = conn.cursor()
# 方式一:使用 executemany(推荐)
sql = """
INSERT INTO users (username, email, age, created_at)
VALUES (%s, %s, %s, %s)
"""
# 准备数据
now = datetime.now()
data = [
(user['username'], user['email'], user['age'], now)
for user in users_data
]
# 批量执行
cursor.executemany(sql, data)
conn.commit()
print(f"成功插入 {cursor.rowcount} 条记录")
return cursor.rowcount
finally:
cursor.close()
conn.close()
def batch_update_prices(updates):
"""批量更新商品价格"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)
try:
cursor = conn.cursor()
# 使用 CASE WHEN 进行批量更新
# 构建 SQL
product_ids = [str(u['id']) for u in updates]
case_sql = "CASE id "
for u in updates:
case_sql += f"WHEN {u['id']} THEN {u['price']} "
case_sql += "END"
sql = f"""
UPDATE products
SET price = {case_sql}
WHERE id IN ({','.join(product_ids)})
"""
cursor.execute(sql)
conn.commit()
return cursor.rowcount
finally:
cursor.close()
conn.close()
# 高性能批量插入(使用 LOAD DATA)
def fast_import_from_csv(csv_file_path):
"""从 CSV 快速导入数据"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)
try:
cursor = conn.cursor()
# LOAD DATA INFILE 比 INSERT 快很多
sql = f"""
LOAD DATA LOCAL INFILE '{csv_file_path}'
INTO TABLE users
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\\n'
IGNORE 1 ROWS
(username, email, age)
SET created_at = NOW()
"""
cursor.execute(sql)
conn.commit()
print(f"成功导入 {cursor.rowcount} 条记录")
finally:
cursor.close()
conn.close()
ORM 使用:SQLAlchemy
ORM(对象关系映射)让数据库操作更加面向对象,减少 SQL 编写。
# 安装: pip install sqlalchemy pymysql
from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime
# 定义基类
Base = declarative_base()
# 定义模型
class User(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer)
bio = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
# 创建引擎(连接池自动管理)
DATABASE_URL = "mysql+pymysql://root:your_password@localhost/mydb?charset=utf8mb4"
engine = create_engine(
DATABASE_URL,
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接
pool_recycle=3600, # 连接回收时间
echo=False # 是否打印 SQL
)
# 创建表
Base.metadata.create_all(engine)
# 创建会话工厂
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)
# 依赖注入(FastAPI 示例)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# CRUD 操作示例
class UserRepository:
def __init__(self, db: Session):
self.db = db
def create(self, username: str, email: str, age: int = None) -> User:
"""创建用户"""
user = User(username=username, email=email, age=age)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user
def get_by_id(self, user_id: int) -> User:
"""根据 ID 查询"""
return self.db.query(User).filter(User.id == user_id).first()
def get_by_username(self, username: str) -> User:
"""根据用户名查询"""
return self.db.query(User).filter(User.username == username).first()
def get_all(self, skip: int = 0, limit: int = 100):
"""分页查询"""
return self.db.query(User).offset(skip).limit(limit).all()
def update(self, user_id: int, **kwargs) -> User:
"""更新用户"""
user = self.get_by_id(user_id)
if user:
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
self.db.commit()
self.db.refresh(user)
return user
def delete(self, user_id: int) -> bool:
"""删除用户"""
user = self.get_by_id(user_id)
if user:
self.db.delete(user)
self.db.commit()
return True
return False
# 使用示例
db = SessionLocal()
repo = UserRepository(db)
# 创建用户
new_user = repo.create("张三", "[email protected]", age=25)
# 查询用户
user = repo.get_by_id(1)
print(f"用户: {user.username}, 邮箱: {user.email}")
# 更新用户
updated_user = repo.update(1, age=26, bio="这是个人简介")
# 分页查询
users = repo.get_all(skip=0, limit=10)
db.close()
Java 集成 (JDBC)
Java 通过 JDBC 连接 MySQL,是企业级应用的标准方式。
Maven 依赖
<dependencies>
<!-- MySQL 驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>
<!-- HikariCP 连接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>
基本连接与查询
import java.sql.*;
public class MySQLBasicExample {
private static final String URL = "jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8mb4";
private static final String USER = "root";
private static final String PASSWORD = "your_password";
public static void main(String[] args) {
// 使用 try-with-resources 自动关闭资源
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {
// 查询单条记录
String sql = "SELECT id, username, email FROM users WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, 1);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
System.out.println("ID: " + rs.getInt("id"));
System.out.println("用户名: " + rs.getString("username"));
System.out.println("邮箱: " + rs.getString("email"));
}
}
}
// 插入数据
String insertSql = "INSERT INTO users (username, email, age) VALUES (?, ?, ?)";
try (PreparedStatement stmt = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
stmt.setString(1, "李四");
stmt.setString(2, "[email protected]");
stmt.setInt(3, 28);
int affectedRows = stmt.executeUpdate();
if (affectedRows > 0) {
try (ResultSet generatedKeys = stmt.getGeneratedKeys()) {
if (generatedKeys.next()) {
System.out.println("插入成功,ID: " + generatedKeys.getInt(1));
}
}
}
}
} catch (SQLException e) {
System.err.println("数据库错误: " + e.getMessage());
}
}
}
HikariCP 连接池
生产环境必须使用连接池,HikariCP 是目前性能最好的选择。
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.*;
public class ConnectionPoolExample {
private static HikariDataSource dataSource;
static {
// 配置连接池
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=Asia/Shanghai");
config.setUsername("root");
config.setPassword("your_password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");
// 连接池参数
config.setMaximumPoolSize(20); // 最大连接数
config.setMinimumIdle(5); // 最小空闲连接
config.setIdleTimeout(300000); // 空闲超时(毫秒)
config.setConnectionTimeout(20000); // 连接超时(毫秒)
config.setMaxLifetime(1200000); // 连接最大生命周期(毫秒)
config.setLeakDetectionThreshold(60000); // 连接泄漏检测阈值
// 性能优化参数
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");
// 连接池名称
config.setPoolName("MyAppPool");
dataSource = new HikariDataSource(config);
}
// 获取连接
public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}
// 关闭连接池
public static void closePool() {
if (dataSource != null && !dataSource.isClosed()) {
dataSource.close();
}
}
// 使用示例
public static User findById(int userId) {
String sql = "SELECT id, username, email, age FROM users WHERE id = ?";
try (Connection conn = getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, userId);
try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
User user = new User();
user.setId(rs.getInt("id"));
user.setUsername(rs.getString("username"));
user.setEmail(rs.getString("email"));
user.setAge(rs.getInt("age"));
return user;
}
}
} catch (SQLException e) {
System.err.println("查询失败: " + e.getMessage());
}
return null;
}
}
// User 实体类
class User {
private int id;
private String username;
private String email;
private Integer age;
// getter 和 setter
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; }
}
事务管理
import java.sql.*;
public class TransactionExample {
/**
* 转账事务示例
*/
public static boolean transfer(int fromAccountId, int toAccountId, double amount) {
Connection conn = null;
try {
conn = ConnectionPoolExample.getConnection();
conn.setAutoCommit(false); // 开启事务
// 1. 检查转出账户余额(使用 FOR UPDATE 锁定)
String checkSql = "SELECT balance FROM accounts WHERE id = ? FOR UPDATE";
try (PreparedStatement stmt = conn.prepareStatement(checkSql)) {
stmt.setInt(1, fromAccountId);
ResultSet rs = stmt.executeQuery();
if (!rs.next()) {
throw new SQLException("转出账户不存在");
}
double balance = rs.getDouble("balance");
if (balance < amount) {
throw new SQLException("余额不足");
}
}
// 2. 扣款
String deductSql = "UPDATE accounts SET balance = balance - ? WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(deductSql)) {
stmt.setDouble(1, amount);
stmt.setInt(2, fromAccountId);
stmt.executeUpdate();
}
// 3. 存款
String depositSql = "UPDATE accounts SET balance = balance + ? WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(depositSql)) {
stmt.setDouble(1, amount);
stmt.setInt(2, toAccountId);
stmt.executeUpdate();
}
// 4. 记录日志
String logSql = "INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES (?, ?, ?, NOW())";
try (PreparedStatement stmt = conn.prepareStatement(logSql)) {
stmt.setInt(1, fromAccountId);
stmt.setInt(2, toAccountId);
stmt.setDouble(3, amount);
stmt.executeUpdate();
}
conn.commit(); // 提交事务
System.out.println("转账成功");
return true;
} catch (SQLException e) {
// 回滚事务
if (conn != null) {
try {
conn.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
System.err.println("转账失败: " + e.getMessage());
return false;
} finally {
if (conn != null) {
try {
conn.setAutoCommit(true); // 恢复自动提交
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}
批量操作
import java.sql.*;
public class BatchExample {
/**
* 批量插入用户
*/
public static int batchInsertUsers(List<User> users) {
String sql = "INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())";
try (Connection conn = ConnectionPoolExample.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {
// 禁用自动提交以提高性能
conn.setAutoCommit(false);
for (User user : users) {
stmt.setString(1, user.getUsername());
stmt.setString(2, user.getEmail());
stmt.setInt(3, user.getAge());
stmt.addBatch(); // 添加到批次
// 每 1000 条执行一次
if (users.indexOf(user) % 1000 == 0) {
stmt.executeBatch();
}
}
// 执行剩余批次
int[] results = stmt.executeBatch();
conn.commit();
return results.length;
} catch (SQLException e) {
System.err.println("批量插入失败: " + e.getMessage());
return 0;
}
}
}
Go 语言集成
Go 语言标准库 database/sql 提供了数据库操作的统一接口,配合 MySQL 驱动使用。
安装驱动
go get -u github.com/go-sql-driver/mysql
基本操作
package main
import (
"database/sql"
"fmt"
"log"
"time"
_ "github.com/go-sql-driver/mysql" // 匿名导入驱动
)
// User 用户模型
type User struct {
ID int
Username string
Email string
Age int
CreatedAt time.Time
}
// 全局数据库连接
var db *sql.DB
func initDB() error {
// DSN 格式: username:password@protocol(address)/dbname?params
dsn := "root:your_password@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=true&loc=Local"
var err error
db, err = sql.Open("mysql", dsn)
if err != nil {
return err
}
// 测试连接
if err = db.Ping(); err != nil {
return err
}
// 连接池配置
db.SetMaxOpenConns(25) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期
fmt.Println("数据库连接成功")
return nil
}
func main() {
if err := initDB(); err != nil {
log.Fatal(err)
}
defer db.Close()
// 查询单条记录
user, err := getUserByID(1)
if err != nil {
log.Println("查询失败:", err)
} else {
fmt.Printf("用户: %+v\n", user)
}
}
// getUserByID 根据ID查询用户
func getUserByID(id int) (*User, error) {
query := "SELECT id, username, email, age, created_at FROM users WHERE id = ?"
row := db.QueryRow(query, id)
user := &User{}
err := row.Scan(&user.ID, &user.Username, &user.Email, &user.Age, &user.CreatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("用户不存在")
}
return nil, err
}
return user, nil
}
// getAllUsers 查询所有用户
func getAllUsers() ([]User, error) {
query := "SELECT id, username, email, age, created_at FROM users ORDER BY id"
rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()
var users []User
for rows.Next() {
var user User
if err := rows.Scan(&user.ID, &user.Username, &user.Email, &user.Age, &user.CreatedAt); err != nil {
return nil, err
}
users = append(users, user)
}
return users, nil
}
// insertUser 插入用户
func insertUser(username, email string, age int) (int64, error) {
query := "INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())"
result, err := db.Exec(query, username, email, age)
if err != nil {
return 0, err
}
return result.LastInsertId()
}
// updateUser 更新用户
func updateUser(id int, username string, age int) error {
query := "UPDATE users SET username = ?, age = ? WHERE id = ?"
result, err := db.Exec(query, username, age, id)
if err != nil {
return err
}
rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("用户不存在")
}
return nil
}
事务处理
// transferMoney 转账事务
func transferMoney(fromAccount, toAccount int, amount float64) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return err
}
// 确保事务会被回滚或提交
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
}
}()
// 1. 检查转出账户余额(加锁)
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ? FOR UPDATE", fromAccount).Scan(&balance)
if err != nil {
tx.Rollback()
return err
}
if balance < amount {
tx.Rollback()
return fmt.Errorf("余额不足")
}
// 2. 扣款
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromAccount)
if err != nil {
tx.Rollback()
return err
}
// 3. 存款
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toAccount)
if err != nil {
tx.Rollback()
return err
}
// 4. 记录日志
_, err = tx.Exec(
"INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES (?, ?, ?, NOW())",
fromAccount, toAccount, amount,
)
if err != nil {
tx.Rollback()
return err
}
// 提交事务
return tx.Commit()
}
批量操作
import "strings"
// batchInsertUsers 批量插入用户
func batchInsertUsers(users []User) error {
if len(users) == 0 {
return nil
}
// 构建 SQL
valueStrings := make([]string, 0, len(users))
valueArgs := make([]interface{}, 0, len(users)*3)
for _, user := range users {
valueStrings = append(valueStrings, "(?, ?, ?, NOW())")
valueArgs = append(valueArgs, user.Username, user.Email, user.Age)
}
query := fmt.Sprintf(
"INSERT INTO users (username, email, age, created_at) VALUES %s",
strings.Join(valueStrings, ","),
)
_, err := db.Exec(query, valueArgs...)
return err
}
// 使用预处理语句批量插入(更高效)
func batchInsertWithStmt(users []User) error {
stmt, err := db.Prepare("INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())")
if err != nil {
return err
}
defer stmt.Close()
for _, user := range users {
_, err := stmt.Exec(user.Username, user.Email, user.Age)
if err != nil {
return err
}
}
return nil
}
使用 GORM
GORM 是 Go 语言最流行的 ORM 库,简化了数据库操作。
// 安装: go get -u gorm.io/gorm gorm.io/driver/mysql
package main
import (
"fmt"
"log"
"time"
"gorm.io/driver/mysql"
"gorm.io/gorm"
)
// User 模型
type User struct {
ID uint `gorm:"primaryKey" json:"id"`
Username string `gorm:"uniqueIndex;size:50;not null" json:"username"`
Email string `gorm:"uniqueIndex;size:100;not null" json:"email"`
Age int `json:"age"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // 软删除
}
func main() {
dsn := "root:your_password@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal(err)
}
// 自动迁移
db.AutoMigrate(&User{})
// 创建
user := User{Username: "张三", Email: "[email protected]", Age: 25}
result := db.Create(&user)
fmt.Printf("插入 ID: %d, 影响行数: %d\n", user.ID, result.RowsAffected)
// 查询
var firstUser User
db.First(&firstUser, 1) // 根据主键查询
db.First(&firstUser, "username = ?", "张三") // 条件查询
// 更新
db.Model(&firstUser).Update("Age", 26)
db.Model(&firstUser).Updates(User{Age: 27, Email: "[email protected]"})
// 删除
db.Delete(&firstUser)
// 查询所有
var users []User
db.Where("age > ?", 20).Find(&users)
// 分页
var paginatedUsers []User
db.Offset(0).Limit(10).Find(&paginatedUsers)
// 事务
db.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(&User{Username: "用户1", Email: "[email protected]"}).Error; err != nil {
return err
}
if err := tx.Create(&User{Username: "用户2", Email: "[email protected]"}).Error; err != nil {
return err
}
return nil
})
}
Node.js 集成
Node.js 连接 MySQL 常用 mysql2 驱动,支持 Promise 和连接池。
安装
npm install mysql2
基本操作
const mysql = require('mysql2/promise');
// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'your_password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
charset: 'utf8mb4'
});
// 查询单条记录
async function getUserById(userId) {
const [rows] = await pool.execute(
'SELECT id, username, email FROM users WHERE id = ?',
[userId]
);
return rows[0];
}
// 查询多条记录
async function getAllUsers(limit = 100) {
const [rows] = await pool.execute(
'SELECT * FROM users ORDER BY id LIMIT ?',
[limit]
);
return rows;
}
// 插入记录
async function createUser(username, email, age) {
const [result] = await pool.execute(
'INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())',
[username, email, age]
);
return result.insertId;
}
// 事务处理
async function transfer(fromId, toId, amount) {
const connection = await pool.getConnection();
try {
await connection.beginTransaction();
// 检查余额
const [accounts] = await connection.execute(
'SELECT balance FROM accounts WHERE id = ? FOR UPDATE',
[fromId]
);
if (accounts.length === 0 || accounts[0].balance < amount) {
throw new Error('余额不足');
}
// 执行转账
await connection.execute(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
[amount, fromId]
);
await connection.execute(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
[amount, toId]
);
await connection.commit();
return true;
} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}
// Express 应用示例
const express = require('express');
const app = express();
app.get('/users/:id', async (req, res) => {
try {
const user = await getUserById(req.params.id);
if (!user) {
return res.status(404).json({ error: '用户不存在' });
}
res.json(user);
} catch (error) {
res.status(500).json({ error: error.message });
}
});
app.listen(3000, () => console.log('Server running on port 3000'));
最佳实践总结
连接管理
| 实践 | 说明 |
|---|---|
| 使用连接池 | 避免频繁创建和销毁连接 |
| 合理设置池大小 | 根据并发量调整,通常为 CPU 核心数的 2-3 倍 |
| 及时释放连接 | 使用 try-finally 或上下文管理器确保连接释放 |
| 设置连接超时 | 防止连接泄漏和长时间等待 |
SQL 安全
# ❌ 危险:SQL 注入风险
sql = f"SELECT * FROM users WHERE username = '{username}'"
# ✅ 安全:使用参数化查询
sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(sql, (username,))
性能优化
- 批量操作:使用
executemany或批量插入语句 - 索引优化:确保查询字段有合适的索引
- 预处理语句:复用 SQL 解析结果
- 分页查询:避免一次性获取大量数据
- 只查需要的列:避免
SELECT *
错误处理
import mysql.connector
from mysql.connector import Error, DatabaseError, InterfaceError
try:
conn = mysql.connector.connect(...)
# 执行操作
except InterfaceError as e:
# 网络连接错误
print(f"连接失败: {e}")
except DatabaseError as e:
# 数据库错误
print(f"数据库错误: {e}")
except Error as e:
# 其他 MySQL 错误
print(f"MySQL 错误: {e}")
finally:
if conn and conn.is_connected():
conn.close()
小结
本章我们学习了:
- Python 集成:mysql-connector-python、PyMySQL、连接池、事务、SQLAlchemy ORM
- Java 集成:JDBC、HikariCP 连接池、事务管理、批量操作
- Go 集成:database/sql、连接池、事务、GORM ORM
- Node.js 集成:mysql2、Promise API、连接池、事务
- 最佳实践:连接管理、SQL 安全、性能优化、错误处理
练习
- 使用你熟悉的语言实现一个用户管理模块(CRUD)
- 配置连接池并测试高并发场景下的性能
- 实现一个转账功能,确保事务正确处理
- 使用 ORM 重写一个简单的查询功能
- 实现批量导入 CSV 文件到数据库