应用开发
本章介绍如何使用各种编程语言的官方驱动连接 Neo4j,并在实际应用中进行数据操作。
驱动概述
Neo4j 提供多种官方驱动:
| 语言 | 驱动名称 | 安装命令 |
|---|---|---|
| Python | neo4j | pip install neo4j |
| Java | neo4j-java-driver | Maven/Gradle 依赖 |
| JavaScript | neo4j-driver | npm install neo4j-driver |
| Go | neo4j-go-driver | go get github.com/neo4j/neo4j-go-driver/v5 |
| .NET | Neo4j.Driver | NuGet 包 |
Python 开发
安装驱动
pip install neo4j
基本连接
from neo4j import GraphDatabase
class Neo4jConnection:
    """Small wrapper that owns one Neo4j driver instance."""

    def __init__(self, uri, user, password):
        """Build the driver for ``uri`` using ``(user, password)`` as auth."""
        credentials = (user, password)
        self.driver = GraphDatabase.driver(uri, auth=credentials)

    def close(self):
        """Close the wrapped driver."""
        self.driver.close()

    def test_connection(self):
        """Run a constant query and return the ``message`` value it yields."""
        query = "RETURN 'Connection successful!' AS message"
        with self.driver.session() as session:
            record = session.run(query).single()
            return record["message"]
# Usage example
conn = Neo4jConnection("bolt://localhost:7687", "neo4j", "password")
try:
    print(conn.test_connection())
finally:
    # Close the driver even when the test query raises
    conn.close()
CRUD 操作
class SocialNetworkDAO:
    """Data-access object for ``User`` nodes and their relationships.

    Every method opens its own session on the shared driver, runs a single
    parameterized Cypher statement, and reads the result before the session
    is closed.
    """

    def __init__(self, driver):
        """Keep a reference to ``driver``; this class never closes it."""
        self.driver = driver

    def create_user(self, user_id, name, email):
        """Create a ``User`` node and return it."""
        with self.driver.session() as session:
            result = session.run("""
                CREATE (u:User {userId: $user_id, name: $name, email: $email})
                RETURN u
            """, user_id=user_id, name=name, email=email)
            return result.single()["u"]

    def get_user(self, user_id):
        """Return the ``User`` node with ``user_id``, or ``None`` if absent."""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})
                RETURN u
            """, user_id=user_id)
            record = result.single()
            return record["u"] if record else None

    def follow_user(self, from_user_id, to_user_id):
        """Make ``from_user_id`` follow ``to_user_id``.

        Returns the ``FOLLOWS`` relationship, or ``None`` when either user
        does not exist (the MATCH clauses then produce no row).
        """
        with self.driver.session() as session:
            # MERGE keeps the call idempotent: following the same user twice
            # reuses the existing relationship instead of adding a duplicate,
            # and createdAt is only stamped when the relationship is new.
            result = session.run("""
                MATCH (from:User {userId: $from_id})
                MATCH (to:User {userId: $to_id})
                MERGE (from)-[r:FOLLOWS]->(to)
                ON CREATE SET r.createdAt = datetime()
                RETURN r
            """, from_id=from_user_id, to_id=to_user_id)
            record = result.single()
            return record["r"] if record else None

    def get_following(self, user_id):
        """Return one ``{"userId", "name"}`` dict per user that ``user_id`` follows."""
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})-[:FOLLOWS]->(followed)
                RETURN followed.userId AS userId, followed.name AS name
            """, user_id=user_id)
            return [record.data() for record in result]

    def recommend_friends(self, user_id, limit=5):
        """Suggest friends-of-friends who are not yet friends of ``user_id``.

        Candidates are ordered by the number of shared friends, highest
        first, and at most ``limit`` rows are returned as
        ``{"name", "commonFriends"}`` dicts.
        """
        with self.driver.session() as session:
            result = session.run("""
                MATCH (u:User {userId: $user_id})-[:FRIEND]-(friend)-[:FRIEND]-(potential)
                WHERE NOT (u)-[:FRIEND]-(potential) AND u <> potential
                WITH potential, count(friend) AS commonFriends
                ORDER BY commonFriends DESC
                RETURN potential.name AS name, commonFriends
                LIMIT $limit
            """, user_id=user_id, limit=limit)
            return [record.data() for record in result]
