应用开发
本章介绍如何使用各种编程语言的官方驱动连接 Neo4j,并在实际应用中进行数据操作。
驱动概述
Neo4j 提供多种官方驱动:
| 语言 | 驱动名称 | 安装命令 |
|---|---|---|
| Python | neo4j | pip install neo4j |
| Java | neo4j-java-driver | Maven/Gradle 依赖 |
| JavaScript | neo4j-driver | npm install neo4j-driver |
| Go | neo4j-go-driver | go get github.com/neo4j/neo4j-go-driver/v5 |
| .NET | Neo4j.Driver | NuGet 包 |
Python 开发
安装驱动
pip install neo4j
基本连接
from neo4j import GraphDatabase
class Neo4jConnection:
    """Small wrapper that owns a Neo4j driver built from a URI and basic credentials."""

    def __init__(self, uri, user, password):
        credentials = (user, password)
        self.driver = GraphDatabase.driver(uri, auth=credentials)

    def close(self):
        """Close the underlying driver."""
        self.driver.close()

    def test_connection(self):
        """Run a trivial query and return the value of its ``message`` column."""
        query = "RETURN 'Connection successful!' AS message"
        with self.driver.session() as session:
            record = session.run(query).single()
            return record["message"]
# Usage example
conn = Neo4jConnection("bolt://localhost:7687", "neo4j", "password")
try:
    print(conn.test_connection())
finally:
    # Close the driver even when the test query raises.
    conn.close()
CRUD 操作
class SocialNetworkDAO:
    """Data-access object for User nodes and their FOLLOWS / FRIEND relationships.

    Every method opens a short-lived session on the driver passed to the
    constructor and runs one parameterized Cypher statement.
    """

    def __init__(self, driver):
        self.driver = driver

    def create_user(self, user_id, name, email):
        """Create a User node and return it."""
        with self.driver.session() as session:
            result = session.run("""
                CREATE (u:User {userId: $user_id, name: $name, email: $email})
                RETURN u
            """, user_id=user_id, name=name, email=email)
            return result.single()["u"]

    def get_user(self, user_id):
        """Return the User node with the given id, or None when no node matches."""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})
                RETURN u
            """, user_id=user_id)
            record = result.single()
            return record["u"] if record else None

    def follow_user(self, from_user_id, to_user_id):
        """Create a FOLLOWS relationship between two existing users.

        Returns the new relationship, or None when either user does not
        exist (both MATCH clauses must succeed for the query to yield a row).
        """
        with self.driver.session() as session:
            result = session.run("""
                MATCH (from:User {userId: $from_id})
                MATCH (to:User {userId: $to_id})
                CREATE (from)-[r:FOLLOWS {createdAt: datetime()}]->(to)
                RETURN r
            """, from_id=from_user_id, to_id=to_user_id)
            record = result.single()
            # single() yields None on an empty result; subscripting it would
            # raise TypeError, so mirror get_user() and return None instead.
            return record["r"] if record else None

    def get_following(self, user_id):
        """Return a list of {userId, name} dicts for everyone the user follows."""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})-[:FOLLOWS]->(followed)
                RETURN followed.userId AS userId, followed.name AS name
            """, user_id=user_id)
            return [record.data() for record in result]

    def recommend_friends(self, user_id, limit=5):
        """Recommend friends-of-friends, ranked by the number of common friends.

        Returns at most ``limit`` dicts with the keys ``name`` and
        ``commonFriends``.
        """
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})-[:FRIEND]-(friend)-[:FRIEND]-(potential)
                WHERE NOT (u)-[:FRIEND]-(potential) AND u <> potential
                WITH potential, count(friend) AS commonFriends
                ORDER BY commonFriends DESC
                RETURN potential.name AS name, commonFriends
                LIMIT $limit
            """, user_id=user_id, limit=limit)
            return [record.data() for record in result]
# Usage example
# The driver is used as a context manager so it is closed even if a call fails.
with GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password")) as driver:
    dao = SocialNetworkDAO(driver)

    # Create users
    user1 = dao.create_user("u001", "张三", "zhangsan@example.com")
    user2 = dao.create_user("u002", "李四", "lisi@example.com")

    # Create the relationship
    dao.follow_user("u001", "u002")

    # Query
    following = dao.get_following("u001")
    print(following)
事务管理
from neo4j import Transaction
class TransactionExample:
    """Shows three ways to run Cypher: auto-commit, managed write, managed read."""

    def __init__(self, driver):
        self.driver = driver

    def auto_commit(self):
        """Create a Person node with a single auto-commit statement."""
        statement = "CREATE (p:Person {name: '张三'})"
        with self.driver.session() as session:
            session.run(statement)

    def create_friendship(self, user1_id, user2_id):
        """Link two users with a FRIEND relationship inside a write transaction.

        Returns whatever ``single()`` yields for the ``u1, u2`` row.
        """
        cypher = """
            MATCH (u1:User {userId: $id1})
            MATCH (u2:User {userId: $id2})
            CREATE (u1)-[:FRIEND {createdAt: datetime()}]->(u2)
            RETURN u1, u2
        """

        def link_users(tx, id1, id2):
            # Unit of work handed to execute_write.
            return tx.run(cypher, id1=id1, id2=id2).single()

        with self.driver.session() as session:
            return session.execute_write(link_users, user1_id, user2_id)

    def get_friends(self, user_id):
        """Return the names of the user's friends using a read transaction."""
        cypher = """
            MATCH (u:User {userId: $uid})-[:FRIEND]-(friend)
            RETURN friend.name AS name
        """

        def fetch_names(tx, uid):
            names = []
            for row in tx.run(cypher, uid=uid):
                names.append(row["name"])
            return names

        with self.driver.session() as session:
            return session.execute_read(fetch_names, user_id)
