跳到主要内容

MySQL 编程集成

MySQL 能与各种编程语言轻松集成。本章将详细介绍如何使用主流编程语言连接和操作 MySQL 数据库,包括连接管理、事务处理、批量操作和性能优化等核心内容。

概述

为什么需要编程集成?

数据库编程是后端开发的核心技能。无论是 Web 应用、数据分析还是微服务架构,都需要通过程序代码与数据库交互。良好的数据库集成需要关注:

  • 连接管理:高效创建、复用和释放数据库连接
  • SQL 执行:安全地构建和执行 SQL 语句
  • 事务控制:保证数据一致性和完整性
  • 性能优化:批量操作、预处理语句、连接池
  • 错误处理:优雅处理数据库异常

常用连接器对比

语言官方连接器第三方库推荐场景
Pythonmysql-connector-pythonPyMySQL, mysqlclient数据分析、Web 后端
JavaMySQL Connector/JHikariCP(连接池)企业级应用
Gogo-sql-driver/mysqlGORM(ORM)微服务、高并发
Node.jsmysql2Sequelize, TypeORM全栈 Web 应用
C#MySqlConnectorDapper, EF Core.NET 应用

Python 集成

Python 连接 MySQL 有多种选择,最常用的是官方的 mysql-connector-python 和第三方的 PyMySQL

安装驱动

# 官方驱动(纯 Python 实现,兼容性好)
pip install mysql-connector-python

# 第三方驱动(API 兼容,性能略好)
pip install pymysql

# C 扩展版本(性能最好,需要编译环境)
pip install mysqlclient

基本连接与查询

使用 mysql-connector-python

import mysql.connector
from mysql.connector import Error

def query_user(user_id):
"""查询单个用户"""
connection = None
cursor = None

try:
# 建立连接
connection = mysql.connector.connect(
host='localhost',
port=3306,
user='root',
password='your_password',
database='mydb',
charset='utf8mb4'
)

# 创建游标(字典格式返回结果)
cursor = connection.cursor(dictionary=True)

# 执行参数化查询(防止 SQL 注入)
sql = "SELECT id, username, email, created_at FROM users WHERE id = %s"
cursor.execute(sql, (user_id,))

# 获取结果
result = cursor.fetchone()

if result:
print(f"用户: {result['username']}")
print(f"邮箱: {result['email']}")
return result
else:
print("用户不存在")
return None

except Error as e:
print(f"数据库错误: {e}")
return None

finally:
# 确保资源释放
if cursor:
cursor.close()
if connection and connection.is_connected():
connection.close()

使用 PyMySQL

import pymysql
from pymysql.cursors import DictCursor

def query_with_pymysql():
"""PyMySQL 基本用法"""
# 使用上下文管理器自动关闭连接
with pymysql.connect(
host='localhost',
user='root',
password='your_password',
database='mydb',
charset='utf8mb4',
cursorclass=DictCursor
) as connection:
with connection.cursor() as cursor:
# 执行查询
cursor.execute("SELECT * FROM users LIMIT 10")

# 获取所有结果
users = cursor.fetchall()
for user in users:
print(f"{user['id']}: {user['username']}")

return users

连接池

在高并发场景下,频繁创建和销毁连接会严重影响性能。连接池通过预先创建并复用连接来解决这个问题。

mysql-connector-python 连接池

from mysql.connector import pooling
import mysql.connector

# 创建连接池
dbconfig = {
"host": "localhost",
"port": 3306,
"user": "root",
"password": "your_password",
"database": "mydb",
"charset": "utf8mb4"
}

# 创建连接池(默认池大小为 5)
connection_pool = pooling.MySQLConnectionPool(
pool_name="mypool",
pool_size=10, # 连接池大小
pool_reset_session=True, # 重置会话状态
**dbconfig
)

def get_connection():
"""从连接池获取连接"""
try:
return connection_pool.get_connection()
except pooling.PoolError as e:
print(f"连接池耗尽: {e}")
return None

# 使用连接池
def query_users():
conn = get_connection()
if conn is None:
return []

