数据合并与连接
在数据分析中,数据通常分散在多个表格或数据源中,需要将它们合并在一起进行分析。Pandas 提供了强大的数据合并功能,类似于 SQL 的 JOIN 操作。本章将详细介绍如何使用 Pandas 进行数据合并与连接。
合并方法概述
Pandas 提供了多种数据合并方式:
| 方法 | 用途 | 类似 SQL 操作 |
|---|---|---|
pd.concat() | 沿轴向拼接多个对象 | UNION |
pd.merge() | 基于列进行数据库风格连接 | JOIN |
df.join() | 基于索引进行连接 | JOIN ON INDEX |
pd.merge_ordered() | 有序合并 | ORDERED JOIN |
pd.merge_asof() | 近似匹配合并 | ASOF JOIN |
使用 concat 拼接数据
concat 是最基础的合并函数,用于沿指定轴将多个 Pandas 对象拼接在一起。
基本用法
import pandas as pd
import numpy as np
# 创建示例数据
df1 = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2'],
'C': ['C0', 'C1', 'C2']
}, index=[0, 1, 2])
df2 = pd.DataFrame({
'A': ['A3', 'A4', 'A5'],
'B': ['B3', 'B4', 'B5'],
'C': ['C3', 'C4', 'C5']
}, index=[3, 4, 5])
# 纵向拼接(默认 axis=0)
result = pd.concat([df1, df2])
print(result)
# A B C
# 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2
# 3 A3 B3 C3
# 4 A4 B4 C4
# 5 A5 B5 C5
concat 接受一个列表,列表中的元素可以是 DataFrame 或 Series。拼接后的结果保留原始索引。
横向拼接
# 创建不同列的数据
df3 = pd.DataFrame({
'D': ['D0', 'D1', 'D2'],
'E': ['E0', 'E1', 'E2']
}, index=[0, 1, 2])
# 横向拼接(axis=1)
result = pd.concat([df1, df3], axis=1)
print(result)
# A B C D E
# 0 A0 B0 C0 D0 E0
# 1 A1 B1 C1 D1 E1
# 2 A2 B2 C2 D2 E2
横向拼接时,按照索引对齐数据。如果索引不完全匹配,会产生缺失值。
处理索引冲突
# 创建索引重叠的数据
df4 = pd.DataFrame({
'A': ['A3', 'A4', 'A5'],
'B': ['B3', 'B4', 'B5']
}, index=[0, 1, 2]) # 索引与 df1 重复
# 直接拼接会保留重复索引
result = pd.concat([df1, df4])
print(result.index) # Int64Index([0, 1, 2, 0, 1, 2], dtype='int64')
# 忽略原始索引,重新生成
result = pd.concat([df1, df4], ignore_index=True)
print(result.index) # RangeIndex(start=0, stop=6, step=1)
当数据源没有有意义的索引时,使用 ignore_index=True 可以避免索引冲突。
添加层级标识
当需要区分数据来源时,可以使用 keys 参数:
# 为每个数据源添加层级标识
result = pd.concat([df1, df2], keys=['第一组', '第二组'])
print(result)
# A B C
# 第一组 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2
# 第二组 3 A3 B3 C3
# 4 A4 B4 C4
# 5 A5 B5 C5
# 通过层级选择数据
print(result.loc['第一组'])
# A B C
# 0 A0 B0 C0
# 1 A1 B1 C1
# 2 A2 B2 C2
这会创建一个多级索引(MultiIndex),便于追踪数据来源。
处理列名不一致
df5 = pd.DataFrame({
'A': ['A0', 'A1'],
'B': ['B0', 'B1'],
'C': ['C0', 'C1']
})
df6 = pd.DataFrame({
'B': ['B2', 'B3'],
'C': ['C2', 'C3'],
'D': ['D2', 'D3']
})
# 外连接(默认):保留所有列,缺失值填充 NaN
result_outer = pd.concat([df5, df6], axis=0)
print(result_outer)
# A B C D
# 0 A0 B0 C0 NaN
# 1 A1 B1 C1 NaN
# 0 NaN B2 C2 D2
# 1 NaN B3 C3 D3
# 内连接:只保留共有列
result_inner = pd.concat([df5, df6], axis=0, join='inner')
print(result_inner)
# B C
# 0 B0 C0
# 1 B1 C1
# 0 B2 C2
# 1 B3 C3
join 参数控制如何处理不匹配的列:
'outer'(默认):取并集,缺失值填充 NaN'inner':取交集,只保留共有列
拼接 Series 与 DataFrame
# 将 Series 拼接到 DataFrame
s1 = pd.Series(['X0', 'X1', 'X2'], name='X')
result = pd.concat([df1, s1], axis=1)
print(result)
# A B C X
# 0 A0 B0 C0 X0
# 1 A1 B1 C1 X1
# 2 A2 B2 C2 X2
# 未命名的 Series 会自动编号
s2 = pd.Series(['Y0', 'Y1', 'Y2'])
result = pd.concat([df1, s2], axis=1)
print(result)
# A B C 0
# 0 A0 B0 C0 Y0
# 1 A1 B1 C1 Y1
# 2 A2 B2 C2 Y2
使用 merge 进行数据库风格连接
merge 函数用于基于共同列将两个 DataFrame 连接起来,类似于 SQL 的 JOIN 操作。
基本用法
# 创建示例数据
employees = pd.DataFrame({
'emp_id': [1, 2, 3, 4],
'name': ['张三', '李四', '王五', '赵六'],
'dept_id': [10, 20, 10, 30]
})
departments = pd.DataFrame({
'dept_id': [10, 20, 40],
'dept_name': ['技术部', '市场部', '人事部']
})
# 基于 dept_id 连接
result = pd.merge(employees, departments, on='dept_id')
print(result)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 3 王五 10 技术部
# 2 2 李四 20 市场部
默认情况下,merge 执行内连接(inner join),只保留两个表中都存在的键值。
连接类型
how 参数控制连接方式:
# 内连接(默认):只保留两表共有的键
inner = pd.merge(employees, departments, on='dept_id', how='inner')
print("内连接结果:")
print(inner)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 3 王五 10 技术部
# 2 2 李四 20 市场部
# 左连接:保留左表所有行
left = pd.merge(employees, departments, on='dept_id', how='left')
print("\n左连接结果:")
print(left)
# emp_id name dept_id dept_name
# 0 1 张三 10 技术部
# 1 2 李四 20 市场部
# 2 3 王五 10 技术部
# 3 4 赵六 30 NaN
# 右连接:保留右表所有行
right = pd.merge(employees, departments, on='dept_id', how='right')
print("\n右连接结果:")
print(right)
# emp_id name dept_id dept_name
# 0 1.0 张三 10 技术部
# 1 3.0 王五 10 技术部
# 2 2.0 李四 20 市场部
# 3 NaN NaN 40 人事部
# 外连接:保留两表所有行
outer = pd.merge(employees, departments, on='dept_id', how='outer')
print("\n外连接结果:")
print(outer)
# emp_id name dept_id dept_name
# 0 1.0 张三 10 技术部
# 1 3.0 王五 10 技术部
# 2 2.0 李四 20 市场部
# 3 4.0 赵六 30 NaN
# 4 NaN NaN 40 人事部
各连接类型的含义:
| 连接类型 | SQL 等价 | 描述 |
|---|---|---|
inner | INNER JOIN | 只保留两表共有的键 |
left | LEFT OUTER JOIN | 保留左表所有行 |
right | RIGHT OUTER JOIN | 保留右表所有行 |
outer | FULL OUTER JOIN | 保留两表所有行 |
cross | CROSS JOIN | 笛卡尔积 |
指定连接键
当两个表的连接列名不同时,使用 left_on 和 right_on:
orders = pd.DataFrame({
'order_id': [101, 102, 103],
'customer_no': [1, 2, 1],
'amount': [1000, 2000, 1500]
})
customers = pd.DataFrame({
'cust_id': [1, 2, 3],
'cust_name': ['客户A', '客户B', '客户C']
})
# 列名不同时指定各自的连接列
result = pd.merge(orders, customers,
left_on='customer_no',
right_on='cust_id')
print(result)
# order_id customer_no amount cust_id cust_name
# 0 101 1 1000 1 客户A
# 1 103 1 1500 1 客户A
# 2 102 2 2000 2 客户B
多列连接
可以基于多个列进行连接:
sales = pd.DataFrame({
'year': [2023, 2023, 2024, 2024],
'product': ['A', 'B', 'A', 'B'],
'sales': [100, 150, 120, 180]
})
targets = pd.DataFrame({
'year': [2023, 2023, 2024, 2024],
'product': ['A', 'B', 'A', 'B'],
'target': [90, 140, 110, 170]
})
# 基于年份和产品两个列连接
result = pd.merge(sales, targets, on=['year', 'product'])
print(result)
# year product sales target
# 0 2023 A 100 90
# 1 2023 B 150 140
# 2 2024 A 120 110
# 3 2024 B 180 170
处理重复列名
当两个表有相同名称的非连接列时,merge 会自动添加后缀:
df_a = pd.DataFrame({
'key': ['K0', 'K1'],
'value': ['A0', 'A1']
})
df_b = pd.DataFrame({
'key': ['K0', 'K1'],
'value': ['B0', 'B1']
})
# 自动添加后缀
result = pd.merge(df_a, df_b, on='key')
print(result)
# key value_x value_y
# 0 K0 A0 B0
# 1 K1 A1 B1
# 自定义后缀
result = pd.merge(df_a, df_b, on='key', suffixes=('_左表', '_右表'))
print(result)
# key value_左表 value_右表
# 0 K0 A0 B0
# 1 K1 A1 B1
基于索引连接
df_left = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])
df_right = pd.DataFrame({
'C': ['C0', 'C1', 'C2'],
'D': ['D0', 'D1', 'D2']
}, index=['K0', 'K2', 'K3'])
# 使用 left_index 和 right_index 基于索引连接
result = pd.merge(df_left, df_right,
left_index=True,
right_index=True,
how='outer')
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1
# K3 NaN NaN C2 D2
验证连接关系
validate 参数用于检查连接键的唯一性,防止意外的数据膨胀:
# 一对一连接验证
df_one = pd.DataFrame({'key': ['A', 'B'], 'value': [1, 2]})
df_one2 = pd.DataFrame({'key': ['A', 'B'], 'info': ['X', 'Y']})
# 验证是否为一对一关系
result = pd.merge(df_one, df_one2, on='key', validate='one_to_one')
print(result)
# key value info
# 0 A 1 X
# 1 B 2 Y
validate 参数的可选值:
'one_to_one'或'1:1':两表的连接键都是唯一的'one_to_many'或'1:m':左表连接键唯一'many_to_one'或'm:1':右表连接键唯一'many_to_many'或'm:m':允许重复键
使用 join 方法
join 方法是基于索引进行连接的便捷方法,语法更简洁:
# 创建示例数据
df_left = pd.DataFrame({
'A': ['A0', 'A1', 'A2'],
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])
df_right = pd.DataFrame({
'C': ['C0', 'C1', 'C2'],
'D': ['D0', 'D1', 'D2']
}, index=['K0', 'K2', 'K3'])
# 使用 join(默认左连接)
result = df_left.join(df_right)
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1
# 指定连接方式
result = df_left.join(df_right, how='outer')
print(result)
# A B C D
# K0 A0 B0 C0 D0
# K1 A1 B1 NaN NaN
# K2 A2 B2 C1 D1
# K3 NaN NaN C2 D2
连接多个 DataFrame
df_right2 = pd.DataFrame({
'E': ['E0', 'E1', 'E2']
}, index=['K0', 'K1', 'K2'])
# 一次性连接多个 DataFrame
result = df_left.join([df_right, df_right2])
print(result)
# A B C D E
# K0 A0 B0 C0 D0 E0
# K1 A1 B1 NaN NaN E1
# K2 A2 B2 C1 D1 E2
基于列连接
join 也可以基于列进行连接:
df_left = pd.DataFrame({
'key': ['K0', 'K1', 'K2'],
'A': ['A0', 'A1', 'A2']
})
df_right = pd.DataFrame({
'B': ['B0', 'B1', 'B2']
}, index=['K0', 'K1', 'K2'])
# 指定左表的连接列
result = df_left.join(df_right, on='key')
print(result)
# key A B
# 0 K0 A0 B0
# 1 K1 A1 B1
# 2 K2 A2 B2
高级合并操作
merge_ordered 有序合并
用于合并有序数据,可以填充缺失值:
# 创建有序数据
left = pd.DataFrame({
'key': ['A', 'B', 'C', 'D'],
'value_left': [1, 2, 3, 4]
})
right = pd.DataFrame({
'key': ['B', 'D', 'E'],
'value_right': [20, 40, 50]
})
# 有序合并(默认外连接)
result = pd.merge_ordered(left, right, on='key', fill_method='ffill')
print(result)
# key value_left value_right
# 0 A 1.0 NaN
# 1 B 2.0 20.0
# 2 C 3.0 20.0 # 向前填充
# 3 D 4.0 40.0
# 4 E NaN 50.0
merge_ordered 特别适合合并时间序列数据。
merge_asof 近似匹配
用于基于最近键值进行合并,常用于时间序列对齐:
# 创建时间序列数据
quotes = pd.DataFrame({
'time': pd.to_datetime([
'2024-01-01 09:00:00',
'2024-01-01 09:30:00',
'2024-01-01 10:00:00',
'2024-01-01 10:30:00'
]),
'price': [100, 101, 102, 103]
})
trades = pd.DataFrame({
'time': pd.to_datetime([
'2024-01-01 09:15:00',
'2024-01-01 09:45:00',
'2024-01-01 10:15:00'
]),
'quantity': [100, 200, 150]
})
# 查找每笔交易时刻的最近报价(向前查找)
result = pd.merge_asof(trades, quotes, on='time')
print(result)
# time quantity price
# 0 2024-01-01 09:15:00 100 100
# 1 2024-01-01 09:45:00 200 101
# 2 2024-01-01 10:15:00 150 102
# 设置时间容差
result = pd.merge_asof(trades, quotes, on='time',
tolerance=pd.Timedelta('20min'))
print(result)
# time quantity price
# 0 2024-01-01 09:15:00 100 100
# 1 2024-01-01 09:45:00 200 101
# 2 2024-01-01 10:15:00 150 102
merge_asof 的关键参数:
direction:查找方向,'backward'(默认)、'forward'、'nearest'tolerance:允许的最大时间差allow_exact_matches:是否允许精确匹配
实战示例
示例:电商订单分析
import pandas as pd
# 创建示例数据
orders = pd.DataFrame({
'order_id': [1, 2, 3, 4, 5],
'customer_id': [101, 102, 101, 103, 102],
'product_id': ['P1', 'P2', 'P1', 'P3', 'P2'],
'quantity': [2, 1, 3, 1, 2],
'order_date': pd.date_range('2024-01-01', periods=5)
})
customers = pd.DataFrame({
'customer_id': [101, 102, 103, 104],
'name': ['张三', '李四', '王五', '赵六'],
'city': ['北京', '上海', '广州', '深圳']
})
products = pd.DataFrame({
'product_id': ['P1', 'P2', 'P3', 'P4'],
'product_name': ['手机', '电脑', '平板', '耳机'],
'price': [5000, 8000, 3000, 500]
})
# 计算订单金额
order_details = pd.merge(orders, products, on='product_id')
order_details['total'] = order_details['quantity'] * order_details['price']
# 关联客户信息
full_data = pd.merge(order_details, customers, on='customer_id')
print("完整订单数据:")
print(full_data[['order_id', 'name', 'product_name', 'quantity', 'total']])
# 分析:各客户消费总额
customer_total = full_data.groupby('name')['total'].sum().sort_values(ascending=False)
print("\n客户消费总额:")
print(customer_total)
# 分析:各城市销售情况
city_sales = full_data.groupby('city')['total'].sum()
print("\n城市销售情况:")
print(city_sales)
示例:多数据源整合
# 模拟多个数据源
sales_2023 = pd.DataFrame({
'month': range(1, 7),
'sales': [100, 120, 130, 125, 140, 150]
})
sales_2024 = pd.DataFrame({
'month': range(1, 7),
'sales': [110, 135, 145, 140, 160, 170]
})
# 合并两年数据并标记年份
sales_2023['year'] = 2023
sales_2024['year'] = 2024
all_sales = pd.concat([sales_2023, sales_2024], ignore_index=True)
# 计算同比增长
comparison = pd.merge(
sales_2023[['month', 'sales']].rename(columns={'sales': 'sales_2023'}),
sales_2024[['month', 'sales']].rename(columns={'sales': 'sales_2024'}),
on='month'
)
comparison['growth_rate'] = (comparison['sales_2024'] - comparison['sales_2023']) / comparison['sales_2023'] * 100
print("同比增长分析:")
print(comparison)
示例:处理重复键问题
# 创建有重复键的数据
students = pd.DataFrame({
'class_id': ['A', 'A', 'B', 'B'],
'student_name': ['张三', '李四', '王五', '赵六']
})
scores = pd.DataFrame({
'class_id': ['A', 'B'],
'teacher': ['王老师', '李老师']
})
# 直接合并会产生笛卡尔积(一对多)
result = pd.merge(students, scores, on='class_id')
print("一对多合并结果:")
print(result)
# class_id student_name teacher
# 0 A 张三 王老师
# 1 A 李四 王老师
# 2 B 王五 李老师
# 3 B 赵六 李老师
小结
本章我们学习了:
- concat 拼接:纵向和横向拼接、处理索引冲突、添加层级标识
- merge 连接:内连接、左连接、右连接、外连接、交叉连接
- join 方法:基于索引的便捷连接方式
- 高级合并:有序合并、近似匹配合并
- 实战应用:电商订单分析、多数据源整合
选择合适的合并方法:
- 简单拼接数据:使用
concat - 基于列值连接:使用
merge - 基于索引连接:使用
join - 时间序列对齐:使用
merge_asof
练习
- 创建两个 DataFrame,分别包含学生信息和成绩信息,使用 merge 进行连接
- 使用 concat 将三个月的销售数据合并为一个 DataFrame
- 比较左连接和内连接的区别,并用代码演示