跳到主要内容

数据导入导出

数据导入导出是数据库管理的重要环节。本章将介绍如何使用 Cypher 的 LOAD CSV 命令、Neo4j Admin 工具以及编程方式来实现数据的高效导入导出。

使用 LOAD CSV 导入数据

LOAD CSV 是 Cypher 提供的内置命令,用于从 CSV 文件导入数据。它支持本地文件和远程 URL,适合中小规模数据的导入。

基本语法

LOAD CSV FROM '文件路径' AS 行变量
Cypher 操作语句

关键参数

  • FROM:指定 CSV 文件的路径,支持 file:///(本地)、http://https://ftp:// 等协议
  • AS:指定行变量的名称,用于在后续语句中引用当前行的数据
  • WITH HEADERS:可选,表示 CSV 文件包含标题行

导入无标题的 CSV 文件

当 CSV 文件没有标题行时,每行数据作为数组访问:

1,ABBA,1992
2,Roxette,1986
3,Europe,1979
4,The Cardigans,1992
-- 使用索引访问列(从 0 开始)
LOAD CSV FROM 'file:///artists.csv' AS row
CREATE (a:Artist {id: toInteger(row[0]), name: row[1], year: toInteger(row[2])})
RETURN a.name, a.year

导入带标题的 CSV 文件

使用 WITH HEADERS 选项,可以通过列名访问数据:

Id,Name,Year
1,ABBA,1992
2,Roxette,1986
3,Europe,1979
4,The Cardigans,1992
-- 使用列名访问数据
LOAD CSV WITH HEADERS FROM 'file:///artists.csv' AS row
CREATE (a:Artist {id: toInteger(row.Id), name: row.Name, year: toInteger(row.Year)})
RETURN a.name, a.year

推荐使用带标题的 CSV 文件:列名让代码更易读,也减少因列顺序变化导致的错误。

从远程 URL 导入

LOAD CSV 支持从远程服务器导入数据:

-- 从 HTTPS URL 导入
LOAD CSV FROM 'https://data.example.com/artists.csv' AS row
CREATE (a:Artist {name: row[1]})

-- 需要 HTTP 基本认证的 URL
LOAD CSV FROM 'https://user:[email protected]/data.csv' AS row
CREATE (n:Node {value: row[0]})

自定义字段分隔符

默认使用逗号作为分隔符,可以通过 FIELDTERMINATOR 指定其他分隔符:

1;ABBA;1992
2;Roxette;1986
-- 使用分号作为分隔符
LOAD CSV FROM 'file:///artists.csv' AS row FIELDTERMINATOR ';'
CREATE (a:Artist {id: toInteger(row[0]), name: row[1]})

数据类型转换

LOAD CSV 导入的所有数据默认为字符串类型,需要使用 Cypher 函数进行类型转换。

常用转换函数

函数说明示例
toInteger()转换为整数toInteger(row.age)
toFloat()转换为浮点数toFloat(row.price)
toBoolean()转换为布尔值toBoolean(row.active)
date()转换为日期date(row.birthDate)
datetime()转换为日期时间datetime(row.createdAt)

示例:完整的数据类型转换

personId,name,age,salary,isActive,birthDate
1,张三,30,15000.50,true,1994-05-15
2,李四,28,12000.00,false,1996-08-20
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CREATE (p:Person {
id: toInteger(row.personId),
name: row.name,
age: toInteger(row.age),
salary: toFloat(row.salary),
isActive: toBoolean(row.isActive),
birthDate: date(row.birthDate)
})
RETURN p.name, p.age, p.salary, p.birthDate

处理空值和特殊字符

处理空值

CSV 中的空字段需要特别处理,Neo4j 不存储 null 值:

-- 使用 coalesce 提供默认值
LOAD CSV WITH HEADERS FROM 'file:///companies.csv' AS row
CREATE (c:Company {
id: row.Id,
name: row.Name,
location: coalesce(row.Location, '未知')
})

-- 使用 CASE 处理条件
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
CREATE (n:Node {
value: CASE
WHEN row.value = '' THEN null
ELSE toInteger(row.value)
END
})

-- 使用 nullIf 将空字符串转为 null
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
CREATE (n:Node {
email: nullIf(trim(row.email), '')
})

过滤无效行

使用 WHERE 子句过滤不符合条件的行:

-- 跳过 ID 为空的行
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
WHERE row.Id IS NOT NULL
CREATE (n:Node {id: row.Id})