try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE status = %s", ('active',))
return cursor.fetchall()
finally:
cursor.close()
conn.close() # 实际上是归还连接池,不是真正关闭

# Flask 应用示例
from flask import Flask, jsonify

app = Flask(__name__)

@app.route('/users/<int:user_id>')
def get_user(user_id):
conn = get_connection()
if conn is None:
return jsonify({"error": "数据库连接失败"}), 503

try:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE id = %s", (user_id,))
user = cursor.fetchone()

if user:
# 处理 datetime 等特殊类型
if 'created_at' in user and user['created_at']:
user['created_at'] = user['created_at'].isoformat()
return jsonify(user)
return jsonify({"error": "用户不存在"}), 404
finally:
cursor.close()
conn.close()

PyMySQL 配合 DBUtils 连接池

# 安装: pip install DBUtils

import pymysql
from dbutils.pooled_db import PooledDB

# 创建连接池
pool = PooledDB(
creator=pymysql, # 使用 pymysql 作为连接
maxconnections=20, # 最大连接数
mincached=2, # 初始空闲连接数
maxcached=5, # 最大空闲连接数
maxshared=3, # 最大共享连接数
blocking=True, # 连接池耗尽时阻塞等待
host='localhost',
port=3306,
user='root',
password='your_password',
database='mydb',
charset='utf8mb4'
)

def get_db_connection():
"""获取数据库连接"""
return pool.connection()

# 使用示例
def get_all_products():
conn = get_db_connection()
try:
with conn.cursor(pymysql.cursors.DictCursor) as cursor:
cursor.execute("SELECT * FROM products WHERE status = 1")
return cursor.fetchall()
finally:
conn.close()

事务管理

事务保证多个数据库操作的原子性,要么全部成功,要么全部回滚。

import mysql.connector
from mysql.connector import Error

def transfer_money(from_account, to_account, amount):
"""转账事务示例"""
conn = None
try:
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='bank_db'
)
conn.autocommit = False # 关闭自动提交,开启事务

cursor = conn.cursor(dictionary=True)

# 1. 检查转出账户余额
cursor.execute(
"SELECT balance FROM accounts WHERE id = %s FOR UPDATE",
(from_account,)
)
from_balance = cursor.fetchone()

if not from_balance:
raise ValueError("转出账户不存在")

if from_balance['balance'] < amount:
raise ValueError("余额不足")

# 2. 扣除转出账户金额
cursor.execute(
"UPDATE accounts SET balance = balance - %s WHERE id = %s",
(amount, from_account)
)

# 3. 增加转入账户金额
cursor.execute(
"UPDATE accounts SET balance = balance + %s WHERE id = %s",
(amount, to_account)
)

# 4. 记录转账日志
cursor.execute(
"""INSERT INTO transfer_logs
(from_account, to_account, amount, created_at)
VALUES (%s, %s, %s, NOW())""",
(from_account, to_account, amount)
)

# 提交事务
conn.commit()
print("转账成功")
return True

except ValueError as e:
# 业务错误,回滚
if conn:
conn.rollback()
print(f"转账失败: {e}")
return False

except Error as e:
# 数据库错误,回滚
if conn:
conn.rollback()
print(f"数据库错误: {e}")
return False

finally:
if conn and conn.is_connected():
cursor.close()
conn.close()

# 使用上下文管理器简化事务
from contextlib import contextmanager

@contextmanager
def transaction(connection_params):
"""事务上下文管理器"""
conn = mysql.connector.connect(**connection_params)
conn.autocommit = False
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()

# 使用示例
def update_user_profile(user_id, new_name, new_email):
with transaction({
'host': 'localhost',
'user': 'root',
'password': 'your_password',
'database': 'mydb'
}) as conn:
cursor = conn.cursor()
cursor.execute(
"UPDATE users SET name = %s, email = %s WHERE id = %s",
(new_name, new_email, user_id)
)
cursor.execute(
"INSERT INTO profile_history (user_id, field, old_value, new_value, changed_at) "
"SELECT %s, 'name', name, %s, NOW() FROM users WHERE id = %s",
(user_id, new_name, user_id)
)