# Usage example
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
try:
    dao = SocialNetworkDAO(driver)

    # Create two users
    user1 = dao.create_user("u001", "张三", "zhangsan@example.com")
    user2 = dao.create_user("u002", "李四", "lisi@example.com")

    # u001 follows u002
    dao.follow_user("u001", "u002")

    # List the users u001 follows
    following = dao.get_following("u001")
    print(following)
finally:
    # Close the driver even if one of the calls above raised
    driver.close()
事务管理
from neo4j import Transaction
class TransactionExample:
    """Shows the three ways this tutorial runs work: auto-commit, write and read transactions."""

    def __init__(self, driver):
        """Keep a reference to ``driver``; this class never closes it."""
        self.driver = driver

    def auto_commit(self):
        """Run one statement as an auto-commit transaction via ``session.run``."""
        with self.driver.session() as session:
            session.run("CREATE (p:Person {name: '张三'})")

    @staticmethod
    def _create_friendship_tx(tx, id1, id2):
        """Transaction function: link two users as friends and return the row.

        The driver may re-run a transaction function after a transient
        failure, so the statement uses MERGE to stay idempotent. The pattern
        is undirected because every read in this tutorial matches FRIEND
        without a direction.
        """
        result = tx.run("""
            MATCH (u1:User {userId: $id1})
            MATCH (u2:User {userId: $id2})
            MERGE (u1)-[f:FRIEND]-(u2)
            ON CREATE SET f.createdAt = datetime()
            RETURN u1, u2
        """, id1=id1, id2=id2)
        return result.single()

    @staticmethod
    def _friend_names_tx(tx, uid):
        """Transaction function: return the names of ``uid``'s friends."""
        result = tx.run("""
            MATCH (u:User {userId: $uid})-[:FRIEND]-(friend)
            RETURN friend.name AS name
        """, uid=uid)
        # Read the rows here, while the transaction is still open
        return [record["name"] for record in result]

    def create_friendship(self, user1_id, user2_id):
        """Create a FRIEND relationship inside a managed write transaction.

        Returns the record holding ``u1`` and ``u2``, or ``None`` when either
        user does not exist.
        """
        with self.driver.session() as session:
            return session.execute_write(
                self._create_friendship_tx, user1_id, user2_id
            )

    def get_friends(self, user_id):
        """Return the friend names of ``user_id`` using a managed read transaction."""
        with self.driver.session() as session:
            return session.execute_read(self._friend_names_tx, user_id)
Java 开发
Maven 依赖
<dependency>
<groupId>org.neo4j.driver</groupId>
<artifactId>neo4j-java-driver</artifactId>
<version>5.15.0</version>
</dependency>
基本使用
import org.neo4j.driver.*;
import static org.neo4j.driver.Values.parameters;
/**
 * Minimal wrapper around a Neo4j {@code Driver} showing auto-commit queries
 * and a managed write transaction. Closing this object closes the driver.
 */
public class Neo4jExample implements AutoCloseable {
    private final Driver driver;

    /** Creates the driver for {@code uri} with basic authentication. */
    public Neo4jExample(String uri, String user, String password) {
        driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
    }

    @Override
    public void close() {
        driver.close();
    }

    /** Creates a {@code User} node with the given properties. */
    public void createUser(String userId, String name, String email) {
        try (Session session = driver.session()) {
            session.run("CREATE (u:User {userId: $userId, name: $name, email: $email})",
                    parameters("userId", userId, "name", name, "email", email));
        }
    }

    /**
     * Looks up a user by id.
     *
     * @return the matching user, or {@code null} when no node matches
     */
    public User getUser(String userId) {
        try (Session session = driver.session()) {
            Result result = session.run(
                    "MATCH (u:User {userId: $userId}) RETURN u",
                    parameters("userId", userId)
            );
            if (!result.hasNext()) {
                return null;
            }
            // `var` is used on purpose: the node type lives in
            // org.neo4j.driver.types (not covered by the wildcard import), and
            // the simple name `Record` is ambiguous with java.lang.Record on
            // Java 16+ when org.neo4j.driver.* is imported on demand.
            var node = result.next().get("u").asNode();
            return new User(
                    node.get("userId").asString(),
                    node.get("name").asString(),
                    node.get("email").asString()
            );
        }
    }