Java 开发
Maven 依赖
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>5.15.0</version>
</dependency>
基本使用
import org.neo4j.driver.*;
import static org.neo4j.driver.Values.parameters;
public class Neo4jExample implements AutoCloseable {
private final Driver driver;
public Neo4jExample(String uri, String user, String password) {
driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
}
@Override
public void close() {
driver.close();
}
// 创建用户
public void createUser(String userId, String name, String email) {
try (Session session = driver.session()) {
session.run("CREATE (u:User {userId: $userId, name: $name, email: $email})",
parameters("userId", userId, "name", name, "email", email));
}
}
// 查询用户
public User getUser(String userId) {
try (Session session = driver.session()) {
Result result = session.run(
"MATCH (u:User {userId: $userId}) RETURN u",
parameters("userId", userId)
);
if (result.hasNext()) {
Record record = result.next();
Node node = record.get("u").asNode();
return new User(
node.get("userId").asString(),
node.get("name").asString(),
node.get("email").asString()
);
}
return null;
}
}
// 事务示例
public void createFriendship(String user1Id, String user2Id) {
try (Session session = driver.session()) {
session.executeWrite(tx -> {
tx.run("""
MATCH (u1:User {userId: $id1})
MATCH (u2:User {userId: $id2})
CREATE (u1)-[:FRIEND {createdAt: datetime()}]->(u2)
""",
parameters("id1", user1Id, "id2", user2Id)
);
return null;
});
}
}
}
/** Immutable value object holding the properties read from a User node. */
class User {
    private final String userId;
    private final String name;
    private final String email;

    public User(String userId, String name, String email) {
        this.userId = userId;
        this.name = name;
        this.email = email;
    }

    // Read-only accessors
    public String getUserId() {
        return userId;
    }

    public String getName() {
        return name;
    }

    public String getEmail() {
        return email;
    }
}
JavaScript/Node.js 开发
安装驱动
npm install neo4j-driver
基本使用
const neo4j = require('neo4j-driver');
/**
 * Thin service around a Neo4j driver. Each method opens its own session and
 * closes it in a `finally` block.
 */
class Neo4jService {
  constructor(uri, user, password) {
    this.driver = neo4j.driver(uri, neo4j.auth.basic(user, password));
  }

  async close() {
    await this.driver.close();
  }

  /** Create a User node and return its properties. */
  async createUser(userId, name, email) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `CREATE (u:User {userId: $userId, name: $name, email: $email})
         RETURN u`,
        { userId, name, email }
      );
      return result.records[0].get('u').properties;
    } finally {
      await session.close();
    }
  }

  /** Return the properties of the matching User node, or null if none matches. */
  async getUser(userId) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `MATCH (u:User {userId: $userId}) RETURN u`,
        { userId }
      );
      if (result.records.length > 0) {
        return result.records[0].get('u').properties;
      }
      return null;
    } finally {
      await session.close();
    }
  }

  /**
   * Recommend friends-of-friends ordered by the number of common friends.
   * Returns at most `limit` objects of the form { name, commonFriends }.
   */
  async recommendFriends(userId, limit = 5) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `MATCH (u:User {userId: $userId})-[:FRIEND]-(friend)-[:FRIEND]-(potential)
         WHERE NOT (u)-[:FRIEND]-(potential) AND u <> potential
         WITH potential, count(friend) AS commonFriends
         ORDER BY commonFriends DESC
         RETURN potential.name AS name, commonFriends
         LIMIT $limit`,
        // Plain JS numbers are sent as floats and LIMIT rejects them, so the
        // value is wrapped in the driver's Integer type.
        { userId, limit: neo4j.int(Number.parseInt(limit, 10)) }
      );
      return result.records.map(record => ({
        name: record.get('name'),
        commonFriends: record.get('commonFriends').toNumber()
      }));
    } finally {
      await session.close();
    }
  }
}
// Usage example
async function main() {
  const service = new Neo4jService('bolt://localhost:7687', 'neo4j', 'password');
  try {
    // The e-mail literal was garbled by e-mail obfuscation; restored to a valid sample address.
    const user = await service.createUser('u001', '张三', 'zhangsan@example.com');
    console.log('Created user:', user);
    const recommendations = await service.recommendFriends('u001', 5);
    console.log('Recommendations:', recommendations);
  } finally {
    // Close the driver whether or not the calls above succeeded.
    await service.close();
  }
}

main().catch(console.error);
Go 开发
Go 语言驱动是 Neo4j 官方维护的驱动。v5 版本提供基于 context.Context 的 API(如 DriverWithContext),可以通过上下文控制超时与取消,接口简洁、性能良好。
安装驱动
go get github.com/neo4j/neo4j-go-driver/v5
基本连接
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