批量操作

批量操作可以显著提高大量数据插入/更新的性能。

import mysql.connector
from datetime import datetime

def batch_insert_users(users_data):
"""批量插入用户数据"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)

try:
cursor = conn.cursor()

# 方式一:使用 executemany(推荐)
sql = """
INSERT INTO users (username, email, age, created_at)
VALUES (%s, %s, %s, %s)
"""

# 准备数据
now = datetime.now()
data = [
(user['username'], user['email'], user['age'], now)
for user in users_data
]

# 批量执行
cursor.executemany(sql, data)
conn.commit()

print(f"成功插入 {cursor.rowcount} 条记录")
return cursor.rowcount

finally:
cursor.close()
conn.close()

def batch_update_prices(updates):
"""批量更新商品价格"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)

try:
cursor = conn.cursor()

# 使用 CASE WHEN 进行批量更新
# 构建 SQL
product_ids = [str(u['id']) for u in updates]
case_sql = "CASE id "
for u in updates:
case_sql += f"WHEN {u['id']} THEN {u['price']} "
case_sql += "END"

sql = f"""
UPDATE products
SET price = {case_sql}
WHERE id IN ({','.join(product_ids)})
"""

cursor.execute(sql)
conn.commit()

return cursor.rowcount

finally:
cursor.close()
conn.close()

# 高性能批量插入(使用 LOAD DATA)
def fast_import_from_csv(csv_file_path):
"""从 CSV 快速导入数据"""
conn = mysql.connector.connect(
host='localhost',
user='root',
password='your_password',
database='mydb'
)

try:
cursor = conn.cursor()

# LOAD DATA INFILE 比 INSERT 快很多
sql = f"""
LOAD DATA LOCAL INFILE '{csv_file_path}'
INTO TABLE users
FIELDS TERMINATED BY ','
LINES TERMINATED BY '\\n'
IGNORE 1 ROWS
(username, email, age)
SET created_at = NOW()
"""

cursor.execute(sql)
conn.commit()

print(f"成功导入 {cursor.rowcount} 条记录")

finally:
cursor.close()
conn.close()

ORM 使用:SQLAlchemy

ORM(对象关系映射)让数据库操作更加面向对象,减少 SQL 编写。

# 安装: pip install sqlalchemy pymysql

from sqlalchemy import create_engine, Column, Integer, String, DateTime, Text
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from datetime import datetime

# 定义基类
Base = declarative_base()

# 定义模型
class User(Base):
__tablename__ = 'users'

id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), unique=True, nullable=False)
age = Column(Integer)
bio = Column(Text)
created_at = Column(DateTime, default=datetime.utcnow)

def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"

# 创建引擎(连接池自动管理)
DATABASE_URL = "mysql+pymysql://root:your_password@localhost/mydb?charset=utf8mb4"
engine = create_engine(
DATABASE_URL,
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接
pool_recycle=3600, # 连接回收时间
echo=False # 是否打印 SQL
)

# 创建表
Base.metadata.create_all(engine)

# 创建会话工厂
SessionLocal = sessionmaker(bind=engine, autocommit=False, autoflush=False)

# 依赖注入(FastAPI 示例)
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()

# CRUD 操作示例
class UserRepository:
def __init__(self, db: Session):
self.db = db

def create(self, username: str, email: str, age: int = None) -> User:
"""创建用户"""
user = User(username=username, email=email, age=age)
self.db.add(user)
self.db.commit()
self.db.refresh(user)
return user

def get_by_id(self, user_id: int) -> User:
"""根据 ID 查询"""
return self.db.query(User).filter(User.id == user_id).first()

def get_by_username(self, username: str) -> User:
"""根据用户名查询"""
return self.db.query(User).filter(User.username == username).first()

def get_all(self, skip: int = 0, limit: int = 100):
"""分页查询"""
return self.db.query(User).offset(skip).limit(limit).all()