-- 只导入有效数据
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
WHERE row.email =~ '^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$'
CREATE (u:User {email: row.email})

处理列表值

CSV 中的列表数据需要使用 split() 函数拆分:

movieId,title,genres
1,Toy Story,Adventure|Animation|Comedy
2,Jumanji,Adventure|Children|Fantasy
LOAD CSV WITH HEADERS FROM 'file:///movies.csv' AS row
CREATE (m:Movie {
id: toInteger(row.movieId),
title: row.title,
genres: split(row.genres, '|')
})

导入关系数据

实际应用中,通常需要分别导入节点和关系数据。

分步导入模式

第一步:创建约束确保数据唯一性

-- 在导入前创建唯一性约束
CREATE CONSTRAINT person_id_unique IF NOT EXISTS
FOR (p:Person) REQUIRE p.id IS UNIQUE

CREATE CONSTRAINT movie_id_unique IF NOT EXISTS
FOR (m:Movie) REQUIRE m.id IS UNIQUE

第二步:导入节点数据

-- persons.csv
personId,name,born
1,张三,1994
2,李四,1996
-- 导入人员节点
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
MERGE (p:Person {id: toInteger(row.personId)})
SET p.name = row.name, p.born = toInteger(row.born)

第三步:导入关系数据

-- relationships.csv
fromId,toId,relationshipType,since
1,2,FRIEND,2020-01-15
1,3,COLLEAGUE,2021-03-01
-- 导入关系
LOAD CSV WITH HEADERS FROM 'file:///relationships.csv' AS row
MATCH (from:Person {id: toInteger(row.fromId)})
MATCH (to:Person {id: toInteger(row.toId)})
CALL apoc.create.relationship(from, row.relationshipType, {since: date(row.since)}, to) YIELD rel
RETURN rel

使用 MERGE vs CREATE

  • CREATE:总是创建新节点,可能导致重复
  • MERGE:如果节点存在则匹配,不存在则创建,避免重复
-- 使用 CREATE(可能重复)
CREATE (p:Person {name: '张三'})

-- 使用 MERGE(幂等操作)
MERGE (p:Person {name: '张三'})
ON CREATE SET p.createdAt = datetime()
ON MATCH SET p.updatedAt = datetime()

批量导入大数据量

对于大型 CSV 文件(数万行以上),需要使用批量事务处理来避免内存问题。

使用 CALL IN TRANSACTIONS

Neo4j 4.0+ 提供了 CALL {…} IN TRANSACTIONS 语法来分批处理:

-- 每批处理 1000 行
LOAD CSV WITH HEADERS FROM 'file:///large_data.csv' AS row
CALL {
WITH row
CREATE (n:Node {id: toInteger(row.id), name: row.name})
} IN TRANSACTIONS OF 1000 ROWS

工作原理

  • 将大数据集分割成多个小事务
  • 每个事务处理指定数量的行
  • 避免单个事务过大导致内存溢出

批量导入最佳实践

-- 1. 先创建约束
CREATE CONSTRAINT person_id_unique IF NOT EXISTS
FOR (p:Person) REQUIRE p.id IS UNIQUE

-- 2. 使用批量事务导入节点
LOAD CSV WITH HEADERS FROM 'file:///persons.csv' AS row
CALL {
WITH row
MERGE (p:Person {id: toInteger(row.id)})
SET p.name = row.name
} IN TRANSACTIONS OF 2000 ROWS

-- 3. 使用批量事务导入关系
LOAD CSV WITH HEADERS FROM 'file:///relationships.csv' AS row
CALL {
WITH row
MATCH (from:Person {id: toInteger(row.fromId)})
MATCH (to:Person {id: toInteger(row.toId)})
MERGE (from)-[r:KNOWS {since: date(row.since)}]->(to)
} IN TRANSACTIONS OF 2000 ROWS

批量大小选择

数据规模推荐批量大小
小型(< 1万行)不需要分批
中型(1-10万行)1000-2000 行/批
大型(> 10万行)2000-5000 行/批

批量大小需要根据可用内存和数据复杂度调整。如果遇到内存错误,减小批量大小。

导入前检查数据质量

在正式导入前,建议先检查 CSV 数据质量:

-- 检查总行数
LOAD CSV FROM 'file:///data.csv' AS row
RETURN count(*) AS totalRows

-- 预览前几行数据
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
RETURN row
LIMIT 5

-- 检查特定列的值分布
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
RETURN row.city, count(*) AS count
ORDER BY count DESC
LIMIT 10