// main creates a driver, verifies that the server is reachable and prints a
// confirmation message.
func main() {
	// DriverWithContext methods take a context; the original calls omitted it
	// and did not compile.
	ctx := context.Background()

	// Create the driver instance
	driver, err := neo4j.NewDriverWithContext(
		"bolt://localhost:7687",
		neo4j.BasicAuth("neo4j", "password", ""),
	)
	if err != nil {
		log.Fatalf("无法创建驱动: %v", err)
	}
	// Make sure the driver is closed on normal return.
	// NOTE: log.Fatalf below exits the process without running deferred calls.
	defer driver.Close(ctx)

	// Test the connection
	if err := driver.VerifyConnectivity(ctx); err != nil {
		log.Fatalf("连接失败: %v", err)
	}
	fmt.Println("连接成功!")
}
CRUD 操作
package main
import (
"context"
"fmt"
"log"
"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)
// UserService groups the user-related graph operations; it holds the shared
// driver from which every method opens its own session.
type UserService struct {
	driver neo4j.DriverWithContext
}
// CreateUser creates a User node inside a managed write transaction and
// returns the stored properties as a *User.
func (s *UserService) CreateUser(ctx context.Context, userID, name, email string) (*User, error) {
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close(ctx)

	const cypher = `
		CREATE (u:User {userId: $userId, name: $name, email: $email})
		RETURN u
	`

	// Unit of work executed by the driver.
	work := func(tx neo4j.ManagedTransaction) (interface{}, error) {
		rows, runErr := tx.Run(ctx, cypher, map[string]interface{}{
			"userId": userID,
			"name":   name,
			"email":  email,
		})
		if runErr != nil {
			return nil, runErr
		}
		if !rows.Next(ctx) {
			// No row: report whatever error the result stream recorded.
			return nil, rows.Err()
		}
		node := rows.Record().Values[0].(neo4j.Node)
		created := &User{
			UserID: node.Props["userId"].(string),
			Name:   node.Props["name"].(string),
			Email:  node.Props["email"].(string),
		}
		return created, nil
	}

	out, err := session.ExecuteWrite(ctx, work)
	if err != nil {
		return nil, err
	}
	return out.(*User), nil
}
// GetUser looks up a user by id inside a managed read transaction.
// It returns (nil, nil) when no User node matches.
func (s *UserService) GetUser(ctx context.Context, userID string) (*User, error) {
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close(ctx)

	result, err := session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (interface{}, error) {
		query := `MATCH (u:User {userId: $userId}) RETURN u`
		params := map[string]interface{}{"userId": userID}
		record, err := tx.Run(ctx, query, params)
		if err != nil {
			return nil, err
		}
		if record.Next(ctx) {
			node, ok := record.Record().Values[0].(neo4j.Node)
			if !ok {
				return nil, fmt.Errorf("unexpected value type %T for column u", record.Record().Values[0])
			}
			// Comma-ok assertions: a missing or non-string property yields an
			// empty string instead of a panic.
			id, _ := node.Props["userId"].(string)
			name, _ := node.Props["name"].(string)
			email, _ := node.Props["email"].(string)
			return &User{UserID: id, Name: name, Email: email}, nil
		}
		// Next returned false: either the user does not exist (Err is nil) or
		// streaming failed; propagate the error instead of hiding it.
		return nil, record.Err()
	})
	if err != nil {
		return nil, err
	}
	if result == nil {
		return nil, nil
	}
	return result.(*User), nil
}
// RecommendFriends returns up to limit friends-of-friends of the given user,
// ordered by the number of common friends (most first). Only UserID and Name
// are filled in on the returned users.
func (s *UserService) RecommendFriends(ctx context.Context, userID string, limit int) ([]User, error) {
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeRead})
	defer session.Close(ctx)

	const cypher = `
		MATCH (u:User {userId: $userId})-[:FRIEND]-(friend)-[:FRIEND]-(potential)
		WHERE NOT (u)-[:FRIEND]-(potential) AND u <> potential
		WITH potential, count(friend) AS commonFriends
		ORDER BY commonFriends DESC
		LIMIT $limit
		RETURN potential
	`

	work := func(tx neo4j.ManagedTransaction) (interface{}, error) {
		rows, runErr := tx.Run(ctx, cypher, map[string]interface{}{
			"userId": userID,
			"limit":  limit,
		})
		if runErr != nil {
			return nil, runErr
		}

		var candidates []User
		for rows.Next(ctx) {
			node := rows.Record().Values[0].(neo4j.Node)
			candidate := User{
				UserID: node.Props["userId"].(string),
				Name:   node.Props["name"].(string),
			}
			candidates = append(candidates, candidate)
		}
		return candidates, rows.Err()
	}

	out, err := session.ExecuteRead(ctx, work)
	if err != nil {
		return nil, err
	}
	return out.([]User), nil
}
// User carries the userId, name and email properties of a User node.
type User struct {
	UserID, Name, Email string
}
// main creates a user and reads it back through UserService.
func main() {
	ctx := context.Background()

	driver, err := neo4j.NewDriverWithContext(
		"bolt://localhost:7687",
		neo4j.BasicAuth("neo4j", "password", ""),
	)
	if err != nil {
		log.Fatal(err)
	}
	// DriverWithContext.Close requires the context; the original call omitted
	// it and did not compile.
	// NOTE: log.Fatal below exits without running deferred calls.
	defer driver.Close(ctx)

	service := &UserService{driver: driver}

	// Create a user (the e-mail literal was garbled by e-mail obfuscation and
	// is restored to a valid sample address).
	user, err := service.CreateUser(ctx, "u001", "张三", "zhangsan@example.com")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("创建用户: %+v\n", user)

	// Look the user up again
	found, err := service.GetUser(ctx, "u001")
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("查询用户: %+v\n", found)
}
批量操作
// BatchCreateUsers creates one User node per element of users with a single
// UNWIND statement executed inside a managed write transaction.
func (s *UserService) BatchCreateUsers(ctx context.Context, users []User) error {
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{AccessMode: neo4j.AccessModeWrite})
	defer session.Close(ctx)

	const cypher = `
		UNWIND $users AS user
		CREATE (u:User {userId: user.userId, name: user.name, email: user.email})
	`

	// Convert the structs into the list-of-maps shape the query unwinds.
	rows := make([]map[string]interface{}, 0, len(users))
	for _, u := range users {
		rows = append(rows, map[string]interface{}{
			"userId": u.UserID,
			"name":   u.Name,
			"email":  u.Email,
		})
	}

	work := func(tx neo4j.ManagedTransaction) (interface{}, error) {
		_, runErr := tx.Run(ctx, cypher, map[string]interface{}{"users": rows})
		return nil, runErr
	}

	_, err := session.ExecuteWrite(ctx, work)
	return err
}
.NET 开发
安装驱动
dotnet add package Neo4j.Driver
基本使用
using Neo4j.Driver;
/// <summary>
/// Wraps an <see cref="IDriver"/> and exposes asynchronous helpers for
/// creating and reading User nodes.
/// </summary>
public class Neo4jService : IDisposable
{
    private const string CreateUserQuery = @"
                CREATE (u:User {userId: $userId, name: $name, email: $email})
                RETURN u";