def update(self, user_id: int, **kwargs) -> User:
"""更新用户"""
user = self.get_by_id(user_id)
if user:
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
self.db.commit()
self.db.refresh(user)
return user

def delete(self, user_id: int) -> bool:
"""删除用户"""
user = self.get_by_id(user_id)
if user:
self.db.delete(user)
self.db.commit()
return True
return False

# 使用示例
db = SessionLocal()
repo = UserRepository(db)

# 创建用户
new_user = repo.create("张三", "[email protected]", age=25)

# 查询用户
user = repo.get_by_id(1)
print(f"用户: {user.username}, 邮箱: {user.email}")

# 更新用户
updated_user = repo.update(1, age=26, bio="这是个人简介")

# 分页查询
users = repo.get_all(skip=0, limit=10)

db.close()

Java 集成 (JDBC)

Java 通过 JDBC 连接 MySQL,是企业级应用的标准方式。

Maven 依赖

<dependencies>
<!-- MySQL 驱动 -->
<dependency>
<groupId>com.mysql</groupId>
<artifactId>mysql-connector-j</artifactId>
<version>8.3.0</version>
</dependency>

<!-- HikariCP 连接池 -->
<dependency>
<groupId>com.zaxxer</groupId>
<artifactId>HikariCP</artifactId>
<version>5.1.0</version>
</dependency>
</dependencies>

基本连接与查询

import java.sql.*;

public class MySQLBasicExample {

private static final String URL = "jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=Asia/Shanghai&characterEncoding=utf8mb4";
private static final String USER = "root";
private static final String PASSWORD = "your_password";

public static void main(String[] args) {
// 使用 try-with-resources 自动关闭资源
try (Connection conn = DriverManager.getConnection(URL, USER, PASSWORD)) {

// 查询单条记录
String sql = "SELECT id, username, email FROM users WHERE id = ?";

try (PreparedStatement stmt = conn.prepareStatement(sql)) {
stmt.setInt(1, 1);

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
System.out.println("ID: " + rs.getInt("id"));
System.out.println("用户名: " + rs.getString("username"));
System.out.println("邮箱: " + rs.getString("email"));
}
}
}

// 插入数据
String insertSql = "INSERT INTO users (username, email, age) VALUES (?, ?, ?)";
try (PreparedStatement stmt = conn.prepareStatement(insertSql, Statement.RETURN_GENERATED_KEYS)) {
stmt.setString(1, "李四");
stmt.setString(2, "[email protected]");
stmt.setInt(3, 28);

int affectedRows = stmt.executeUpdate();

if (affectedRows > 0) {
try (ResultSet generatedKeys = stmt.getGeneratedKeys()) {
if (generatedKeys.next()) {
System.out.println("插入成功,ID: " + generatedKeys.getInt(1));
}
}
}
}

} catch (SQLException e) {
System.err.println("数据库错误: " + e.getMessage());
}
}
}

HikariCP 连接池

生产环境必须使用连接池,HikariCP 是目前性能最好的选择。

import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;
import java.sql.*;