    /**
     * Links two users as friends inside a managed write transaction.
     * The driver may re-run the transaction function after a transient
     * failure, so the statement uses MERGE to stay idempotent.
     */
    public void createFriendship(String user1Id, String user2Id) {
        try (Session session = driver.session()) {
            session.executeWrite(tx -> {
                tx.run("""
                        MATCH (u1:User {userId: $id1})
                        MATCH (u2:User {userId: $id2})
                        MERGE (u1)-[f:FRIEND]-(u2)
                        ON CREATE SET f.createdAt = datetime()
                        """,
                        parameters("id1", user1Id, "id2", user2Id)
                );
                return null;
            });
        }
    }
}
/** Immutable value object holding the properties of a User node. */
class User {
    private final String userId;
    private final String name;
    private final String email;

    public User(String userId, String name, String email) {
        this.userId = userId;
        this.name = name;
        this.email = email;
    }

    // Getters only: the fields are final, so there are no setters
    public String getUserId() { return userId; }
    public String getName() { return name; }
    public String getEmail() { return email; }
}
JavaScript/Node.js 开发
安装驱动
npm install neo4j-driver
基本使用
const neo4j = require('neo4j-driver');
/**
 * Thin service around a Neo4j driver. Each method opens its own session and
 * always closes it in `finally`.
 */
class Neo4jService {
  /**
   * @param {string} uri - connection URI, e.g. bolt://localhost:7687
   * @param {string} user
   * @param {string} password
   */
  constructor(uri, user, password) {
    this.driver = neo4j.driver(uri, neo4j.auth.basic(user, password));
  }

  /** Close the driver. */
  async close() {
    await this.driver.close();
  }

  /**
   * Create a User node.
   * @returns {Promise<object>} properties of the created node
   */
  async createUser(userId, name, email) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `CREATE (u:User {userId: $userId, name: $name, email: $email})
         RETURN u`,
        { userId, name, email }
      );
      return result.records[0].get('u').properties;
    } finally {
      await session.close();
    }
  }

  /**
   * Look up a user by id.
   * @returns {Promise<object|null>} node properties, or null when not found
   */
  async getUser(userId) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `MATCH (u:User {userId: $userId}) RETURN u`,
        { userId }
      );
      if (result.records.length > 0) {
        return result.records[0].get('u').properties;
      }
      return null;
    } finally {
      await session.close();
    }
  }

  /**
   * Recommend friends-of-friends who are not yet friends of the user,
   * ordered by number of common friends (highest first).
   * @param {string} userId
   * @param {number|string} [limit=5] - maximum number of rows
   * @returns {Promise<Array<{name: string, commonFriends: number}>>}
   */
  async recommendFriends(userId, limit = 5) {
    const session = this.driver.session();
    try {
      const result = await session.run(
        `MATCH (u:User {userId: $userId})-[:FRIEND]-(friend)-[:FRIEND]-(potential)
         WHERE NOT (u)-[:FRIEND]-(potential) AND u <> potential
         WITH potential, count(friend) AS commonFriends
         ORDER BY commonFriends DESC
         RETURN potential.name AS name, commonFriends
         LIMIT $limit`,
        // The driver sends plain JS numbers as Cypher floats and LIMIT only
        // accepts integers, so the value must be wrapped with neo4j.int().
        { userId, limit: neo4j.int(parseInt(limit, 10)) }
      );
      return result.records.map(record => ({
        name: record.get('name'),
        commonFriends: record.get('commonFriends').toNumber()
      }));
    } finally {
      await session.close();
    }
  }
}
// Usage example
async function main() {
  const service = new Neo4jService('bolt://localhost:7687', 'neo4j', 'password');
  try {
    const user = await service.createUser('u001', '张三', 'zhangsan@example.com');
    console.log('Created user:', user);
    const recommendations = await service.recommendFriends('u001', 5);
    console.log('Recommendations:', recommendations);
  } finally {
    // Close the driver whether or not the calls above succeeded
    await service.close();
  }
}
main().catch(console.error);
连接池配置
Python 连接池
from neo4j import GraphDatabase
# Connection-pool and timeout settings, collected in one mapping so they can
# be reviewed (or reused) separately from the connection details.
# NOTE(review): the timeout values are presumably seconds — confirm against
# the driver's configuration reference.
pool_settings = {
    "max_connection_pool_size": 50,
    "connection_acquisition_timeout": 60,
    "connection_timeout": 30,
    "max_transaction_retry_time": 30.0,
}
driver = GraphDatabase.driver(
    "bolt://localhost:7687",
    auth=("neo4j", "password"),
    **pool_settings,
)
Java 连接池
import java.util.concurrent.TimeUnit;