    private const string FindUserQuery = "MATCH (u:User {userId: $userId}) RETURN u";

    private readonly IDriver _driver;

    public Neo4jService(string uri, string user, string password)
    {
        var credentials = AuthTokens.Basic(user, password);
        _driver = GraphDatabase.Driver(uri, credentials);
    }

    /// <summary>Creates a User node inside a managed write transaction.</summary>
    public async Task CreateUserAsync(string userId, string name, string email)
    {
        await using var session = _driver.AsyncSession();
        await session.ExecuteWriteAsync(async tx =>
        {
            await tx.RunAsync(CreateUserQuery, new { userId, name, email });
        });
    }

    /// <summary>
    /// Reads a user by id inside a managed read transaction.
    /// Returns null when no node matches.
    /// </summary>
    public async Task<User?> GetUserAsync(string userId)
    {
        await using var session = _driver.AsyncSession();
        return await session.ExecuteReadAsync(async tx =>
        {
            var cursor = await tx.RunAsync(FindUserQuery, new { userId });
            if (!await cursor.FetchAsync())
            {
                return null;
            }

            var node = cursor.Current["u"].As<INode>();
            return new User
            {
                UserId = node["userId"].As<string>(),
                Name = node["name"].As<string>(),
                Email = node["email"].As<string>()
            };
        });
    }

    /// <summary>Disposes the underlying driver if one was created.</summary>
    public void Dispose()
    {
        _driver?.Dispose();
    }
}
/// <summary>Plain data object mirroring the properties of a User node.</summary>
public class User
{
    /// <summary>Value of the node's userId property.</summary>
    public string UserId { get; set; }

    /// <summary>Value of the node's name property.</summary>
    public string Name { get; set; }

    /// <summary>Value of the node's email property; declared nullable.</summary>
    public string? Email { get; set; }
}
Python 连接池
from neo4j import GraphDatabase
# Configure the connection pool
pool_options = {
    "max_connection_pool_size": 50,
    "connection_acquisition_timeout": 60,
    "connection_timeout": 30,
    "max_transaction_retry_time": 30.0,
}
driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password"),
    **pool_options,
)
Java 连接池
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.*;

// TimeUnit lives in java.util.concurrent and is not covered by the driver
// wildcard import, so it is imported explicitly.
Config config = Config.builder()
        .withMaxConnectionPoolSize(50)
        .withConnectionTimeout(30, TimeUnit.SECONDS)
        .withMaxTransactionRetryTime(30, TimeUnit.SECONDS)
        .build();
Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password), config);
连接池配置
在生产环境中,合理的连接池配置对应用的性能和稳定性至关重要。Neo4j 驱动使用连接池来管理数据库连接,复用连接可以减少建立连接的开销。
连接池工作原理
┌─────────────────────────────────────────────────────────────┐
│ 应用程序 │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Neo4j 驱动 │
│ ┌─────────────────────────────────────────────────────┐ │
│ │ 连接池 │ │
│ │ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ ┌─────┐ │ │
│ │ │连接1│ │连接2│ │连接3│ │连接4│ │连接5│ ... │ │
│ │ └─────┘ └─────┘ └─────┘ └─────┘ └─────┘ │ │
│ └─────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Neo4j 数据库 │
└─────────────────────────────────────────────────────────────┘
当应用请求连接时:
- 如果池中有空闲连接,直接返回
- 如果池中没有空闲连接但未达到上限,创建新连接
- 如果池已满,请求会等待直到有连接可用或超时
Python 连接池配置
from neo4j import GraphDatabase, TrustSystemCAs

# Complete connection-pool configuration
driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password"),
    # Pool size
    max_connection_pool_size=50,        # maximum number of connections
    connection_acquisition_timeout=60,  # seconds to wait for a free connection
    # Connection lifecycle
    max_connection_lifetime=3600,       # seconds before a connection is retired
    connection_timeout=30,              # seconds allowed to establish a connection
    # Retry
    max_transaction_retry_time=30.0,    # seconds managed transactions keep retrying
    # Security
    encrypted=False,                    # whether the connection is encrypted
    # `trust=` is deprecated in the 5.x driver; `trusted_certificates` replaces it.
    # It only takes effect when encrypted=True.
    trusted_certificates=TrustSystemCAs(),
)
# Recommended production configuration
def create_production_driver(uri, user, password):
    """Build a driver with the pool, timeout and encryption settings suggested for production."""
    settings = {
        "max_connection_pool_size": 100,        # high-concurrency workloads
        "connection_acquisition_timeout": 120,  # slightly longer wait for a connection
        "max_connection_lifetime": 7200,        # 2 hours
        "connection_timeout": 30,
        "max_transaction_retry_time": 60.0,
        "encrypted": True,                      # enable encryption in production
    }
    return GraphDatabase.driver(uri, auth=(user, password), **settings)