public class ConnectionPoolExample {

private static HikariDataSource dataSource;

static {
// 配置连接池
HikariConfig config = new HikariConfig();
config.setJdbcUrl("jdbc:mysql://localhost:3306/mydb?useSSL=false&serverTimezone=Asia/Shanghai");
config.setUsername("root");
config.setPassword("your_password");
config.setDriverClassName("com.mysql.cj.jdbc.Driver");

// 连接池参数
config.setMaximumPoolSize(20); // 最大连接数
config.setMinimumIdle(5); // 最小空闲连接
config.setIdleTimeout(300000); // 空闲超时(毫秒)
config.setConnectionTimeout(20000); // 连接超时(毫秒)
config.setMaxLifetime(1200000); // 连接最大生命周期(毫秒)
config.setLeakDetectionThreshold(60000); // 连接泄漏检测阈值

// 性能优化参数
config.addDataSourceProperty("cachePrepStmts", "true");
config.addDataSourceProperty("prepStmtCacheSize", "250");
config.addDataSourceProperty("prepStmtCacheSqlLimit", "2048");
config.addDataSourceProperty("useServerPrepStmts", "true");
config.addDataSourceProperty("useLocalSessionState", "true");

// 连接池名称
config.setPoolName("MyAppPool");

dataSource = new HikariDataSource(config);
}

// 获取连接
public static Connection getConnection() throws SQLException {
return dataSource.getConnection();
}

// 关闭连接池
public static void closePool() {
if (dataSource != null && !dataSource.isClosed()) {
dataSource.close();
}
}

// 使用示例
public static User findById(int userId) {
String sql = "SELECT id, username, email, age FROM users WHERE id = ?";

try (Connection conn = getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {

stmt.setInt(1, userId);

try (ResultSet rs = stmt.executeQuery()) {
if (rs.next()) {
User user = new User();
user.setId(rs.getInt("id"));
user.setUsername(rs.getString("username"));
user.setEmail(rs.getString("email"));
user.setAge(rs.getInt("age"));
return user;
}
}

} catch (SQLException e) {
System.err.println("查询失败: " + e.getMessage());
}

return null;
}
}

// User 实体类
class User {
private int id;
private String username;
private String email;
private Integer age;

// getter 和 setter
public int getId() { return id; }
public void setId(int id) { this.id = id; }
public String getUsername() { return username; }
public void setUsername(String username) { this.username = username; }
public String getEmail() { return email; }
public void setEmail(String email) { this.email = email; }
public Integer getAge() { return age; }
public void setAge(Integer age) { this.age = age; }
}

事务管理

import java.sql.*;

public class TransactionExample {

/**
* 转账事务示例
*/
public static boolean transfer(int fromAccountId, int toAccountId, double amount) {
Connection conn = null;

try {
conn = ConnectionPoolExample.getConnection();
conn.setAutoCommit(false); // 开启事务

// 1. 检查转出账户余额(使用 FOR UPDATE 锁定)
String checkSql = "SELECT balance FROM accounts WHERE id = ? FOR UPDATE";
try (PreparedStatement stmt = conn.prepareStatement(checkSql)) {
stmt.setInt(1, fromAccountId);
ResultSet rs = stmt.executeQuery();

if (!rs.next()) {
throw new SQLException("转出账户不存在");
}

double balance = rs.getDouble("balance");
if (balance < amount) {
throw new SQLException("余额不足");
}
}

// 2. 扣款
String deductSql = "UPDATE accounts SET balance = balance - ? WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(deductSql)) {
stmt.setDouble(1, amount);
stmt.setInt(2, fromAccountId);
stmt.executeUpdate();
}

// 3. 存款
String depositSql = "UPDATE accounts SET balance = balance + ? WHERE id = ?";
try (PreparedStatement stmt = conn.prepareStatement(depositSql)) {
stmt.setDouble(1, amount);
stmt.setInt(2, toAccountId);
stmt.executeUpdate();
}

// 4. 记录日志
String logSql = "INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES (?, ?, ?, NOW())";
try (PreparedStatement stmt = conn.prepareStatement(logSql)) {
stmt.setInt(1, fromAccountId);
stmt.setInt(2, toAccountId);
stmt.setDouble(3, amount);
stmt.executeUpdate();
}

conn.commit(); // 提交事务
System.out.println("转账成功");
return true;

} catch (SQLException e) {
// 回滚事务
if (conn != null) {
try {
conn.rollback();
} catch (SQLException ex) {
ex.printStackTrace();
}
}
System.err.println("转账失败: " + e.getMessage());
return false;

} finally {
if (conn != null) {
try {
conn.setAutoCommit(true); // 恢复自动提交
conn.close();
} catch (SQLException e) {
e.printStackTrace();
}
}
}
}
}

批量操作

import java.sql.*;