import org.neo4j.driver.*;

// Pool and timeout settings mirroring the Python example above.
// TimeUnit comes from java.util.concurrent and must be imported explicitly.
Config config = Config.builder()
        .withMaxConnectionPoolSize(50)
        .withConnectionAcquisitionTimeout(60, TimeUnit.SECONDS)
        .withConnectionTimeout(30, TimeUnit.SECONDS)
        .withMaxTransactionRetryTime(30, TimeUnit.SECONDS)
        .build();
Driver driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password), config);
错误处理
Python 错误处理
from neo4j import GraphDatabase, exceptions
# GraphDatabase.driver() only builds the driver object; it does not contact
# the server. Authentication and availability errors therefore surface on
# first use, so connectivity is verified explicitly inside the try block.
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))
try:
    driver.verify_connectivity()
except exceptions.AuthError as e:
    print(f"认证失败: {e}")
except exceptions.ServiceUnavailable as e:
    print(f"服务不可用: {e}")
except exceptions.ClientError as e:
    print(f"客户端错误: {e}")
JavaScript 错误处理
const neo4j = require('neo4j-driver');
// neo4j.driver() is synchronous and does not contact the server, so a
// try/catch around it never sees connection or authentication failures.
// Those are reported by the promise returned from verifyConnectivity().
async function checkConnection() {
  const driver = neo4j.driver('bolt://localhost:7687', neo4j.auth.basic('neo4j', 'password'));
  try {
    await driver.verifyConnectivity();
  } catch (error) {
    if (error.code === 'Neo.ClientError.Security.Unauthorized') {
      console.error('认证失败');
    } else if (error.code === 'ServiceUnavailable') {
      console.error('服务不可用');
    } else {
      console.error('未知错误:', error);
    }
  } finally {
    // This driver exists only for the check, so release it here
    await driver.close();
  }
}
checkConnection().catch(console.error);
最佳实践
1. 使用参数化查询
# Good: pass values as query parameters; the query text itself stays constant
session.run("MATCH (u:User {userId: $userId}) RETURN u", userId=user_id)
# Avoid: building the query text by string interpolation
session.run(f"MATCH (u:User {{userId: '{user_id}'}}) RETURN u") # injection risk
2. 及时关闭资源
# Preferred: the context manager closes the session, even on error
with driver.session() as session:
    result = session.run("MATCH (n) RETURN n LIMIT 10")
    # Read the records while the session is still open; the result can no
    # longer be consumed once its session has been closed.
    records = list(result)

# Alternative: close the session manually in a finally block
session = driver.session()
try:
    result = session.run("MATCH (n) RETURN n LIMIT 10")
    records = list(result)
finally:
    session.close()
3. 使用读写事务
# Write operations go through execute_write
# (create_user_tx is a transaction function presumably defined elsewhere)
session.execute_write(create_user_tx, user_data)
# Read operations go through execute_read
# (get_user_tx is a transaction function presumably defined elsewhere)
session.execute_read(get_user_tx, user_id)
4. 批量操作
# Bulk insert with UNWIND: one statement creates a node per list element
def batch_create_users(tx, users):
    """Create one ``User`` node for every entry of ``users`` in a single statement.

    Each entry is read through ``user.id``, ``user.name`` and ``user.email``
    in the query.
    """
    statement = (
        "\n"
        "UNWIND $users AS user\n"
        "CREATE (u:User {userId: user.id, name: user.name, email: user.email})\n"
    )
    tx.run(statement, users=users)
# Process the users in fixed-size chunks, one write transaction per chunk
batch_size = 1000
for start in range(0, len(all_users), batch_size):
    session.execute_write(batch_create_users, all_users[start:start + batch_size])
下一步
掌握了应用开发后,可以查看 知识速查表 快速回顾所有重要概念和语法。