# Recommended development configuration
def create_development_driver(uri, user, password):
    """Build a driver with a small pool, short timeouts and no encryption for local development."""
    settings = {
        "max_connection_pool_size": 10,  # a small pool is enough for development
        "connection_acquisition_timeout": 30,
        "max_connection_lifetime": 3600,
        "connection_timeout": 10,
        "encrypted": False,
    }
    return GraphDatabase.driver(uri, auth=(user, password), **settings)
Java 连接池配置
import org.neo4j.driver.*;
import java.util.concurrent.TimeUnit;
/** Factory that builds a driver with the production settings used in this guide. */
public class Neo4jDriverFactory {

    private static final int MAX_POOL_SIZE = 100;
    private static final long ACQUISITION_TIMEOUT_MINUTES = 2;
    private static final long MAX_LIFETIME_HOURS = 2;
    private static final long CONNECT_TIMEOUT_SECONDS = 30;
    private static final long RETRY_TIME_MINUTES = 1;

    /** Creates a driver with pool, lifecycle, retry, encryption and SLF4J logging configured. */
    public static Driver createProductionDriver(String uri, String user, String password) {
        AuthToken credentials = AuthTokens.basic(user, password);

        Config settings = Config.builder()
                // Connection pool
                .withMaxConnectionPoolSize(MAX_POOL_SIZE)
                .withConnectionAcquisitionTimeout(ACQUISITION_TIMEOUT_MINUTES, TimeUnit.MINUTES)
                // Connection lifecycle
                .withMaxConnectionLifetime(MAX_LIFETIME_HOURS, TimeUnit.HOURS)
                .withConnectionTimeout(CONNECT_TIMEOUT_SECONDS, TimeUnit.SECONDS)
                // Retry
                .withMaxTransactionRetryTime(RETRY_TIME_MINUTES, TimeUnit.MINUTES)
                // Security
                .withEncryption()
                .withTrustStrategy(Config.TrustStrategy.trustSystemCertificates())
                // Logging
                .withLogging(Logging.slf4j())
                .build();

        return GraphDatabase.driver(uri, credentials, settings);
    }

    /**
     * Placeholder for pool monitoring: it only prints a fixed message and does
     * not read anything from the driver. Real metrics need extra configuration.
     */
    public static void printPoolMetrics(Driver driver) {
        System.out.println("连接池配置已生效");
    }
}
JavaScript/Node.js 连接池配置
const neo4j = require('neo4j-driver');
// Production configuration (all durations are in milliseconds)
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;
const HOUR_MS = 60 * MINUTE_MS;

const productionConfig = {
  // Connection pool
  maxConnectionPoolSize: 100,
  connectionAcquisitionTimeout: 2 * MINUTE_MS,
  // Connection lifecycle
  maxConnectionLifetime: 2 * HOUR_MS,
  connectionTimeout: 30 * SECOND_MS,
  // Retry
  maxTransactionRetryTime: MINUTE_MS,
  // Security
  encrypted: 'ENCRYPTION_ON',
  trust: 'TRUST_SYSTEM_CA_SIGNED_CERTIFICATES',
  // Logging
  logging: {
    level: 'info',
    logger: (level, message) => console.log(`[${level}] ${message}`)
  }
};

const driver = neo4j.driver(
  'bolt://localhost:7687',
  neo4j.auth.basic('neo4j', 'password'),
  productionConfig
);
// Pool health check: runs `RETURN 1` on a fresh session and resolves to
// true on success, false on any error. The session is always closed.
async function checkPoolHealth(driver) {
  const session = driver.session();
  let healthy;
  try {
    await session.run('RETURN 1');
    console.log('连接池健康检查通过');
    healthy = true;
  } catch (error) {
    console.error('连接池健康检查失败:', error.message);
    healthy = false;
  } finally {
    await session.close();
  }
  return healthy;
}
配置参数详解
| 参数 | 说明 | 默认值 | 推荐值 |
|---|---|---|---|
| max_connection_pool_size | 最大连接数 | 100 | 根据并发量调整,一般 50-200 |
| connection_acquisition_timeout | 获取连接超时 | 60s | 高负载时可增加 |
| max_connection_lifetime | 连接最大存活时间 | 1h | 建议 1-4h |
| connection_timeout | 建立连接超时 | 30s | 网络差时可增加 |
| max_transaction_retry_time | 事务重试时间 | 30s | 死锁多时可增加 |
连接池大小计算
连接池大小的计算需要考虑以下因素:
# Pool-size estimate
#   connections ≈ requests per second × average query time (s) × safety factor
# The safety margin is a multiplicative coefficient, as in the code below.

# Worked example
concurrent_requests = 100  # request rate (requests per second)
avg_query_time = 0.1       # average query time (seconds)
safety_margin = 1.2        # safety-margin coefficient
pool_size = int(concurrent_requests * avg_query_time * safety_margin)
# Result: pool_size = 12

# Practical advice
# - CPU-bound applications: connections ≈ CPU cores × 2
# - I/O-bound applications: the pool can be larger
# - Start small and adjust gradually
连接池监控
监控连接池状态可以帮助发现性能问题:
import time
import logging
from neo4j import GraphDatabase
class MonitoredDriver:
def __init__(self, uri, user, password, **config):
self.driver = GraphDatabase.driver(uri, auth=(user, password), **config)
self.active_sessions = 0
self.total_queries = 0
self.failed_queries = 0
def get_session(self):
"""获取带监控的会话"""
self.active_sessions += 1
session = self.driver.session()
# 包装关闭方法
original_close = session.close
def monitored_close():
self.active_sessions -= 1
original_close()
session.close = monitored_close
return session
def execute_query(self, query, **params):
"""执行带监控的查询"""
self.total_queries += 1
start_time = time.time()
try:
with self.get_session() as session:
result = session.run(query, **params)
return list(result)
except Exception as e:
self.failed_queries += 1
logging.error(f"查询失败: {e}")
raise
finally:
elapsed = time.time() - start_time
logging.info(f"查询耗时: {elapsed:.3f}s")
def get_stats(self):
"""获取统计信息"""
return {
'active_sessions': self.active_sessions,
'total_queries': self.total_queries,
'failed_queries': self.failed_queries,
'success_rate': (self.total_queries - self.failed_queries) / max(self.total_queries, 1)
}
def close(self):
self.driver.close()
# Usage example
driver = MonitoredDriver(
    "bolt://localhost:7687",
    "neo4j",
    "password",
    max_connection_pool_size=50
)
try:
    # Print the statistics after a query; a real service would do this periodically.
    result = driver.execute_query("MATCH (n:Person) RETURN n LIMIT 10")
    print(driver.get_stats())