public class BatchExample {

/**
* 批量插入用户
*/
public static int batchInsertUsers(List<User> users) {
String sql = "INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())";

try (Connection conn = ConnectionPoolExample.getConnection();
PreparedStatement stmt = conn.prepareStatement(sql)) {

// 禁用自动提交以提高性能
conn.setAutoCommit(false);

for (User user : users) {
stmt.setString(1, user.getUsername());
stmt.setString(2, user.getEmail());
stmt.setInt(3, user.getAge());
stmt.addBatch(); // 添加到批次

// 每 1000 条执行一次
if (users.indexOf(user) % 1000 == 0) {
stmt.executeBatch();
}
}

// 执行剩余批次
int[] results = stmt.executeBatch();
conn.commit();

return results.length;

} catch (SQLException e) {
System.err.println("批量插入失败: " + e.getMessage());
return 0;
}
}
}

Go 语言集成

Go 语言标准库 database/sql 提供了数据库操作的统一接口,配合 MySQL 驱动使用。

安装驱动

go get -u github.com/go-sql-driver/mysql

基本操作

package main

import (
"database/sql"
"fmt"
"log"
"time"

_ "github.com/go-sql-driver/mysql" // 匿名导入驱动
)

// User 用户模型
type User struct {
ID int
Username string
Email string
Age int
CreatedAt time.Time
}

// 全局数据库连接
var db *sql.DB

func initDB() error {
// DSN 格式: username:password@protocol(address)/dbname?params
dsn := "root:your_password@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=true&loc=Local"

var err error
db, err = sql.Open("mysql", dsn)
if err != nil {
return err
}

// 测试连接
if err = db.Ping(); err != nil {
return err
}

// 连接池配置
db.SetMaxOpenConns(25) // 最大打开连接数
db.SetMaxIdleConns(10) // 最大空闲连接数
db.SetConnMaxLifetime(5 * time.Minute) // 连接最大生命周期

fmt.Println("数据库连接成功")
return nil
}

func main() {
if err := initDB(); err != nil {
log.Fatal(err)
}
defer db.Close()

// 查询单条记录
user, err := getUserByID(1)
if err != nil {
log.Println("查询失败:", err)
} else {
fmt.Printf("用户: %+v\n", user)
}
}

// getUserByID 根据ID查询用户
func getUserByID(id int) (*User, error) {
query := "SELECT id, username, email, age, created_at FROM users WHERE id = ?"

row := db.QueryRow(query, id)

user := &User{}
err := row.Scan(&user.ID, &user.Username, &user.Email, &user.Age, &user.CreatedAt)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("用户不存在")
}
return nil, err
}

return user, nil
}

// getAllUsers 查询所有用户
func getAllUsers() ([]User, error) {
query := "SELECT id, username, email, age, created_at FROM users ORDER BY id"

rows, err := db.Query(query)
if err != nil {
return nil, err
}
defer rows.Close()

var users []User
for rows.Next() {
var user User
if err := rows.Scan(&user.ID, &user.Username, &user.Email, &user.Age, &user.CreatedAt); err != nil {
return nil, err
}
users = append(users, user)
}

return users, nil
}

// insertUser 插入用户
func insertUser(username, email string, age int) (int64, error) {
query := "INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())"

result, err := db.Exec(query, username, email, age)
if err != nil {
return 0, err
}

return result.LastInsertId()
}

// updateUser 更新用户
func updateUser(id int, username string, age int) error {
query := "UPDATE users SET username = ?, age = ? WHERE id = ?"

result, err := db.Exec(query, username, age, id)
if err != nil {
return err
}

rows, _ := result.RowsAffected()
if rows == 0 {
return fmt.Errorf("用户不存在")
}

return nil
}

事务处理