-- 检查是否有空值
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
WHERE row.email IS NULL OR row.email = ''
RETURN count(*) AS nullEmailCount

使用 neo4j-admin 导入大规模数据

对于非常大的数据集(百万级节点),推荐使用 neo4j-admin database import 命令,它比 LOAD CSV 更高效。

准备数据文件

将数据准备为 CSV 格式,分为节点文件和关系文件:

节点文件(nodes.csv)

:ID,name,age:INT
1,张三,30
2,李四,28
3,王五,32

关系文件(relationships.csv)

:START_ID,:END_ID,:TYPE
1,2,FRIEND
2,3,COLLEAGUE

执行导入命令

# 停止 Neo4j 服务
neo4j stop

# 执行导入(在数据库离线状态下)
neo4j-admin database import full \
--nodes=import/nodes.csv \
--relationships=import/relationships.csv \
neo4j

# 启动 Neo4j 服务
neo4j start

注意事项

  • 只能在数据库离线状态下执行
  • 会清空现有数据
  • 适合初始化数据库,不适合增量导入

数据导出

使用 Cypher 导出为 CSV

-- 导出查询结果为 CSV
MATCH (p:Person)
RETURN p.id, p.name, p.age
ORDER BY p.id

-- 在 Neo4j Browser 中点击 "Download" 按钮导出

使用 neo4j-admin 导出

# 导出整个数据库
neo4j-admin database dump neo4j --to=/backup/neo4j-$(date +%Y%m%d).dump

# 导出为 Cypher 脚本(需要 APOC)
CALL apoc.export.cypher.all('/path/to/export.cypher', {format: 'cypher-shell'})

使用编程语言导出

Python 示例

from neo4j import GraphDatabase
import csv

def export_to_csv(uri, user, password, output_file):
driver = GraphDatabase.driver(uri, auth=(user, password))

with driver.session() as session:
result = session.run("""
MATCH (p:Person)
RETURN p.id AS id, p.name AS name, p.age AS age
ORDER BY p.id
""")

with open(output_file, 'w', newline='', encoding='utf-8') as f:
writer = csv.writer(f)
writer.writerow(['id', 'name', 'age'])

for record in result:
writer.writerow([
record['id'],
record['name'],
record['age']
])

driver.close()

# 使用
export_to_csv('bolt://localhost:7687', 'neo4j', 'password', 'persons.csv')

常见问题与解决方案

问题一:导入速度慢

原因:每行数据单独处理,事务开销大

解决方案

-- 使用批量事务
LOAD CSV WITH HEADERS FROM 'file:///data.csv' AS row
CALL {
WITH row
CREATE (n:Node {id: toInteger(row.id)})
} IN TRANSACTIONS OF 2000 ROWS

-- 确保已创建索引
CREATE INDEX node_id FOR (n:Node) ON (n.id)

问题二:内存不足

原因:单次事务处理数据量过大

解决方案

  • 减小批量大小
  • 增加 Neo4j 堆内存配置
  • 使用 neo4j-admin database import 替代 LOAD CSV

问题三:编码问题

原因:CSV 文件编码不是 UTF-8

解决方案

  • 将 CSV 文件转换为 UTF-8 编码
  • 使用文本编辑器(如 VS Code)重新保存为 UTF-8

问题四:文件找不到

原因:路径配置错误

解决方案

-- 检查 import 目录配置
-- 在 neo4j.conf 中设置:server.directories.import=/path/to/import

-- 使用相对于 import 目录的路径
LOAD CSV FROM 'file:///data.csv' AS row -- 正确

-- 错误:使用绝对路径
-- LOAD CSV FROM 'file:///C:/data/data.csv' AS row

小结

本章介绍了 Neo4j 数据导入导出的主要方法:

  1. LOAD CSV:适合中小规模数据,支持本地和远程文件
  2. 类型转换:使用 toInteger()date() 等函数转换数据类型
  3. 批量处理:使用 CALL IN TRANSACTIONS 处理大数据量
  4. neo4j-admin:适合百万级数据的初始化导入
  5. 数据导出:使用 Cypher 查询或编程方式导出数据

选择合适的导入方法取决于数据规模和导入频率:

  • 小批量数据:使用 LOAD CSV
  • 大规模初始化:使用 neo4j-admin database import
  • 增量更新:使用 MERGE 配合约束

下一步

掌握了数据导入导出后,继续学习 应用开发,了解如何在实际应用中使用 Neo4j。