finally:
    # Close the wrapped driver even if the query fails.
    driver.close()
常见问题与解决方案
问题 1:连接耗尽
ServiceUnavailable: Cannot acquire connection from connection pool
原因分析:
- 连接池太小,无法处理当前并发量
- 存在连接泄漏(未正确关闭会话)
- 查询执行时间过长
解决方案:
# 1. Increase the pool size
driver = GraphDatabase.driver(
    uri,
    auth=(user, password),
    max_connection_pool_size=200  # enlarged
)

# 2. Make sure sessions are closed
# Good: use a context manager
with driver.session() as session:
    result = session.run("MATCH (n) RETURN n")
# The session is closed automatically

# Bad: the session is never closed
session = driver.session()
result = session.run("MATCH (n) RETURN n")
# Forgetting session.close() leaks the connection

# 3. Set a query timeout
with driver.session() as session:
    # The timeout (seconds) applies to the whole transaction.
    with session.begin_transaction(timeout=30) as tx:
        result = tx.run("MATCH (n) RETURN n")
        records = list(result)  # consume inside the transaction
# A server-wide default can also be set in neo4j.conf
# (db.transaction.timeout; dbms.transaction.timeout before Neo4j 5).
问题 2:连接超时
ServiceUnavailable: Connection to the database failed
解决方案:
# Adjust the timeout settings
timeout_options = {
    "connection_timeout": 60,               # longer connect timeout
    "connection_acquisition_timeout": 120,  # longer wait for a pooled connection
}
driver = GraphDatabase.driver(uri, auth=(user, password), **timeout_options)
# Connection retry logic
def connect_with_retry(uri, user, password, max_retries=3):
    """Create a driver and verify connectivity, retrying with exponential backoff.

    Args:
        uri: Neo4j connection URI.
        user: User name for basic authentication.
        password: Password for basic authentication.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        A driver whose connectivity was verified.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1.
        Exception: The error of the last attempt when all attempts fail.
    """
    import time  # local import: this snippet has no module-level import of time

    if max_retries < 1:
        # The original fell through the loop and silently returned None.
        raise ValueError("max_retries must be at least 1")

    for attempt in range(max_retries):
        driver = None
        try:
            driver = GraphDatabase.driver(uri, auth=(user, password))
            driver.verify_connectivity()
            return driver
        except Exception:
            # Release the failed driver so retries do not leak connection pools.
            if driver is not None:
                driver.close()
            if attempt == max_retries - 1:
                raise
            time.sleep(2 ** attempt)  # exponential backoff
问题 3:连接不稳定
在高负载或网络不稳定环境下,连接可能会中断。
解决方案:
# 1. Limit connection lifetime so that long-lived connections are replaced
#    (this sets a maximum lifetime; it is not a keep-alive setting)
lifetime_options = {
    "max_connection_lifetime": 1800,  # recreate connections after 30 minutes
}
driver = GraphDatabase.driver(uri, auth=(user, password), **lifetime_options)
# 2. Health check
def health_check(driver):
    """Return True if a trivial query succeeds on a fresh session, else False."""
    try:
        with driver.session() as session:
            session.run("RETURN 1").consume()
        return True
    except Exception:
        # `except Exception` instead of a bare `except:` so KeyboardInterrupt
        # and SystemExit still propagate.
        return False
# 3. Reconnect after a failure
class ResilientDriver:
    """Driver wrapper that rebuilds the driver and retries once when a query fails.

    NOTE: any exception triggers the reconnect-and-retry path, including query
    errors; a failing write statement is therefore executed a second time.
    """

    def __init__(self, uri, user, password, **config):
        self.uri = uri
        self.user = user
        self.password = password
        self.config = config
        self._connect()

    def _connect(self):
        """Create a new driver from the stored connection settings."""
        self.driver = GraphDatabase.driver(
            self.uri,
            auth=(self.user, self.password),
            **self.config
        )

    def execute(self, query, **params):
        """Run a query and return its records as a list, retrying once after a reconnect."""
        try:
            with self.driver.session() as session:
                return list(session.run(query, **params))
        except Exception as e:
            # Try to reconnect
            logging.warning(f"执行失败,尝试重连: {e}")
            stale = self.driver
            try:
                # Close the old driver so its connection pool is not leaked.
                stale.close()
            except Exception as close_error:
                logging.warning(f"关闭旧驱动失败: {close_error}")
            self._connect()
            with self.driver.session() as session:
                return list(session.run(query, **params))

    def close(self):
        """Close the current driver."""
        self.driver.close()