// transferMoney 转账事务
func transferMoney(fromAccount, toAccount int, amount float64) error {
// 开始事务
tx, err := db.Begin()
if err != nil {
return err
}

// 确保事务会被回滚或提交
defer func() {
if p := recover(); p != nil {
tx.Rollback()
panic(p)
}
}()

// 1. 检查转出账户余额(加锁)
var balance float64
err = tx.QueryRow("SELECT balance FROM accounts WHERE id = ? FOR UPDATE", fromAccount).Scan(&balance)
if err != nil {
tx.Rollback()
return err
}

if balance < amount {
tx.Rollback()
return fmt.Errorf("余额不足")
}

// 2. 扣款
_, err = tx.Exec("UPDATE accounts SET balance = balance - ? WHERE id = ?", amount, fromAccount)
if err != nil {
tx.Rollback()
return err
}

// 3. 存款
_, err = tx.Exec("UPDATE accounts SET balance = balance + ? WHERE id = ?", amount, toAccount)
if err != nil {
tx.Rollback()
return err
}

// 4. 记录日志
_, err = tx.Exec(
"INSERT INTO transfer_logs (from_account, to_account, amount, created_at) VALUES (?, ?, ?, NOW())",
fromAccount, toAccount, amount,
)
if err != nil {
tx.Rollback()
return err
}

// 提交事务
return tx.Commit()
}

批量操作

import "strings"

// batchInsertUsers 批量插入用户
func batchInsertUsers(users []User) error {
if len(users) == 0 {
return nil
}

// 构建 SQL
valueStrings := make([]string, 0, len(users))
valueArgs := make([]interface{}, 0, len(users)*3)

for _, user := range users {
valueStrings = append(valueStrings, "(?, ?, ?, NOW())")
valueArgs = append(valueArgs, user.Username, user.Email, user.Age)
}

query := fmt.Sprintf(
"INSERT INTO users (username, email, age, created_at) VALUES %s",
strings.Join(valueStrings, ","),
)

_, err := db.Exec(query, valueArgs...)
return err
}

// 使用预处理语句批量插入(更高效)
func batchInsertWithStmt(users []User) error {
stmt, err := db.Prepare("INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())")
if err != nil {
return err
}
defer stmt.Close()

for _, user := range users {
_, err := stmt.Exec(user.Username, user.Email, user.Age)
if err != nil {
return err
}
}

return nil
}

使用 GORM

GORM 是 Go 语言最流行的 ORM 库,简化了数据库操作。

// 安装: go get -u gorm.io/gorm gorm.io/driver/mysql

package main

import (
"fmt"
"log"
"time"

"gorm.io/driver/mysql"
"gorm.io/gorm"
)