错误处理
数据库操作中合理的错误处理对于构建健壮的应用至关重要。不同类型的错误需要不同的处理策略。
错误类型分类
Neo4j 驱动可能抛出以下几类错误:
| 错误类型 | 原因 | 处理策略 |
|---|---|---|
| ServiceUnavailable | 服务器不可达或已关闭 | 重试连接、检查网络 |
| SessionExpired | 会话已过期 | 创建新会话重试 |
| AuthError | 认证失败 | 检查凭据、提示用户 |
| ClientError | 客户端请求错误 | 检查查询语法、参数 |
| TransientError | 临时性错误 | 自动重试 |
| ConstraintViolation | 违反约束 | 检查数据、给用户反馈 |
Python 错误处理
from neo4j import GraphDatabase, exceptions
import time
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class RobustNeo4jClient:
    """Neo4j client that retries work on transient, session and availability errors."""

    def __init__(self, uri, user, password, max_retries=3):
        self.uri = uri
        self.user = user
        self.password = password
        self.max_retries = max_retries
        self.driver = None
        self._connect()

    def _connect(self):
        """Create a driver, replacing and closing any previous one."""
        previous = self.driver
        try:
            self.driver = GraphDatabase.driver(
                self.uri,
                auth=(self.user, self.password),
                max_connection_lifetime=3600,
                max_connection_pool_size=50
            )
            logger.info("数据库连接成功")
        except exceptions.AuthError as e:
            logger.error(f"认证失败: {e}")
            raise
        except exceptions.ServiceUnavailable as e:
            logger.error(f"服务不可用: {e}")
            raise
        if previous is not None:
            # Reconnecting used to drop the old driver without closing it,
            # leaking its connection pool.
            try:
                previous.close()
            except Exception as close_error:
                logger.warning(f"关闭旧驱动失败: {close_error}")

    def _execute_with_retry(self, work_func, *args, **kwargs):
        """Call ``work_func(session, *args, **kwargs)``, retrying up to ``max_retries`` times.

        Transient errors and service unavailability back off exponentially;
        an expired session triggers an immediate reconnect. The last error is
        raised once the attempts are exhausted.
        """
        last_exception = None
        for attempt in range(self.max_retries):
            is_last_attempt = attempt == self.max_retries - 1
            try:
                with self.driver.session() as session:
                    return work_func(session, *args, **kwargs)
            except exceptions.TransientError as e:
                logger.warning(f"临时错误 (尝试 {attempt + 1}/{self.max_retries}): {e}")
                last_exception = e
                if not is_last_attempt:
                    time.sleep(2 ** attempt)  # exponential backoff
            except exceptions.SessionExpired as e:
                logger.warning(f"会话过期,重新连接: {e}")
                self._connect()
                last_exception = e
            except exceptions.ServiceUnavailable as e:
                logger.warning(f"服务不可用,尝试重连: {e}")
                self._connect()
                last_exception = e
                if not is_last_attempt:
                    # No point sleeping when no further attempt follows.
                    time.sleep(2 ** attempt)
        raise last_exception or Exception("重试次数耗尽")

    def create_user(self, user_id, name, email):
        """Create a user and return the created node.

        Raises:
            ValueError: If the server reports that the user already exists.
        """
        def _work(session, uid, n, mail):
            try:
                result = session.run(
                    "CREATE (u:User {userId: $userId, name: $name, email: $email}) RETURN u",
                    userId=uid, name=n, email=mail
                )
                return result.single()["u"]
            except exceptions.ClientError as err:
                # Prefer the status code; keep the message check as a fallback.
                code = getattr(err, "code", None)
                if (code == "Neo.ClientError.Schema.ConstraintValidationFailed"
                        or "already exists" in str(err)):
                    logger.warning(f"用户 {uid} 已存在")
                    raise ValueError(f"用户 {uid} 已存在") from err
                raise

        return self._execute_with_retry(_work, user_id, name, email)

    def get_user(self, user_id):
        """Return the user node, or None when no node matches."""
        def _work(session, uid):
            result = session.run(
                "MATCH (u:User {userId: $userId}) RETURN u",
                userId=uid
            )
            record = result.single()
            return record["u"] if record else None

        return self._execute_with_retry(_work, user_id)

    def close(self):
        """Close the driver if one was created."""
        if self.driver:
            self.driver.close()
# Usage example
# Bind the name first: if the constructor raises, the finally clause would
# otherwise fail with NameError and hide the real error.
client = None
try:
    client = RobustNeo4jClient("bolt://localhost:7687", "neo4j", "password")
    # The e-mail literal was garbled by e-mail obfuscation; restored to a valid sample address.
    user = client.create_user("u001", "张三", "zhangsan@example.com")
    print(f"创建成功: {user}")
except exceptions.AuthError:
    print("请检查用户名和密码")
except exceptions.ServiceUnavailable:
    print("数据库服务不可用,请检查 Neo4j 是否运行")
except ValueError as e:
    print(f"业务错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")
finally:
    if client is not None:
        client.close()
JavaScript 错误处理
const neo4j = require('neo4j-driver');
/**
 * Neo4j client that retries work on connection and transient errors and
 * turns constraint violations into a descriptive Error.
 */
class RobustNeo4jClient {
  constructor(uri, user, password, maxRetries = 3) {
    this.driver = neo4j.driver(uri, neo4j.auth.basic(user, password));
    this.maxRetries = maxRetries;
  }

  /**
   * Run `workFunc(session, ...args)` on a fresh session, retrying with
   * exponential backoff on connection and transient errors. Each session is
   * closed before the next attempt.
   */
  async executeWithRetry(workFunc, ...args) {
    let lastError;
    for (let attempt = 0; attempt < this.maxRetries; attempt++) {
      const session = this.driver.session();
      try {
        return await workFunc(session, ...args);
      } catch (error) {
        lastError = error;
        const code = typeof error.code === 'string' ? error.code : '';

        // Current servers report ConstraintValidationFailed; the older
        // ConstraintViolation code is still accepted.
        if (code === 'Neo.ClientError.Schema.ConstraintValidationFailed' ||
            code === 'Neo.ClientError.Schema.ConstraintViolation') {
          throw new Error(`数据约束违反: ${error.message}`, { cause: error });
        }

        const retryable =
          code === 'ServiceUnavailable' ||
          code === 'SessionExpired' ||
          code.startsWith('Neo.TransientError.');
        if (retryable) {
          console.warn(`连接错误 (尝试 ${attempt + 1}/${this.maxRetries})`);
          if (attempt < this.maxRetries - 1) {
            // Back off only when another attempt follows.
            await new Promise(r => setTimeout(r, 1000 * Math.pow(2, attempt)));
          }
          continue;
        }

        // Every other error is rethrown unchanged.
        throw error;
      } finally {
        await session.close();
      }
    }
    throw lastError;
  }

  /** Create a User node and return its properties. */
  async createUser(userId, name, email) {
    return this.executeWithRetry(async (session, uid, n, e) => {
      const result = await session.run(
        `CREATE (u:User {userId: $userId, name: $name, email: $email})
         RETURN u`,
        { userId: uid, name: n, email: e }
      );
      return result.records[0].get('u').properties;
    }, userId, name, email);
  }

  /** Return the properties of the matching User node, or null if none matches. */
  async getUser(userId) {
    return this.executeWithRetry(async (session, uid) => {
      const result = await session.run(
        `MATCH (u:User {userId: $userId}) RETURN u`,
        { userId: uid }
      );
      if (result.records.length > 0) {
        return result.records[0].get('u').properties;
      }
      return null;
    }, userId);
  }

  async close() {
    await this.driver.close();
  }
}
// Error-handling middleware example.
// Runs `operation` and maps known Neo4j error codes to short application
// error identifiers. The original error is attached as `cause`.
async function handleDatabaseOperation(operation) {
  try {
    return await operation();
  } catch (error) {
    const code = error && error.code;

    // Authentication failure
    if (code === 'Neo.ClientError.Security.Unauthorized') {
      console.error('认证失败,请检查凭据');
      throw new Error('AUTH_FAILED', { cause: error });
    }
    // Constraint violation: current servers report ConstraintValidationFailed;
    // the older ConstraintViolation code is still accepted.
    if (code === 'Neo.ClientError.Schema.ConstraintValidationFailed' ||
        code === 'Neo.ClientError.Schema.ConstraintViolation') {
      console.error('数据约束违反:', error.message);
      throw new Error('CONSTRAINT_VIOLATION', { cause: error });
    }
    // Syntax error
    if (code === 'Neo.ClientError.Statement.SyntaxError') {
      console.error('查询语法错误:', error.message);
      throw new Error('SYNTAX_ERROR', { cause: error });
    }
    // Connection error
    if (code === 'ServiceUnavailable') {
      console.error('数据库服务不可用');
      throw new Error('SERVICE_UNAVAILABLE', { cause: error });
    }
    // Unknown error
    console.error('数据库操作失败:', error);
    throw new Error('DATABASE_ERROR', { cause: error });
  }
}
常见错误代码参考
| 错误代码 | 描述 | 解决方案 |
|---|---|---|
| Neo.ClientError.Schema.ConstraintValidationFailed | 违反唯一性约束 | 使用 MERGE 或检查数据是否已存在 |
| Neo.ClientError.Statement.SyntaxError | Cypher 语法错误 | 检查查询语句语法 |
| Neo.ClientError.Statement.ParameterMissing | 缺少参数 | 检查参数是否正确传递 |
| Neo.ClientError.Label.NotFound | 标签不存在 | 检查标签名拼写 |
| Neo.ClientError.Security.Unauthorized | 认证失败 | 检查用户名密码 |
| Neo.TransientError.Transaction.DeadlockDetected | 死锁检测 | 重试事务 |
| Neo.TransientError.Transaction.LockClientTimeout | 锁超时 | 减少事务时间或重试 |
最佳实践
1. 使用参数化查询
# Good: pass values as query parameters
safe_query = "MATCH (u:User {userId: $userId}) RETURN u"
session.run(safe_query, userId=user_id)

# Avoid: building the query by string interpolation (injection risk)
unsafe_query = f"MATCH (u:User {{userId: '{user_id}'}}) RETURN u"
session.run(unsafe_query)
2. 及时关闭资源
# Good: let a context manager close the session
sample_query = "MATCH (n) RETURN n LIMIT 10"
with driver.session() as session:
    result = session.run(sample_query)

# Alternative: close the session manually in a finally block
session = driver.session()
try:
    result = session.run(sample_query)
finally:
    session.close()
3. 使用读写事务
# Writes go through execute_write
write_work, write_args = create_user_tx, (user_data,)
session.execute_write(write_work, *write_args)

# Reads go through execute_read
read_work, read_args = get_user_tx, (user_id,)
session.execute_read(read_work, *read_args)
4. 批量操作
# Batch insert with UNWIND
def batch_create_users(tx, users):
    """Create one User node per element of ``users`` in a single statement.

    The query reads the ``id``, ``name`` and ``email`` keys of each element.
    """
    cypher = """
        UNWIND $users AS user
        CREATE (u:User {userId: user.id, name: user.name, email: user.email})
    """
    tx.run(cypher, users=users)
# Process the data in fixed-size batches, one write transaction per batch
batch_size = 1000
i = 0
while i < len(all_users):
    batch = all_users[i:i+batch_size]
    session.execute_write(batch_create_users, batch)
    i += batch_size
下一步
掌握了应用开发后,可以查看 知识速查表 快速回顾所有重要概念和语法。