// User 模型
type User struct {
ID uint `gorm:"primaryKey" json:"id"`
Username string `gorm:"uniqueIndex;size:50;not null" json:"username"`
Email string `gorm:"uniqueIndex;size:100;not null" json:"email"`
Age int `json:"age"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
DeletedAt gorm.DeletedAt `gorm:"index" json:"-"` // 软删除
}

func main() {
dsn := "root:your_password@tcp(127.0.0.1:3306)/mydb?charset=utf8mb4&parseTime=True&loc=Local"

db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err != nil {
log.Fatal(err)
}

// 自动迁移
db.AutoMigrate(&User{})

// 创建
user := User{Username: "张三", Email: "[email protected]", Age: 25}
result := db.Create(&user)
fmt.Printf("插入 ID: %d, 影响行数: %d\n", user.ID, result.RowsAffected)

// 查询
var firstUser User
db.First(&firstUser, 1) // 根据主键查询
db.First(&firstUser, "username = ?", "张三") // 条件查询

// 更新
db.Model(&firstUser).Update("Age", 26)
db.Model(&firstUser).Updates(User{Age: 27, Email: "[email protected]"})

// 删除
db.Delete(&firstUser)

// 查询所有
var users []User
db.Where("age > ?", 20).Find(&users)

// 分页
var paginatedUsers []User
db.Offset(0).Limit(10).Find(&paginatedUsers)

// 事务
db.Transaction(func(tx *gorm.DB) error {
if err := tx.Create(&User{Username: "用户1", Email: "[email protected]"}).Error; err != nil {
return err
}
if err := tx.Create(&User{Username: "用户2", Email: "[email protected]"}).Error; err != nil {
return err
}
return nil
})
}

Node.js 集成

Node.js 连接 MySQL 常用 mysql2 驱动,支持 Promise 和连接池。

安装

npm install mysql2

基本操作

const mysql = require('mysql2/promise');

// 创建连接池
const pool = mysql.createPool({
host: 'localhost',
user: 'root',
password: 'your_password',
database: 'mydb',
waitForConnections: true,
connectionLimit: 10,
queueLimit: 0,
charset: 'utf8mb4'
});

// 查询单条记录
async function getUserById(userId) {
const [rows] = await pool.execute(
'SELECT id, username, email FROM users WHERE id = ?',
[userId]
);
return rows[0];
}

// 查询多条记录
async function getAllUsers(limit = 100) {
const [rows] = await pool.execute(
'SELECT * FROM users ORDER BY id LIMIT ?',
[limit]
);
return rows;
}

// 插入记录
async function createUser(username, email, age) {
const [result] = await pool.execute(
'INSERT INTO users (username, email, age, created_at) VALUES (?, ?, ?, NOW())',
[username, email, age]
);
return result.insertId;
}

// 事务处理
async function transfer(fromId, toId, amount) {
const connection = await pool.getConnection();

try {
await connection.beginTransaction();

// 检查余额
const [accounts] = await connection.execute(
'SELECT balance FROM accounts WHERE id = ? FOR UPDATE',
[fromId]
);

if (accounts.length === 0 || accounts[0].balance < amount) {
throw new Error('余额不足');
}

// 执行转账
await connection.execute(
'UPDATE accounts SET balance = balance - ? WHERE id = ?',
[amount, fromId]
);

await connection.execute(
'UPDATE accounts SET balance = balance + ? WHERE id = ?',
[amount, toId]
);

await connection.commit();
return true;

} catch (error) {
await connection.rollback();
throw error;
} finally {
connection.release();
}
}

// Express 应用示例
const express = require('express');
const app = express();

app.get('/users/:id', async (req, res) => {
try {
const user = await getUserById(req.params.id);
if (!user) {
return res.status(404).json({ error: '用户不存在' });
}
res.json(user);
} catch (error) {
res.status(500).json({ error: error.message });
}
});

app.listen(3000, () => console.log('Server running on port 3000'));

最佳实践总结

连接管理

实践说明
使用连接池避免频繁创建和销毁连接
合理设置池大小根据并发量调整,通常为 CPU 核心数的 2-3 倍
及时释放连接使用 try-finally 或上下文管理器确保连接释放
设置连接超时防止连接泄漏和长时间等待

SQL 安全

# ❌ 危险:SQL 注入风险
sql = f"SELECT * FROM users WHERE username = '{username}'"

# ✅ 安全:使用参数化查询
sql = "SELECT * FROM users WHERE username = %s"
cursor.execute(sql, (username,))

性能优化

  1. 批量操作:使用 executemany 或批量插入语句
  2. 索引优化:确保查询字段有合适的索引
  3. 预处理语句:复用 SQL 解析结果
  4. 分页查询:避免一次性获取大量数据
  5. 只查需要的列:避免 SELECT *

错误处理

import mysql.connector
from mysql.connector import Error, DatabaseError, InterfaceError

try:
conn = mysql.connector.connect(...)
# 执行操作
except InterfaceError as e:
# 网络连接错误
print(f"连接失败: {e}")
except DatabaseError as e:
# 数据库错误
print(f"数据库错误: {e}")
except Error as e:
# 其他 MySQL 错误
print(f"MySQL 错误: {e}")
finally:
if conn and conn.is_connected():
conn.close()

小结

本章我们学习了:

  1. Python 集成:mysql-connector-python、PyMySQL、连接池、事务、SQLAlchemy ORM
  2. Java 集成:JDBC、HikariCP 连接池、事务管理、批量操作
  3. Go 集成:database/sql、连接池、事务、GORM ORM
  4. Node.js 集成:mysql2、Promise API、连接池、事务
  5. 最佳实践:连接管理、SQL 安全、性能优化、错误处理

练习

  1. 使用你熟悉的语言实现一个用户管理模块(CRUD)
  2. 配置连接池并测试高并发场景下的性能
  3. 实现一个转账功能,确保事务正确处理
  4. 使用 ORM 重写一个简单的查询功能
  5. 实现批量导入 CSV 文件到数据库

参考资料